diff --git a/amazon-kinesis-client-multilang/pom.xml b/amazon-kinesis-client-multilang/pom.xml
index 41e5e0f9..879f7cf9 100644
--- a/amazon-kinesis-client-multilang/pom.xml
+++ b/amazon-kinesis-client-multilang/pom.xml
@@ -21,7 +21,7 @@
amazon-kinesis-client-pom
software.amazon.kinesis
- 2.4.5
+ 2.4.6-SNAPSHOT
4.0.0
diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml
index 77250400..a80e7ae4 100644
--- a/amazon-kinesis-client/pom.xml
+++ b/amazon-kinesis-client/pom.xml
@@ -22,7 +22,7 @@
software.amazon.kinesis
amazon-kinesis-client-pom
- 2.4.5
+ 2.4.6-SNAPSHOT
amazon-kinesis-client
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
index 5d9f73e9..5c877b80 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
@@ -41,7 +41,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -54,7 +53,6 @@ import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
-import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
@@ -75,7 +73,6 @@ import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
-import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.ShardConsumer;
@@ -83,7 +80,6 @@ import software.amazon.kinesis.lifecycle.ShardConsumerArgument;
import software.amazon.kinesis.lifecycle.ShardConsumerShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownReason;
-import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.CloudWatchMetricsFactory;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.metrics.MetricsFactory;
@@ -113,16 +109,16 @@ public class Scheduler implements Runnable {
private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1;
private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
- private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L;
+ private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1000L;
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
- private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L;
+ private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 60_000L;
private static final boolean SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS = false;
private static final String MULTI_STREAM_TRACKER = "MultiStreamTracker";
private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count";
private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count";
private static final String DELETED_STREAMS_COUNT = "DeletedStreams.Count";
- private SchedulerLog slog = new SchedulerLog();
+ private final SchedulerLog slog = new SchedulerLog();
private final CheckpointConfig checkpointConfig;
private final CoordinatorConfig coordinatorConfig;
@@ -175,7 +171,7 @@ public class Scheduler implements Runnable {
// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
- private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap<>();
private volatile boolean shutdown;
private volatile long shutdownStartTimeMillis;
@@ -183,7 +179,7 @@ public class Scheduler implements Runnable {
private final Object lock = new Object();
- private Stopwatch streamSyncWatch = Stopwatch.createUnstarted();
+ private final Stopwatch streamSyncWatch = Stopwatch.createUnstarted();
private boolean leasesSyncedOnAppInit = false;
/**
@@ -236,8 +232,13 @@ public class Scheduler implements Runnable {
return multiStreamTracker.streamConfigList().stream()
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc));
},
- streamConfig ->
- Collections.singletonMap(streamConfig.streamIdentifier(), streamConfig));
+ streamConfig -> {
+ // use a concrete, non-singleton map to allow computeIfAbsent(...)
+ // without forcing behavioral differences for multi-stream support
+ final Map map = new HashMap<>();
+ map.put(streamConfig.streamIdentifier(), streamConfig);
+ return map;
+ });
this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts();
this.metricsFactory = this.metricsConfig.metricsFactory();
// Determine leaseSerializer based on availability of MultiStreamTracker.
@@ -348,7 +349,6 @@ public class Scheduler implements Runnable {
log.info("Initializing LeaseCoordinator");
leaseCoordinator.initialize();
- TaskResult result;
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
if (shouldInitiateLeaseSync()) {
log.info("Worker {} is initiating the lease sync.", leaseManagementConfig.workerIdentifier());
@@ -450,7 +450,6 @@ public class Scheduler implements Runnable {
return leaderDecider.isLeader(leaseManagementConfig.workerIdentifier());
}
-
/**
* Note: This method has package level access solely for testing purposes.
* Sync all streams method.
@@ -484,10 +483,11 @@ public class Scheduler implements Runnable {
// For new streams discovered, do a shard sync and update the currentStreamConfigMap
for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) {
if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
- log.info("Found new stream to process: " + streamIdentifier + ". Syncing shards of that stream.");
- ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(newStreamConfigMap.get(streamIdentifier));
+ final StreamConfig streamConfig = newStreamConfigMap.get(streamIdentifier);
+ log.info("Found new stream to process: {}. Syncing shards of that stream.", streamConfig);
+ ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(streamConfig);
shardSyncTaskManager.submitShardSyncTask();
- currentStreamConfigMap.put(streamIdentifier, newStreamConfigMap.get(streamIdentifier));
+ currentStreamConfigMap.put(streamIdentifier, streamConfig);
streamsSynced.add(streamIdentifier);
} else {
if (log.isDebugEnabled()) {
@@ -521,12 +521,10 @@ public class Scheduler implements Runnable {
// In order to give workers with stale stream info, sufficient time to learn about the new streams
// before attempting to delete it, we will be deferring the leases deletion based on the
// defer time period.
-
- currentStreamConfigMap.keySet().stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier));
-
+ currentStreamConfigMap.keySet().forEach(enqueueStreamLeaseDeletionOperation);
} else if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION) {
Optional.ofNullable(formerStreamsLeasesDeletionStrategy.streamIdentifiersForLeaseCleanup()).ifPresent(
- streamIdentifiers -> streamIdentifiers.stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier)));
+ streamIdentifiers -> streamIdentifiers.forEach(enqueueStreamLeaseDeletionOperation));
} else {
// Remove the old/stale streams identified through the new and existing streams list, without
// cleaning up their leases. Disabling deprecated shard sync + lease cleanup through a flag.
@@ -614,28 +612,27 @@ public class Scheduler implements Runnable {
private Set deleteMultiStreamLeases(Set streamIdentifiers)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
+ if (streamIdentifiers.isEmpty()) {
+ return Collections.emptySet();
+ }
+
final Set streamsSynced = new HashSet<>();
- List leases = null;
- Map> streamIdToShardsMap = null;
- for(StreamIdentifier streamIdentifier : streamIdentifiers) {
- if (leases == null) {
- // Lazy Load once and use many times for this iteration.
- leases = fetchMultiStreamLeases();
- }
- if (streamIdToShardsMap == null) {
- // Lazy load once and use many times for this iteration.
- streamIdToShardsMap = leases.stream().collect(Collectors
- .groupingBy(MultiStreamLease::streamIdentifier,
- Collectors.toCollection(ArrayList::new)));
- }
- log.warn("Found old/deleted stream: " + streamIdentifier + ". Directly deleting leases of this stream.");
+ final List leases = fetchMultiStreamLeases();
+ final Map> streamIdToShardsMap = leases.stream().collect(
+ Collectors.groupingBy(MultiStreamLease::streamIdentifier, Collectors.toCollection(ArrayList::new)));
+ for (StreamIdentifier streamIdentifier : streamIdentifiers) {
// Deleting leases will cause the workers to shutdown the record processors for these shards.
if (deleteMultiStreamLeases(streamIdToShardsMap.get(streamIdentifier.serialize()))) {
+ log.warn("Found old/deleted stream: {}. Directly deleting leases of this stream.", streamIdentifier);
currentStreamConfigMap.remove(streamIdentifier);
staleStreamDeletionMap.remove(streamIdentifier);
streamsSynced.add(streamIdentifier);
}
}
+ if (!streamsSynced.isEmpty()) {
+ // map keys are StreamIdentifiers, which are members of StreamConfig, and therefore redundant
+ log.info("Streams retained post-deletion: {}", currentStreamConfigMap.values());
+ }
return streamsSynced;
}
@@ -655,9 +652,15 @@ public class Scheduler implements Runnable {
return true;
}
- // Generate default StreamConfig for an "orphaned" stream that is in the lease table but not tracked
+ /**
+ * Generates default StreamConfig for an "orphaned" stream that is in the lease table but not tracked.
+ *
+ * @param streamIdentifier stream for which an orphan config should be generated
+ */
private StreamConfig getOrphanedStreamConfig(StreamIdentifier streamIdentifier) {
- return new StreamConfig(streamIdentifier, orphanedStreamInitialPositionInStream);
+ final StreamConfig orphanConfig = new StreamConfig(streamIdentifier, orphanedStreamInitialPositionInStream);
+ log.info("Identified as orphan: {}", orphanConfig);
+ return orphanConfig;
}
/**
@@ -919,7 +922,7 @@ public class Scheduler implements Runnable {
// Irrespective of single stream app or multi stream app, streamConfig should always be available.
// If we have a shardInfo, that is not present in currentStreamConfigMap for whatever reason, then return default stream config
// to gracefully complete the reading.
- final StreamConfig streamConfig = currentStreamConfigMap.getOrDefault(streamIdentifier, getOrphanedStreamConfig(streamIdentifier));
+ final StreamConfig streamConfig = currentStreamConfigMap.computeIfAbsent(streamIdentifier, this::getOrphanedStreamConfig);
Validate.notNull(streamConfig, "StreamConfig should not be null");
RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, streamConfig, metricsFactory);
ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo,
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java
index 53739e40..abb85612 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java
@@ -26,6 +26,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.utils.Either;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.common.KinesisClientLibraryPackage;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.processor.MultiStreamTracker;
@@ -46,7 +47,7 @@ public class RetrievalConfig {
*/
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java";
- public static final String KINESIS_CLIENT_LIB_USER_AGENT_VERSION = "2.4.5";
+ public static final String KINESIS_CLIENT_LIB_USER_AGENT_VERSION = "2.4.6-SNAPSHOT";
/**
* Client used to make calls to Kinesis for records retrieval
diff --git a/pom.xml b/pom.xml
index 59feefef..ac946119 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
amazon-kinesis-client-pom
pom
Amazon Kinesis Client Library
- 2.4.5
+ 2.4.6-SNAPSHOT
The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.