diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java index ba5aabd1..da238e4b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java @@ -85,7 +85,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; this.initialPositionInStream = initialPositionInStream; this.shardSyncer = shardSyncer; this.shardSyncRequestPending = new AtomicBoolean(false); - this.lock = new ReentrantLock(Boolean.TRUE); + this.lock = new ReentrantLock(); } Future syncShardAndLeaseInfo(List latestShards) { @@ -117,30 +117,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis, shardSyncer, latestShards), metricsFactory); future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService) - .whenComplete((taskResult, exception) -> { - if (exception != null) { - LOG.error("Caught exception running " + currentTask.getTaskType() + " task: ", exception); - } - // Acquire lock here. If shardSyncRequestPending is false in this completionStage and - // syncShardAndLeaseInfo is invoked, before completion stage exits (future completes) - // but right after the value of shardSyncRequestPending is checked, it will result in - // shardSyncRequestPending being set to true, but no pending futures to trigger the next - // ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the - // previous task is in this completion stage, checkAndSubmitNextTask is not invoked - // until this completionStage exits. - try { - lock.lock(); - if (shardSyncRequestPending.get()) { - shardSyncRequestPending.set(false); - // reset future to null, so next call creates a new one - // without trying to get results from the old future. - future = null; - checkAndSubmitNextTask(latestShards); - } - } finally { - lock.unlock(); - } - }); + .whenComplete((taskResult, exception) -> handlePendingShardSyncs(latestShards, exception)); if (LOG.isDebugEnabled()) { LOG.debug("Submitted new " + currentTask.getTaskType() + " task."); } @@ -154,4 +131,29 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; } return submittedTaskFuture; } + + private void handlePendingShardSyncs(List latestShards, Throwable exception) { + if (exception != null) { + LOG.error("Caught exception running " + currentTask.getTaskType() + " task: ", exception); + } + // Acquire lock here. If shardSyncRequestPending is false in this completionStage and + // syncShardAndLeaseInfo is invoked, before completion stage exits (future completes) + // but right after the value of shardSyncRequestPending is checked, it will result in + // shardSyncRequestPending being set to true, but no pending futures to trigger the next + // ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the + // previous task is in this completion stage, checkAndSubmitNextTask is not invoked + // until this completionStage exits. + try { + lock.lock(); + if (shardSyncRequestPending.get()) { + shardSyncRequestPending.set(false); + // reset future to null, so next call creates a new one + // without trying to get results from the old future. + future = null; + checkAndSubmitNextTask(latestShards); + } + } finally { + lock.unlock(); + } + } } \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManagerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManagerTest.java index 5801f48f..b0d6a85b 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManagerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManagerTest.java @@ -29,7 +29,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ShardSyncTaskManagerTest { - private static final long SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS = 100; private static final InitialPositionInStreamExtended INITIAL_POSITION_IN_STREAM_EXTENDED = InitialPositionInStreamExtended .newInitialPosition(InitialPositionInStream.TRIM_HORIZON); private static final boolean CLEANUP_LEASES_SHARD_COMPLETION = Boolean.TRUE; @@ -42,15 +41,13 @@ public class ShardSyncTaskManagerTest { @Mock private IMetricsScope mockMetricsScope; private ShardSyncTaskManager shardSyncTaskManager; - private ShardSyncer pausableNoOpShardSyncer; + private PausableNoOpShardSyncer pausableNoOpShardSyncer; private ShardSyncer mockShardSyncer; - private CountDownLatch countDownLatch; - @Before public void setup() { + @Before public void setup() throws Exception { MockitoAnnotations.initMocks(this); when(mockMetricsFactory.createMetrics()).thenReturn(mockMetricsScope); - countDownLatch = new CountDownLatch(1); - pausableNoOpShardSyncer = new PausableNoOpShardSyncer(countDownLatch); + pausableNoOpShardSyncer = new PausableNoOpShardSyncer(); mockShardSyncer = mock(ShardSyncer.class, delegatesTo(pausableNoOpShardSyncer)); shardSyncTaskManager = new ShardSyncTaskManager(mockKinesisProxy, mockLeaseManager, INITIAL_POSITION_IN_STREAM_EXTENDED, CLEANUP_LEASES_SHARD_COMPLETION, IGNORE_UNEXPECTED_CHILD_SHARDS, @@ -59,7 +56,8 @@ public class ShardSyncTaskManagerTest { @Test public void testShardSyncIdempotency() throws Exception { shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); - Thread.sleep(SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS); // small pause to wait for shardSyncer invocations. + // Wait for ShardSyncer to be initialized. + pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch.await(); verify(mockShardSyncer, times(1)) .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), anyBoolean(), Matchers.any()); @@ -68,15 +66,18 @@ public class ShardSyncTaskManagerTest { while (count++ < 5) { shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); } - // Since countDownLatch is still blocked, previous ShardSyncTask is still running, hence no new invocations. + // Since blockShardSyncLatch is still blocked, previous ShardSyncTask is still running, hence no new invocations. verify(mockShardSyncer, times(1)) .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), anyBoolean(), Matchers.any()); + // countdown and exit. + pausableNoOpShardSyncer.blockShardSyncLatch.countDown(); } @Test public void testShardSyncRerunsForPendingRequests() throws Exception { shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); - Thread.sleep(SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS); // small pause to wait for shardSyncer invocations. + // Wait for ShardSyncer to be initialized. + pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch.await(); verify(mockShardSyncer, times(1)) .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), anyBoolean(), Matchers.any()); @@ -85,8 +86,11 @@ public class ShardSyncTaskManagerTest { while (count++ < 5) { shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); } - countDownLatch.countDown(); // Will unblock pending shardSync and a new ShardSync should be triggered. - Thread.sleep(SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS); // small pause to wait for shardSyncer invocations. + pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch = new CountDownLatch(1); + // unblock pending shardSync so a new ShardSync is triggered. + pausableNoOpShardSyncer.blockShardSyncLatch.countDown(); + // Wait for ShardSyncer to be initialized. + pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch.await(); // There should be 1 more shardSyncer invocation after the previous shardSync completes. verify(mockShardSyncer, times(2)) .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), @@ -95,10 +99,12 @@ public class ShardSyncTaskManagerTest { private static class PausableNoOpShardSyncer implements ShardSyncer { - private final CountDownLatch countDownLatch; + private CountDownLatch blockShardSyncLatch; + private CountDownLatch waitForShardSyncerInitializationLatch; - PausableNoOpShardSyncer(CountDownLatch countDownLatch) { - this.countDownLatch = countDownLatch; + PausableNoOpShardSyncer() { + this.blockShardSyncLatch = new CountDownLatch(1); + this.waitForShardSyncerInitializationLatch = new CountDownLatch(1); } @Override public void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, @@ -107,7 +113,8 @@ public class ShardSyncTaskManagerTest { throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { try { - countDownLatch.await(); + waitForShardSyncerInitializationLatch.countDown(); + blockShardSyncLatch.await(); } catch (InterruptedException e) { // No-OP } @@ -122,5 +129,4 @@ public class ShardSyncTaskManagerTest { cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards); } } - }