diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java index 0fabbbcd..da238e4b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java @@ -15,10 +15,12 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.List; -import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import com.amazonaws.services.kinesis.model.Shard; import lombok.Getter; @@ -35,13 +37,12 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; * Kinesis shards, remove obsolete leases). We'll have at most one outstanding sync task at any time. * Worker will use this class to kick off a sync task when it finds shards which have been completely processed. */ -@Getter -class ShardSyncTaskManager { +@Getter class ShardSyncTaskManager { private static final Log LOG = LogFactory.getLog(ShardSyncTaskManager.class); private ITask currentTask; - private Future future; + private CompletableFuture future; private final IKinesisProxy kinesisProxy; private final ILeaseManager leaseManager; private final IMetricsFactory metricsFactory; @@ -51,30 +52,28 @@ class ShardSyncTaskManager { private boolean ignoreUnexpectedChildShards; private final long shardSyncIdleTimeMillis; private final ShardSyncer shardSyncer; + private final ReentrantLock lock; + private AtomicBoolean shardSyncRequestPending; /** * Constructor. * - * @param kinesisProxy Proxy used to fetch streamInfo (shards) - * @param leaseManager Lease manager (used to list and create leases for shards) - * @param initialPositionInStream Initial position in stream + * @param kinesisProxy Proxy used to fetch streamInfo (shards) + * @param leaseManager Lease manager (used to list and create leases for shards) + * @param initialPositionInStream Initial position in stream * @param cleanupLeasesUponShardCompletion Clean up leases for shards that we've finished processing (don't wait - * until they expire) - * @param ignoreUnexpectedChildShards Ignore child shards with open parents - * @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards - * @param metricsFactory Metrics factory - * @param executorService ExecutorService to execute the shard sync tasks - * @param shardSyncer shardSyncer instance used to check and create new leases + * until they expire) + * @param ignoreUnexpectedChildShards Ignore child shards with open parents + * @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards + * @param metricsFactory Metrics factory + * @param executorService ExecutorService to execute the shard sync tasks + * @param shardSyncer shardSyncer instance used to check and create new leases */ - ShardSyncTaskManager(final IKinesisProxy kinesisProxy, - final ILeaseManager leaseManager, + ShardSyncTaskManager(final IKinesisProxy kinesisProxy, final ILeaseManager leaseManager, final InitialPositionInStreamExtended initialPositionInStream, - final boolean cleanupLeasesUponShardCompletion, - final boolean ignoreUnexpectedChildShards, - final long shardSyncIdleTimeMillis, - final IMetricsFactory metricsFactory, - ExecutorService executorService, + final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, + final long shardSyncIdleTimeMillis, final IMetricsFactory metricsFactory, ExecutorService executorService, ShardSyncer shardSyncer) { this.kinesisProxy = kinesisProxy; this.leaseManager = leaseManager; @@ -85,13 +84,20 @@ class ShardSyncTaskManager { this.executorService = executorService; this.initialPositionInStream = initialPositionInStream; this.shardSyncer = shardSyncer; + this.shardSyncRequestPending = new AtomicBoolean(false); + this.lock = new ReentrantLock(); } - synchronized Future syncShardAndLeaseInfo(List latestShards) { - return checkAndSubmitNextTask(latestShards); + Future syncShardAndLeaseInfo(List latestShards) { + try { + lock.lock(); + return checkAndSubmitNextTask(latestShards); + } finally { + lock.unlock(); + } } - private synchronized Future checkAndSubmitNextTask(List latestShards) { + private Future checkAndSubmitNextTask(List latestShards) { Future submittedTaskFuture = null; if ((future == null) || future.isCancelled() || future.isDone()) { if ((future != null) && future.isDone()) { @@ -106,24 +112,48 @@ class ShardSyncTaskManager { } } - currentTask = - new MetricsCollectingTaskDecorator(new ShardSyncTask(kinesisProxy, - leaseManager, - initialPositionInStream, - cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, - shardSyncIdleTimeMillis, + currentTask = new MetricsCollectingTaskDecorator( + new ShardSyncTask(kinesisProxy, leaseManager, initialPositionInStream, + cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis, shardSyncer, latestShards), metricsFactory); - future = executorService.submit(currentTask); + future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService) + .whenComplete((taskResult, exception) -> handlePendingShardSyncs(latestShards, exception)); if (LOG.isDebugEnabled()) { LOG.debug("Submitted new " + currentTask.getTaskType() + " task."); } submittedTaskFuture = future; } else { if (LOG.isDebugEnabled()) { - LOG.debug("Previous " + currentTask.getTaskType() + " task still pending. Not submitting new task."); + LOG.debug("Previous " + currentTask.getTaskType() + " task still pending. Not submitting new task. " + + "Enqueued a request that will be executed when the current request completes."); } + shardSyncRequestPending.compareAndSet(false /*expected*/, true /*update*/); } return submittedTaskFuture; } -} + + private void handlePendingShardSyncs(List latestShards, Throwable exception) { + if (exception != null) { + LOG.error("Caught exception running " + currentTask.getTaskType() + " task: ", exception); + } + // Acquire lock here. If shardSyncRequestPending is false in this completionStage and + // syncShardAndLeaseInfo is invoked, before completion stage exits (future completes) + // but right after the value of shardSyncRequestPending is checked, it will result in + // shardSyncRequestPending being set to true, but no pending futures to trigger the next + // ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the + // previous task is in this completion stage, checkAndSubmitNextTask is not invoked + // until this completionStage exits. + try { + lock.lock(); + if (shardSyncRequestPending.get()) { + shardSyncRequestPending.set(false); + // reset future to null, so next call creates a new one + // without trying to get results from the old future. + future = null; + checkAndSubmitNextTask(latestShards); + } + } finally { + lock.unlock(); + } + } +} \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManagerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManagerTest.java new file mode 100644 index 00000000..b0d6a85b --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManagerTest.java @@ -0,0 +1,132 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException; +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; +import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; +import com.amazonaws.services.kinesis.model.Shard; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; + +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ShardSyncTaskManagerTest { + private static final InitialPositionInStreamExtended INITIAL_POSITION_IN_STREAM_EXTENDED = InitialPositionInStreamExtended + .newInitialPosition(InitialPositionInStream.TRIM_HORIZON); + private static final boolean CLEANUP_LEASES_SHARD_COMPLETION = Boolean.TRUE; + private static final boolean IGNORE_UNEXPECTED_CHILD_SHARDS = Boolean.TRUE; + private static final long SHARD_SYNC_IDLE_TIME_MILLIS = 0; + + @Mock private IKinesisProxy mockKinesisProxy; + @Mock private ILeaseManager mockLeaseManager; + @Mock private IMetricsFactory mockMetricsFactory; + @Mock private IMetricsScope mockMetricsScope; + + private ShardSyncTaskManager shardSyncTaskManager; + private PausableNoOpShardSyncer pausableNoOpShardSyncer; + private ShardSyncer mockShardSyncer; + + @Before public void setup() throws Exception { + MockitoAnnotations.initMocks(this); + when(mockMetricsFactory.createMetrics()).thenReturn(mockMetricsScope); + pausableNoOpShardSyncer = new PausableNoOpShardSyncer(); + mockShardSyncer = mock(ShardSyncer.class, delegatesTo(pausableNoOpShardSyncer)); + shardSyncTaskManager = new ShardSyncTaskManager(mockKinesisProxy, mockLeaseManager, + INITIAL_POSITION_IN_STREAM_EXTENDED, CLEANUP_LEASES_SHARD_COMPLETION, IGNORE_UNEXPECTED_CHILD_SHARDS, + SHARD_SYNC_IDLE_TIME_MILLIS, mockMetricsFactory, Executors.newSingleThreadExecutor(), mockShardSyncer); + } + + @Test public void testShardSyncIdempotency() throws Exception { + shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); + // Wait for ShardSyncer to be initialized. + pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch.await(); + verify(mockShardSyncer, times(1)) + .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), + anyBoolean(), Matchers.any()); + // Invoke a few more times. This would flip shardSyncRequestPending to true in ShardSyncTaskManager. + int count = 0; + while (count++ < 5) { + shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); + } + // Since blockShardSyncLatch is still blocked, previous ShardSyncTask is still running, hence no new invocations. + verify(mockShardSyncer, times(1)) + .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), + anyBoolean(), Matchers.any()); + // countdown and exit. + pausableNoOpShardSyncer.blockShardSyncLatch.countDown(); + } + + @Test public void testShardSyncRerunsForPendingRequests() throws Exception { + shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); + // Wait for ShardSyncer to be initialized. + pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch.await(); + verify(mockShardSyncer, times(1)) + .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), + anyBoolean(), Matchers.any()); + // Invoke a few more times. This would flip shardSyncRequestPending to true in ShardSyncTaskManager. + int count = 0; + while (count++ < 5) { + shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); + } + pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch = new CountDownLatch(1); + // unblock pending shardSync so a new ShardSync is triggered. + pausableNoOpShardSyncer.blockShardSyncLatch.countDown(); + // Wait for ShardSyncer to be initialized. + pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch.await(); + // There should be 1 more shardSyncer invocation after the previous shardSync completes. + verify(mockShardSyncer, times(2)) + .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), + anyBoolean(), Matchers.any()); + } + + private static class PausableNoOpShardSyncer implements ShardSyncer { + + private CountDownLatch blockShardSyncLatch; + private CountDownLatch waitForShardSyncerInitializationLatch; + + PausableNoOpShardSyncer() { + this.blockShardSyncLatch = new CountDownLatch(1); + this.waitForShardSyncerInitializationLatch = new CountDownLatch(1); + } + + @Override public void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, + ILeaseManager leaseManager, InitialPositionInStreamExtended initialPositionInStream, + boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, + KinesisClientLibIOException { + try { + waitForShardSyncerInitializationLatch.countDown(); + blockShardSyncLatch.await(); + } catch (InterruptedException e) { + // No-OP + } + } + + @Override public void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, + ILeaseManager leaseManager, InitialPositionInStreamExtended initialPositionInStream, + boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, List latestShards) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, + KinesisClientLibIOException { + this.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards); + } + } +}