From 2d6b92e8ac21b03a453eba5ec55f5494d35355fe Mon Sep 17 00:00:00 2001 From: parijas Date: Mon, 2 Dec 2019 16:05:58 -0800 Subject: [PATCH 1/2] Ensure ShardSyncTask invocation from ShardSyncTaskManager for pending shard sync requests --- .../lib/worker/ShardSyncTaskManager.java | 94 ++++++++----- .../lib/worker/ShardSyncTaskManagerTest.java | 126 ++++++++++++++++++ 2 files changed, 187 insertions(+), 33 deletions(-) create mode 100644 src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManagerTest.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java index 0fabbbcd..ba5aabd1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java @@ -15,10 +15,12 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.List; -import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import com.amazonaws.services.kinesis.model.Shard; import lombok.Getter; @@ -35,13 +37,12 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; * Kinesis shards, remove obsolete leases). We'll have at most one outstanding sync task at any time. * Worker will use this class to kick off a sync task when it finds shards which have been completely processed. */ -@Getter -class ShardSyncTaskManager { +@Getter class ShardSyncTaskManager { private static final Log LOG = LogFactory.getLog(ShardSyncTaskManager.class); private ITask currentTask; - private Future future; + private CompletableFuture future; private final IKinesisProxy kinesisProxy; private final ILeaseManager leaseManager; private final IMetricsFactory metricsFactory; @@ -51,30 +52,28 @@ class ShardSyncTaskManager { private boolean ignoreUnexpectedChildShards; private final long shardSyncIdleTimeMillis; private final ShardSyncer shardSyncer; + private final ReentrantLock lock; + private AtomicBoolean shardSyncRequestPending; /** * Constructor. * - * @param kinesisProxy Proxy used to fetch streamInfo (shards) - * @param leaseManager Lease manager (used to list and create leases for shards) - * @param initialPositionInStream Initial position in stream + * @param kinesisProxy Proxy used to fetch streamInfo (shards) + * @param leaseManager Lease manager (used to list and create leases for shards) + * @param initialPositionInStream Initial position in stream * @param cleanupLeasesUponShardCompletion Clean up leases for shards that we've finished processing (don't wait - * until they expire) - * @param ignoreUnexpectedChildShards Ignore child shards with open parents - * @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards - * @param metricsFactory Metrics factory - * @param executorService ExecutorService to execute the shard sync tasks - * @param shardSyncer shardSyncer instance used to check and create new leases + * until they expire) + * @param ignoreUnexpectedChildShards Ignore child shards with open parents + * @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards + * @param metricsFactory Metrics factory + * @param executorService ExecutorService to execute the shard sync tasks + * @param shardSyncer shardSyncer instance used to check and create new leases */ - ShardSyncTaskManager(final IKinesisProxy kinesisProxy, - final ILeaseManager leaseManager, + ShardSyncTaskManager(final IKinesisProxy kinesisProxy, final ILeaseManager leaseManager, final InitialPositionInStreamExtended initialPositionInStream, - final boolean cleanupLeasesUponShardCompletion, - final boolean ignoreUnexpectedChildShards, - final long shardSyncIdleTimeMillis, - final IMetricsFactory metricsFactory, - ExecutorService executorService, + final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, + final long shardSyncIdleTimeMillis, final IMetricsFactory metricsFactory, ExecutorService executorService, ShardSyncer shardSyncer) { this.kinesisProxy = kinesisProxy; this.leaseManager = leaseManager; @@ -85,13 +84,20 @@ class ShardSyncTaskManager { this.executorService = executorService; this.initialPositionInStream = initialPositionInStream; this.shardSyncer = shardSyncer; + this.shardSyncRequestPending = new AtomicBoolean(false); + this.lock = new ReentrantLock(Boolean.TRUE); } - synchronized Future syncShardAndLeaseInfo(List latestShards) { - return checkAndSubmitNextTask(latestShards); + Future syncShardAndLeaseInfo(List latestShards) { + try { + lock.lock(); + return checkAndSubmitNextTask(latestShards); + } finally { + lock.unlock(); + } } - private synchronized Future checkAndSubmitNextTask(List latestShards) { + private Future checkAndSubmitNextTask(List latestShards) { Future submittedTaskFuture = null; if ((future == null) || future.isCancelled() || future.isDone()) { if ((future != null) && future.isDone()) { @@ -106,24 +112,46 @@ class ShardSyncTaskManager { } } - currentTask = - new MetricsCollectingTaskDecorator(new ShardSyncTask(kinesisProxy, - leaseManager, - initialPositionInStream, - cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, - shardSyncIdleTimeMillis, + currentTask = new MetricsCollectingTaskDecorator( + new ShardSyncTask(kinesisProxy, leaseManager, initialPositionInStream, + cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis, shardSyncer, latestShards), metricsFactory); - future = executorService.submit(currentTask); + future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService) + .whenComplete((taskResult, exception) -> { + if (exception != null) { + LOG.error("Caught exception running " + currentTask.getTaskType() + " task: ", exception); + } + // Acquire lock here. If shardSyncRequestPending is false in this completionStage and + // syncShardAndLeaseInfo is invoked, before completion stage exits (future completes) + // but right after the value of shardSyncRequestPending is checked, it will result in + // shardSyncRequestPending being set to true, but no pending futures to trigger the next + // ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the + // previous task is in this completion stage, checkAndSubmitNextTask is not invoked + // until this completionStage exits. + try { + lock.lock(); + if (shardSyncRequestPending.get()) { + shardSyncRequestPending.set(false); + // reset future to null, so next call creates a new one + // without trying to get results from the old future. + future = null; + checkAndSubmitNextTask(latestShards); + } + } finally { + lock.unlock(); + } + }); if (LOG.isDebugEnabled()) { LOG.debug("Submitted new " + currentTask.getTaskType() + " task."); } submittedTaskFuture = future; } else { if (LOG.isDebugEnabled()) { - LOG.debug("Previous " + currentTask.getTaskType() + " task still pending. Not submitting new task."); + LOG.debug("Previous " + currentTask.getTaskType() + " task still pending. Not submitting new task. " + + "Enqueued a request that will be executed when the current request completes."); } + shardSyncRequestPending.compareAndSet(false /*expected*/, true /*update*/); } return submittedTaskFuture; } -} +} \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManagerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManagerTest.java new file mode 100644 index 00000000..5801f48f --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManagerTest.java @@ -0,0 +1,126 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException; +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; +import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; +import com.amazonaws.services.kinesis.model.Shard; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; + +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ShardSyncTaskManagerTest { + private static final long SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS = 100; + private static final InitialPositionInStreamExtended INITIAL_POSITION_IN_STREAM_EXTENDED = InitialPositionInStreamExtended + .newInitialPosition(InitialPositionInStream.TRIM_HORIZON); + private static final boolean CLEANUP_LEASES_SHARD_COMPLETION = Boolean.TRUE; + private static final boolean IGNORE_UNEXPECTED_CHILD_SHARDS = Boolean.TRUE; + private static final long SHARD_SYNC_IDLE_TIME_MILLIS = 0; + + @Mock private IKinesisProxy mockKinesisProxy; + @Mock private ILeaseManager mockLeaseManager; + @Mock private IMetricsFactory mockMetricsFactory; + @Mock private IMetricsScope mockMetricsScope; + + private ShardSyncTaskManager shardSyncTaskManager; + private ShardSyncer pausableNoOpShardSyncer; + private ShardSyncer mockShardSyncer; + private CountDownLatch countDownLatch; + + @Before public void setup() { + MockitoAnnotations.initMocks(this); + when(mockMetricsFactory.createMetrics()).thenReturn(mockMetricsScope); + countDownLatch = new CountDownLatch(1); + pausableNoOpShardSyncer = new PausableNoOpShardSyncer(countDownLatch); + mockShardSyncer = mock(ShardSyncer.class, delegatesTo(pausableNoOpShardSyncer)); + shardSyncTaskManager = new ShardSyncTaskManager(mockKinesisProxy, mockLeaseManager, + INITIAL_POSITION_IN_STREAM_EXTENDED, CLEANUP_LEASES_SHARD_COMPLETION, IGNORE_UNEXPECTED_CHILD_SHARDS, + SHARD_SYNC_IDLE_TIME_MILLIS, mockMetricsFactory, Executors.newSingleThreadExecutor(), mockShardSyncer); + } + + @Test public void testShardSyncIdempotency() throws Exception { + shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); + Thread.sleep(SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS); // small pause to wait for shardSyncer invocations. + verify(mockShardSyncer, times(1)) + .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), + anyBoolean(), Matchers.any()); + // Invoke a few more times. This would flip shardSyncRequestPending to true in ShardSyncTaskManager. + int count = 0; + while (count++ < 5) { + shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); + } + // Since countDownLatch is still blocked, previous ShardSyncTask is still running, hence no new invocations. + verify(mockShardSyncer, times(1)) + .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), + anyBoolean(), Matchers.any()); + } + + @Test public void testShardSyncRerunsForPendingRequests() throws Exception { + shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); + Thread.sleep(SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS); // small pause to wait for shardSyncer invocations. + verify(mockShardSyncer, times(1)) + .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), + anyBoolean(), Matchers.any()); + // Invoke a few more times. This would flip shardSyncRequestPending to true in ShardSyncTaskManager. + int count = 0; + while (count++ < 5) { + shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); + } + countDownLatch.countDown(); // Will unblock pending shardSync and a new ShardSync should be triggered. + Thread.sleep(SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS); // small pause to wait for shardSyncer invocations. + // There should be 1 more shardSyncer invocation after the previous shardSync completes. + verify(mockShardSyncer, times(2)) + .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), + anyBoolean(), Matchers.any()); + } + + private static class PausableNoOpShardSyncer implements ShardSyncer { + + private final CountDownLatch countDownLatch; + + PausableNoOpShardSyncer(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + @Override public void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, + ILeaseManager leaseManager, InitialPositionInStreamExtended initialPositionInStream, + boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, + KinesisClientLibIOException { + try { + countDownLatch.await(); + } catch (InterruptedException e) { + // No-OP + } + } + + @Override public void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, + ILeaseManager leaseManager, InitialPositionInStreamExtended initialPositionInStream, + boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, List latestShards) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, + KinesisClientLibIOException { + this.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards); + } + } + +} From 5b3078f8010af3e52731b29b84c0e9c5353cc4c7 Mon Sep 17 00:00:00 2001 From: parijas Date: Tue, 17 Dec 2019 11:40:20 -0800 Subject: [PATCH 2/2] Addressed review comments --- .../lib/worker/ShardSyncTaskManager.java | 52 ++++++++++--------- .../lib/worker/ShardSyncTaskManagerTest.java | 38 ++++++++------ 2 files changed, 49 insertions(+), 41 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java index ba5aabd1..da238e4b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java @@ -85,7 +85,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; this.initialPositionInStream = initialPositionInStream; this.shardSyncer = shardSyncer; this.shardSyncRequestPending = new AtomicBoolean(false); - this.lock = new ReentrantLock(Boolean.TRUE); + this.lock = new ReentrantLock(); } Future syncShardAndLeaseInfo(List latestShards) { @@ -117,30 +117,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis, shardSyncer, latestShards), metricsFactory); future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService) - .whenComplete((taskResult, exception) -> { - if (exception != null) { - LOG.error("Caught exception running " + currentTask.getTaskType() + " task: ", exception); - } - // Acquire lock here. If shardSyncRequestPending is false in this completionStage and - // syncShardAndLeaseInfo is invoked, before completion stage exits (future completes) - // but right after the value of shardSyncRequestPending is checked, it will result in - // shardSyncRequestPending being set to true, but no pending futures to trigger the next - // ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the - // previous task is in this completion stage, checkAndSubmitNextTask is not invoked - // until this completionStage exits. - try { - lock.lock(); - if (shardSyncRequestPending.get()) { - shardSyncRequestPending.set(false); - // reset future to null, so next call creates a new one - // without trying to get results from the old future. - future = null; - checkAndSubmitNextTask(latestShards); - } - } finally { - lock.unlock(); - } - }); + .whenComplete((taskResult, exception) -> handlePendingShardSyncs(latestShards, exception)); if (LOG.isDebugEnabled()) { LOG.debug("Submitted new " + currentTask.getTaskType() + " task."); } @@ -154,4 +131,29 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; } return submittedTaskFuture; } + + private void handlePendingShardSyncs(List latestShards, Throwable exception) { + if (exception != null) { + LOG.error("Caught exception running " + currentTask.getTaskType() + " task: ", exception); + } + // Acquire lock here. If shardSyncRequestPending is false in this completionStage and + // syncShardAndLeaseInfo is invoked, before completion stage exits (future completes) + // but right after the value of shardSyncRequestPending is checked, it will result in + // shardSyncRequestPending being set to true, but no pending futures to trigger the next + // ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the + // previous task is in this completion stage, checkAndSubmitNextTask is not invoked + // until this completionStage exits. + try { + lock.lock(); + if (shardSyncRequestPending.get()) { + shardSyncRequestPending.set(false); + // reset future to null, so next call creates a new one + // without trying to get results from the old future. + future = null; + checkAndSubmitNextTask(latestShards); + } + } finally { + lock.unlock(); + } + } } \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManagerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManagerTest.java index 5801f48f..b0d6a85b 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManagerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManagerTest.java @@ -29,7 +29,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ShardSyncTaskManagerTest { - private static final long SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS = 100; private static final InitialPositionInStreamExtended INITIAL_POSITION_IN_STREAM_EXTENDED = InitialPositionInStreamExtended .newInitialPosition(InitialPositionInStream.TRIM_HORIZON); private static final boolean CLEANUP_LEASES_SHARD_COMPLETION = Boolean.TRUE; @@ -42,15 +41,13 @@ public class ShardSyncTaskManagerTest { @Mock private IMetricsScope mockMetricsScope; private ShardSyncTaskManager shardSyncTaskManager; - private ShardSyncer pausableNoOpShardSyncer; + private PausableNoOpShardSyncer pausableNoOpShardSyncer; private ShardSyncer mockShardSyncer; - private CountDownLatch countDownLatch; - @Before public void setup() { + @Before public void setup() throws Exception { MockitoAnnotations.initMocks(this); when(mockMetricsFactory.createMetrics()).thenReturn(mockMetricsScope); - countDownLatch = new CountDownLatch(1); - pausableNoOpShardSyncer = new PausableNoOpShardSyncer(countDownLatch); + pausableNoOpShardSyncer = new PausableNoOpShardSyncer(); mockShardSyncer = mock(ShardSyncer.class, delegatesTo(pausableNoOpShardSyncer)); shardSyncTaskManager = new ShardSyncTaskManager(mockKinesisProxy, mockLeaseManager, INITIAL_POSITION_IN_STREAM_EXTENDED, CLEANUP_LEASES_SHARD_COMPLETION, IGNORE_UNEXPECTED_CHILD_SHARDS, @@ -59,7 +56,8 @@ public class ShardSyncTaskManagerTest { @Test public void testShardSyncIdempotency() throws Exception { shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); - Thread.sleep(SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS); // small pause to wait for shardSyncer invocations. + // Wait for ShardSyncer to be initialized. + pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch.await(); verify(mockShardSyncer, times(1)) .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), anyBoolean(), Matchers.any()); @@ -68,15 +66,18 @@ public class ShardSyncTaskManagerTest { while (count++ < 5) { shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); } - // Since countDownLatch is still blocked, previous ShardSyncTask is still running, hence no new invocations. + // Since blockShardSyncLatch is still blocked, previous ShardSyncTask is still running, hence no new invocations. verify(mockShardSyncer, times(1)) .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), anyBoolean(), Matchers.any()); + // countdown and exit. + pausableNoOpShardSyncer.blockShardSyncLatch.countDown(); } @Test public void testShardSyncRerunsForPendingRequests() throws Exception { shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); - Thread.sleep(SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS); // small pause to wait for shardSyncer invocations. + // Wait for ShardSyncer to be initialized. + pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch.await(); verify(mockShardSyncer, times(1)) .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), anyBoolean(), Matchers.any()); @@ -85,8 +86,11 @@ public class ShardSyncTaskManagerTest { while (count++ < 5) { shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); } - countDownLatch.countDown(); // Will unblock pending shardSync and a new ShardSync should be triggered. - Thread.sleep(SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS); // small pause to wait for shardSyncer invocations. + pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch = new CountDownLatch(1); + // unblock pending shardSync so a new ShardSync is triggered. + pausableNoOpShardSyncer.blockShardSyncLatch.countDown(); + // Wait for ShardSyncer to be initialized. + pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch.await(); // There should be 1 more shardSyncer invocation after the previous shardSync completes. verify(mockShardSyncer, times(2)) .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), @@ -95,10 +99,12 @@ public class ShardSyncTaskManagerTest { private static class PausableNoOpShardSyncer implements ShardSyncer { - private final CountDownLatch countDownLatch; + private CountDownLatch blockShardSyncLatch; + private CountDownLatch waitForShardSyncerInitializationLatch; - PausableNoOpShardSyncer(CountDownLatch countDownLatch) { - this.countDownLatch = countDownLatch; + PausableNoOpShardSyncer() { + this.blockShardSyncLatch = new CountDownLatch(1); + this.waitForShardSyncerInitializationLatch = new CountDownLatch(1); } @Override public void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, @@ -107,7 +113,8 @@ public class ShardSyncTaskManagerTest { throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { try { - countDownLatch.await(); + waitForShardSyncerInitializationLatch.countDown(); + blockShardSyncLatch.await(); } catch (InterruptedException e) { // No-OP } @@ -122,5 +129,4 @@ public class ShardSyncTaskManagerTest { cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards); } } - }