diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index aecb1331..51c0b5e1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -84,6 +84,7 @@ class PeriodicShardSyncManager { private final LeaseRefresher leaseRefresher; private final Map currentStreamConfigMap; private final Function shardSyncTaskManagerProvider; + private final Map streamToShardSyncTaskManagerMap; private final ScheduledExecutorService shardSyncThreadPool; private final boolean isMultiStreamingMode; private final MetricsFactory metricsFactory; @@ -93,11 +94,13 @@ class PeriodicShardSyncManager { PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher, Map currentStreamConfigMap, - Function shardSyncTaskManagerProvider, boolean isMultiStreamingMode, - MetricsFactory metricsFactory, + Function shardSyncTaskManagerProvider, + Map streamToShardSyncTaskManagerMap, + boolean isMultiStreamingMode, MetricsFactory metricsFactory, long leasesRecoveryAuditorExecutionFrequencyMillis, int leasesRecoveryAuditorInconsistencyConfidenceThreshold) { this(workerId, leaderDecider, leaseRefresher, currentStreamConfigMap, shardSyncTaskManagerProvider, + streamToShardSyncTaskManagerMap, Executors.newSingleThreadScheduledExecutor(), isMultiStreamingMode, metricsFactory, leasesRecoveryAuditorExecutionFrequencyMillis, leasesRecoveryAuditorInconsistencyConfidenceThreshold); } @@ -105,6 +108,7 @@ class PeriodicShardSyncManager { PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher, Map currentStreamConfigMap, Function shardSyncTaskManagerProvider, + Map streamToShardSyncTaskManagerMap, ScheduledExecutorService shardSyncThreadPool, boolean isMultiStreamingMode, MetricsFactory metricsFactory, long leasesRecoveryAuditorExecutionFrequencyMillis, @@ -116,6 +120,7 @@ class PeriodicShardSyncManager { this.leaseRefresher = leaseRefresher; this.currentStreamConfigMap = currentStreamConfigMap; this.shardSyncTaskManagerProvider = shardSyncTaskManagerProvider; + this.streamToShardSyncTaskManagerMap = streamToShardSyncTaskManagerMap; this.shardSyncThreadPool = shardSyncThreadPool; this.isMultiStreamingMode = isMultiStreamingMode; this.metricsFactory = metricsFactory; @@ -175,6 +180,7 @@ class PeriodicShardSyncManager { PERIODIC_SHARD_SYNC_MANAGER); int numStreamsWithPartialLeases = 0; int numStreamsToSync = 0; + int numSkippedShardSyncTask = 0; boolean isRunSuccess = false; final long runStartMillis = System.currentTimeMillis(); @@ -205,12 +211,26 @@ class PeriodicShardSyncManager { log.info("Skipping shard sync task for {} as stream is purged", streamIdentifier); continue; } - final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider - .apply(streamConfig); + final ShardSyncTaskManager shardSyncTaskManager; + if (streamToShardSyncTaskManagerMap.containsKey(streamConfig)) { + log.info("shardSyncTaskManager for stream {} already exists", + streamIdentifier.streamName()); + shardSyncTaskManager = streamToShardSyncTaskManagerMap.get(streamConfig); + } + else { + // If streamConfig of a stream has already been added to currentStreamConfigMap but + // Scheduler failed to create shardSyncTaskManager for it, then Scheduler will not try + // to create one later. So enable PeriodicShardSyncManager to do it for such cases + log.info("Failed to get shardSyncTaskManager so creating one for stream {}.", + streamIdentifier.streamName()); + shardSyncTaskManager = streamToShardSyncTaskManagerMap.computeIfAbsent( + streamConfig, s -> shardSyncTaskManagerProvider.apply(s)); + } if (!shardSyncTaskManager.submitShardSyncTask()) { log.warn( "Failed to submit shard sync task for stream {}. This could be due to the previous pending shard sync task.", shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); + numSkippedShardSyncTask += 1; } else { log.info("Submitted shard sync task for stream {} because of reason {}", shardSyncTaskManager.shardDetector().streamIdentifier().streamName(), @@ -227,6 +247,7 @@ class PeriodicShardSyncManager { } finally { scope.addData("NumStreamsWithPartialLeases", numStreamsWithPartialLeases, StandardUnit.COUNT, MetricsLevel.SUMMARY); scope.addData("NumStreamsToSync", numStreamsToSync, StandardUnit.COUNT, MetricsLevel.SUMMARY); + scope.addData("NumSkippedShardSyncTask", numSkippedShardSyncTask, StandardUnit.COUNT, MetricsLevel.SUMMARY); MetricsUtil.addSuccessAndLatency(scope, isRunSuccess, runStartMillis, MetricsLevel.SUMMARY); scope.end(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 11983ded..54c67997 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -141,7 +141,7 @@ public class Scheduler implements Runnable { private final DiagnosticEventHandler diagnosticEventHandler; private final LeaseCoordinator leaseCoordinator; private final Function shardSyncTaskManagerProvider; - private final Map streamToShardSyncTaskManagerMap = new HashMap<>(); + private final Map streamToShardSyncTaskManagerMap = new ConcurrentHashMap<>(); private final PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager; private final ShardPrioritization shardPrioritization; private final boolean cleanupLeasesUponShardCompletion; @@ -292,7 +292,7 @@ public class Scheduler implements Runnable { this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager( leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap, - shardSyncTaskManagerProvider, isMultiStreamMode, metricsFactory, + shardSyncTaskManagerProvider, streamToShardSyncTaskManagerMap, isMultiStreamMode, metricsFactory, leaseManagementConfig.leasesRecoveryAuditorExecutionFrequencyMillis(), leaseManagementConfig.leasesRecoveryAuditorInconsistencyConfidenceThreshold()); this.leaseCleanupManager = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, isMultiStreamMode) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java index 1caf0629..9a015c28 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java @@ -196,7 +196,7 @@ public class ShardSyncTaskManager { } else { if (log.isDebugEnabled()) { log.debug("Previous {} task still pending. Not submitting new task. " - + "Enqueued a request that will be executed when the current request completes.", currentTask.taskType()); + + "Triggered a pending request but will not be executed until the current request completes.", currentTask.taskType()); } shardSyncRequestPending.compareAndSet(false /*expected*/, true /*update*/); } @@ -208,25 +208,6 @@ public class ShardSyncTaskManager { log.error("Caught exception running {} task: {}", currentTask.taskType(), exception != null ? exception : taskResult.getException()); } - // Acquire lock here. If shardSyncRequestPending is false in this completionStage and - // submitShardSyncTask is invoked, before completion stage exits (future completes) - // but right after the value of shardSyncRequestPending is checked, it will result in - // shardSyncRequestPending being set to true, but no pending futures to trigger the next - // ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the - // previous task is in this completion stage, checkAndSubmitNextTask is not invoked - // until this completionStage exits. - try { - lock.lock(); - if (shardSyncRequestPending.get()) { - shardSyncRequestPending.set(false); - // reset future to null, so next call creates a new one - // without trying to get results from the old future. - future = null; - checkAndSubmitNextTask(); - } - } finally { - lock.unlock(); - } } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java index 71375c3d..bda7f53b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java @@ -70,12 +70,14 @@ public class PeriodicShardSyncManagerTest { Map currentStreamConfigMap; @Mock Function shardSyncTaskManagerProvider; + @Mock + Map streamToShardSyncTaskManagerMap; @Before public void setup() { streamIdentifier = StreamIdentifier.multiStreamInstance("123456789012:stream:456"); periodicShardSyncManager = new PeriodicShardSyncManager("worker", leaderDecider, leaseRefresher, currentStreamConfigMap, - shardSyncTaskManagerProvider, true, new NullMetricsFactory(), 2 * 60 * 1000, 3); + shardSyncTaskManagerProvider, streamToShardSyncTaskManagerMap, true, new NullMetricsFactory(), 2 * 60 * 1000, 3); } @Test