diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index 3ca940ee..47b1239f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -528,7 +528,7 @@ class ConsumerStates { consumer.getStreamConfig().getInitialPositionInStream(), consumer.isCleanupLeasesOfCompletedShards(), consumer.isIgnoreUnexpectedChildShards(), - consumer.getLeaseManager(), + consumer.getLeaseCoordinator(), consumer.getTaskBackoffTimeMillis(), consumer.getGetRecordsCache(), consumer.getShardSyncer(), consumer.getShardSyncStrategy()); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java index 5d8d0f82..c23fd678 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.amazonaws.util.CollectionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.lang3.StringUtils; @@ -79,14 +80,38 @@ class KinesisShardSyncer implements ShardSyncer { * @throws ProvisionedThroughputException * @throws KinesisClientLibIOException */ - public synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, - ILeaseManager leaseManager, InitialPositionInStreamExtended initialPositionInStream, - boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards) + @Override + public synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, + InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards, + boolean ignoreUnexpectedChildShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards); } + /** + * Check and create leases for any new shards (e.g. following a reshard operation). + * + * @param kinesisProxy + * @param leaseManager + * @param initialPositionInStream + * @param cleanupLeasesOfCompletedShards + * @param ignoreUnexpectedChildShards + * @param latestShards latest snapshot of shards to reuse + * @throws DependencyException + * @throws InvalidStateException + * @throws ProvisionedThroughputException + * @throws KinesisClientLibIOException + */ + @Override + public synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, + ILeaseManager leaseManager, InitialPositionInStreamExtended initialPositionInStream, + boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, List latestShards) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, + KinesisClientLibIOException { + syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, latestShards); + } + /** * Sync leases with Kinesis shards (e.g. at startup, or when we reach end of a shard). * @@ -100,14 +125,42 @@ class KinesisShardSyncer implements ShardSyncer { * @throws ProvisionedThroughputException * @throws KinesisClientLibIOException */ - // CHECKSTYLE:OFF CyclomaticComplexity private synchronized void syncShardLeases(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, InitialPositionInStreamExtended initialPosition, boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - List shards = getShardList(kinesisProxy); - LOG.debug("Num shards: " + shards.size()); + List latestShards = getShardList(kinesisProxy); + syncShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, latestShards); + } + + /** + * Sync leases with Kinesis shards (e.g. at startup, or when we reach end of a shard). + * + * @param kinesisProxy + * @param leaseManager + * @param initialPosition + * @param cleanupLeasesOfCompletedShards + * @param ignoreUnexpectedChildShards + * @param latestShards latest snapshot of shards to reuse + * @throws DependencyException + * @throws InvalidStateException + * @throws ProvisionedThroughputException + * @throws KinesisClientLibIOException + */ + // CHECKSTYLE:OFF CyclomaticComplexity + private synchronized void syncShardLeases(IKinesisProxy kinesisProxy, + ILeaseManager leaseManager, InitialPositionInStreamExtended initialPosition, + boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, List latestShards) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, + KinesisClientLibIOException { + List shards; + if(CollectionUtils.isNullOrEmpty(latestShards)) { + shards = getShardList(kinesisProxy); + } else { + shards = latestShards; + } + LOG.debug("Num Shards: " + shards.size()); Map shardIdToShardMap = constructShardIdToShardMap(shards); Map> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(shardIdToShardMap); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncStrategy.java index c39803ae..c85fbbef 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncStrategy.java @@ -1,5 +1,6 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + /** * An implementation of ShardSyncStrategy. */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 0b81f23d..394f9486 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -50,7 +50,7 @@ class ShardConsumer { private final ShardInfo shardInfo; private final KinesisDataFetcher dataFetcher; private final IMetricsFactory metricsFactory; - private final ILeaseManager leaseManager; + private final KinesisClientLibLeaseCoordinator leaseCoordinator; private ICheckpoint checkpoint; // Backoff time when polling to check if application has finished processing parent shards private final long parentShardPollIntervalMillis; @@ -98,7 +98,7 @@ class ShardConsumer { * @param checkpoint Checkpoint tracker * @param recordProcessor Record processor used to process the data records for the shard * @param config Kinesis library configuration - * @param leaseManager Used to create leases for new shards + * @param leaseCoordinator Used to manage leases for current worker * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) * @param executorService ExecutorService used to execute process tasks for this shard * @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard @@ -110,7 +110,7 @@ class ShardConsumer { StreamConfig streamConfig, ICheckpoint checkpoint, IRecordProcessor recordProcessor, - ILeaseManager leaseManager, + KinesisClientLibLeaseCoordinator leaseCoordinator, long parentShardPollIntervalMillis, boolean cleanupLeasesOfCompletedShards, ExecutorService executorService, @@ -122,7 +122,7 @@ class ShardConsumer { streamConfig, checkpoint, recordProcessor, - leaseManager, + leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, @@ -139,7 +139,7 @@ class ShardConsumer { * @param streamConfig Stream configuration to use * @param checkpoint Checkpoint tracker * @param recordProcessor Record processor used to process the data records for the shard - * @param leaseManager Used to create leases for new shards + * @param leaseCoordinator Used to manage leases for current worker * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) * @param executorService ExecutorService used to execute process tasks for this shard * @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard @@ -154,7 +154,7 @@ class ShardConsumer { StreamConfig streamConfig, ICheckpoint checkpoint, IRecordProcessor recordProcessor, - ILeaseManager leaseManager, + KinesisClientLibLeaseCoordinator leaseCoordinator, long parentShardPollIntervalMillis, boolean cleanupLeasesOfCompletedShards, ExecutorService executorService, @@ -177,7 +177,7 @@ class ShardConsumer { shardInfo.getShardId(), streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()), metricsFactory), - leaseManager, + leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, @@ -197,7 +197,7 @@ class ShardConsumer { * @param checkpoint Checkpoint tracker * @param recordProcessor Record processor used to process the data records for the shard * @param recordProcessorCheckpointer RecordProcessorCheckpointer to use to checkpoint progress - * @param leaseManager Used to create leases for new shards + * @param leaseCoordinator Used to manage leases for current worker * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) * @param cleanupLeasesOfCompletedShards clean up the leases of completed shards * @param executorService ExecutorService used to execute process tasks for this shard @@ -215,7 +215,7 @@ class ShardConsumer { ICheckpoint checkpoint, IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, - ILeaseManager leaseManager, + KinesisClientLibLeaseCoordinator leaseCoordinator, long parentShardPollIntervalMillis, boolean cleanupLeasesOfCompletedShards, ExecutorService executorService, @@ -231,7 +231,7 @@ class ShardConsumer { this.checkpoint = checkpoint; this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; - this.leaseManager = leaseManager; + this.leaseCoordinator = leaseCoordinator; this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.executorService = executorService; @@ -478,7 +478,11 @@ class ShardConsumer { } ILeaseManager getLeaseManager() { - return leaseManager; + return leaseCoordinator.getLeaseManager(); + } + + KinesisClientLibLeaseCoordinator getLeaseCoordinator() { + return leaseCoordinator; } ICheckpoint getCheckpoint() { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java index dc620aec..8077efcc 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java @@ -1,8 +1,10 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import com.amazonaws.services.kinesis.model.Shard; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -55,6 +57,12 @@ class ShardEndShardSyncStrategy implements ShardSyncStrategy { return onFoundCompletedShard(); } + @Override + public TaskResult onShardConsumerShutDown(List latestShards) { + shardSyncTaskManager.syncShardAndLeaseInfo(latestShards); + return new TaskResult(null); + } + @Override public void onWorkerShutDown() { LOG.debug(String.format("Stop is NoOp for ShardSyncStrategyType %s", getStrategyType().toString())); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncStrategy.java index 6738d2e9..0303f188 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncStrategy.java @@ -1,5 +1,9 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.model.Shard; + +import java.util.List; + /** * Facade of methods that can be invoked at different points * in KCL application execution to perform certain actions related to shard-sync. @@ -41,6 +45,16 @@ public interface ShardSyncStrategy { */ TaskResult onShardConsumerShutDown(); + /** + * Invoked when ShardConsumer is shutdown and all shards are provided. + * + * @param latestShards latest snapshot of shards to reuse + * @return + */ + default TaskResult onShardConsumerShutDown(List latestShards) { + return onShardConsumerShutDown(); + } + /** * Invoked when worker is shutdown. */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java index f6e2a87d..13c43b0e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java @@ -14,6 +14,7 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.model.Shard; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -21,6 +22,8 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; +import java.util.List; + /** * This task syncs leases/activies with shards of the stream. * It will create new leases/activites when it discovers new shards (e.g. setup/resharding). @@ -39,6 +42,7 @@ class ShardSyncTask implements ITask { private final long shardSyncTaskIdleTimeMillis; private final TaskType taskType = TaskType.SHARDSYNC; private final ShardSyncer shardSyncer; + private final List latestShards; /** * @param kinesisProxy Used to fetch information about the stream (e.g. shard list) @@ -50,6 +54,7 @@ class ShardSyncTask implements ITask { * in Kinesis) * @param shardSyncTaskIdleTimeMillis shardSync task idle time in millis * @param shardSyncer shardSyncer instance used to check and create new leases + * @param latestShards latest snapshot of shards to reuse */ ShardSyncTask(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, @@ -57,7 +62,8 @@ class ShardSyncTask implements ITask { boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards, long shardSyncTaskIdleTimeMillis, - ShardSyncer shardSyncer) { + ShardSyncer shardSyncer, List latestShards) { + this.latestShards = latestShards; this.kinesisProxy = kinesisProxy; this.leaseManager = leaseManager; this.initialPosition = initialPositionInStream; @@ -79,7 +85,8 @@ class ShardSyncTask implements ITask { leaseManager, initialPosition, cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards); + ignoreUnexpectedChildShards, + latestShards); if (shardSyncTaskIdleTimeMillis > 0) { Thread.sleep(shardSyncTaskIdleTimeMillis); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java index 9601cf64..0fabbbcd 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java @@ -14,11 +14,13 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.util.List; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import com.amazonaws.services.kinesis.model.Shard; import lombok.Getter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -85,11 +87,11 @@ class ShardSyncTaskManager { this.shardSyncer = shardSyncer; } - synchronized Future syncShardAndLeaseInfo(Set closedShardIds) { - return checkAndSubmitNextTask(closedShardIds); + synchronized Future syncShardAndLeaseInfo(List latestShards) { + return checkAndSubmitNextTask(latestShards); } - private synchronized Future checkAndSubmitNextTask(Set closedShardIds) { + private synchronized Future checkAndSubmitNextTask(List latestShards) { Future submittedTaskFuture = null; if ((future == null) || future.isCancelled() || future.isDone()) { if ((future != null) && future.isDone()) { @@ -111,7 +113,7 @@ class ShardSyncTaskManager { cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis, - shardSyncer), metricsFactory); + shardSyncer, latestShards), metricsFactory); future = executorService.submit(currentTask); if (LOG.isDebugEnabled()) { LOG.debug("Submitted new " + currentTask.getTaskType() + " task."); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java index 2821dd2d..ca8511ce 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java @@ -21,12 +21,23 @@ import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; +import com.amazonaws.services.kinesis.model.Shard; + +import java.util.List; public interface ShardSyncer { void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, - InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards, - boolean ignoreUnexpectedChildShards) + InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards, + boolean ignoreUnexpectedChildShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException; + + default void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, + InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards, + boolean ignoreUnexpectedChildShards, List latestShards) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, + KinesisClientLibIOException { + checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards); + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index f062d745..abbc7bb1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -14,6 +14,9 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.util.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -22,11 +25,14 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; -import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.annotations.VisibleForTesting; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + /** * Task for invoking the RecordProcessor shutdown() callback. */ @@ -41,7 +47,7 @@ class ShutdownTask implements ITask { private final RecordProcessorCheckpointer recordProcessorCheckpointer; private final ShutdownReason reason; private final IKinesisProxy kinesisProxy; - private final ILeaseManager leaseManager; + private final KinesisClientLibLeaseCoordinator leaseCoordinator; private final InitialPositionInStreamExtended initialPositionInStream; private final boolean cleanupLeasesOfCompletedShards; private final boolean ignoreUnexpectedChildShards; @@ -63,7 +69,7 @@ class ShutdownTask implements ITask { InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, - ILeaseManager leaseManager, + KinesisClientLibLeaseCoordinator leaseCoordinator, long backoffTimeMillis, GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { this.shardInfo = shardInfo; @@ -74,7 +80,7 @@ class ShutdownTask implements ITask { this.initialPositionInStream = initialPositionInStream; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; - this.leaseManager = leaseManager; + this.leaseCoordinator = leaseCoordinator; this.backoffTimeMillis = backoffTimeMillis; this.getRecordsCache = getRecordsCache; this.shardSyncer = shardSyncer; @@ -93,24 +99,44 @@ class ShutdownTask implements ITask { boolean applicationException = false; try { + ShutdownReason localReason = reason; + List latestShards = null; + /* + * Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END + * If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows active + * workers to contend for the lease of this shard. + */ + if(localReason == ShutdownReason.TERMINATE) { + latestShards = kinesisProxy.getShardList(); + + //If latestShards is null or empty, we should still shut down the ShardConsumer with Zombie state which avoid + // checking point with SHARD_END sequence number. + if(CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) { + localReason = ShutdownReason.ZOMBIE; + dropLease(); + LOG.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.getShardId()); + } + } + + // If we reached end of the shard, set sequence number to SHARD_END. - if (reason == ShutdownReason.TERMINATE) { + if (localReason == ShutdownReason.TERMINATE) { recordProcessorCheckpointer.setSequenceNumberAtShardEnd( recordProcessorCheckpointer.getLargestPermittedCheckpointValue()); recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); } LOG.debug("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken " - + shardInfo.getConcurrencyToken() + ". Shutdown reason: " + reason); + + shardInfo.getConcurrencyToken() + ". Shutdown reason: " + localReason); final ShutdownInput shutdownInput = new ShutdownInput() - .withShutdownReason(reason) + .withShutdownReason(localReason) .withCheckpointer(recordProcessorCheckpointer); final long recordProcessorStartTimeMillis = System.currentTimeMillis(); try { recordProcessor.shutdown(shutdownInput); ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue(); - if (reason == ShutdownReason.TERMINATE) { + if (localReason == ShutdownReason.TERMINATE) { if ((lastCheckpointValue == null) || (!lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END))) { throw new IllegalArgumentException("Application didn't checkpoint at end of shard " @@ -129,10 +155,10 @@ class ShutdownTask implements ITask { MetricsLevel.SUMMARY); } - if (reason == ShutdownReason.TERMINATE) { + if (localReason == ShutdownReason.TERMINATE) { LOG.debug("Looking for child shards of shard " + shardInfo.getShardId()); // create leases for the child shards - TaskResult result = shardSyncStrategy.onShardConsumerShutDown(); + TaskResult result = shardSyncStrategy.onShardConsumerShutDown(latestShards); if (result.getException() != null) { LOG.debug("Exception while trying to sync shards on the shutdown of shard: " + shardInfo .getShardId()); @@ -175,4 +201,23 @@ class ShutdownTask implements ITask { return reason; } + private boolean isShardInContextParentOfAny(List shards) { + for(Shard shard : shards) { + if (isChildShardOfShardInContext(shard)) { + return true; + } + } + return false; + } + + private boolean isChildShardOfShardInContext(Shard shard) { + return (StringUtils.equals(shard.getParentShardId(), shardInfo.getShardId()) + || StringUtils.equals(shard.getAdjacentParentShardId(), shardInfo.getShardId())); + } + + private void dropLease() { + KinesisClientLease lease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); + leaseCoordinator.dropLease(lease); + LOG.warn("Dropped lease for shutting down ShardConsumer: " + lease.getLeaseKey()); + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 22d74ba8..1a7fa58f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -679,7 +679,7 @@ public class Worker implements Runnable { LOG.info("Syncing Kinesis shard info"); ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, - config.shouldIgnoreUnexpectedChildShards(), 0L, shardSyncer); + config.shouldIgnoreUnexpectedChildShards(), 0L, shardSyncer, null); result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); } else { LOG.info("Skipping shard sync per config setting (and lease table is not empty)"); @@ -1042,7 +1042,7 @@ public class Worker implements Runnable { streamConfig, checkpointTracker, recordProcessor, - leaseCoordinator.getLeaseManager(), + leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, executorService, @@ -1167,7 +1167,7 @@ public class Worker implements Runnable { new ShardSyncTask(kinesisProxy, leaseManager, config.getInitialPositionInStreamExtended(), config.shouldCleanupLeasesUponShardCompletion(), config.shouldIgnoreUnexpectedChildShards(), SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC, - shardSyncer), metricsFactory)); + shardSyncer, null), metricsFactory)); } private ShardEndShardSyncStrategy createShardEndShardSyncStrategy(ShardSyncTaskManager shardSyncTaskManager) { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java index 8c85b546..d9160f0f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java @@ -35,6 +35,7 @@ import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; @@ -66,6 +67,8 @@ public class ConsumerStatesTest { private KinesisDataFetcher dataFetcher; @Mock private ILeaseManager leaseManager; + @InjectMocks + private KinesisClientLibLeaseCoordinator leaseCoordinator = new KinesisClientLibLeaseCoordinator(leaseManager, "testCoordinator", 1000, 1000); @Mock private ICheckpoint checkpoint; @Mock @@ -93,6 +96,7 @@ public class ConsumerStatesTest { when(consumer.getShardInfo()).thenReturn(shardInfo); when(consumer.getDataFetcher()).thenReturn(dataFetcher); when(consumer.getLeaseManager()).thenReturn(leaseManager); + when(consumer.getLeaseCoordinator()).thenReturn(leaseCoordinator); when(consumer.getCheckpoint()).thenReturn(checkpoint); when(consumer.getFuture()).thenReturn(future); when(consumer.getShutdownNotification()).thenReturn(shutdownNotification); @@ -294,7 +298,7 @@ public class ConsumerStatesTest { equalTo(recordProcessorCheckpointer))); assertThat(task, shutdownTask(ShutdownReason.class, "reason", equalTo(reason))); assertThat(task, shutdownTask(IKinesisProxy.class, "kinesisProxy", equalTo(kinesisProxy))); - assertThat(task, shutdownTask(LEASE_MANAGER_CLASS, "leaseManager", equalTo(leaseManager))); + assertThat(task, shutdownTask(KinesisClientLibLeaseCoordinator.class, "leaseCoordinator", equalTo(leaseCoordinator))); assertThat(task, shutdownTask(InitialPositionInStreamExtended.class, "initialPositionInStream", equalTo(initialPositionInStream))); assertThat(task, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index ff7aef75..f50069d3 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -58,6 +58,7 @@ import org.hamcrest.TypeSafeMatcher; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; @@ -117,6 +118,8 @@ public class ShardConsumerTest { @Mock private ILeaseManager leaseManager; @Mock + private KinesisClientLibLeaseCoordinator leaseCoordinator; + @Mock private ICheckpoint checkpoint; @Mock private ShutdownNotification shutdownNotification; @@ -145,6 +148,7 @@ public class ShardConsumerTest { when(checkpoint.getCheckpointObject(anyString())).thenThrow(NullPointerException.class); when(leaseManager.getLease(anyString())).thenReturn(null); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); StreamConfig streamConfig = new StreamConfig(streamProxy, 1, @@ -157,7 +161,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, - null, + leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, @@ -194,6 +198,7 @@ public class ShardConsumerTest { when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class); when(checkpoint.getCheckpointObject(anyString())).thenThrow(NullPointerException.class); when(leaseManager.getLease(anyString())).thenReturn(null); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); StreamConfig streamConfig = new StreamConfig(streamProxy, 1, @@ -206,7 +211,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, - null, + leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, spyExecutorService, @@ -250,7 +255,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, - null, + leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, @@ -264,6 +269,7 @@ public class ShardConsumerTest { final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null; when(leaseManager.getLease(anyString())).thenReturn(null); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(checkpoint.getCheckpointObject(anyString())).thenReturn( new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber)); @@ -332,6 +338,7 @@ public class ShardConsumerTest { ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); when(leaseManager.getLease(anyString())).thenReturn(null); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); TestStreamlet processor = new TestStreamlet(); StreamConfig streamConfig = @@ -368,7 +375,7 @@ public class ShardConsumerTest { checkpoint, processor, recordProcessorCheckpointer, - leaseManager, + leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, @@ -445,7 +452,7 @@ public class ShardConsumerTest { @Override public void shutdown(ShutdownInput input) { ShutdownReason reason = input.getShutdownReason(); - if (reason.equals(ShutdownReason.TERMINATE) && errorShutdownLatch.getCount() > 0) { + if ((reason.equals(ShutdownReason.ZOMBIE) || reason.equals(ShutdownReason.TERMINATE)) && errorShutdownLatch.getCount() > 0) { errorShutdownLatch.countDown(); throw new RuntimeException("test"); } else { @@ -456,7 +463,7 @@ public class ShardConsumerTest { /** * Test method for {@link ShardConsumer#consumeShard()} that ensures a transient error thrown from the record - * processor's shutdown method with reason terminate will be retried. + * processor's shutdown method with reason zombie will be retried. */ @Test public final void testConsumeShardWithTransientTerminateError() throws Exception { @@ -476,7 +483,7 @@ public class ShardConsumerTest { final int idleTimeMS = 0; // keep unit tests fast ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); - when(leaseManager.getLease(anyString())).thenReturn(null); + TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet(); @@ -496,6 +503,9 @@ public class ShardConsumerTest { when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(), any(IMetricsFactory.class), anyInt())) .thenReturn(getRecordsCache); + when(leaseManager.getLease(anyString())).thenReturn(null); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); + when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(new KinesisClientLease()); RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( shardInfo, @@ -514,7 +524,7 @@ public class ShardConsumerTest { checkpoint, processor, recordProcessorCheckpointer, - leaseManager, + leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, @@ -528,7 +538,150 @@ public class ShardConsumerTest { shardSyncer, shardSyncStrategy); - when(shardSyncStrategy.onShardConsumerShutDown()).thenReturn(new TaskResult(null)); + when(shardSyncStrategy.onShardConsumerShutDown(shardList)).thenReturn(new TaskResult(null)); + + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + consumer.consumeShard(); // check on parent shards + Thread.sleep(50L); + consumer.consumeShard(); // start initialization + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + consumer.consumeShard(); // initialize + processor.getInitializeLatch().await(5, TimeUnit.SECONDS); + verify(getRecordsCache).start(); + + // We expect to process all records in numRecs calls + for (int i = 0; i < numRecs;) { + boolean newTaskSubmitted = consumer.consumeShard(); + if (newTaskSubmitted) { + LOG.debug("New processing task was submitted, call # " + i); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES + i += maxRecords; + } + Thread.sleep(50L); + } + + // Consume shards until shutdown terminate is called and it has thrown an exception + for (int i = 0; i < 100; i++) { + consumer.consumeShard(); + if (processor.errorShutdownLatch.await(50, TimeUnit.MILLISECONDS)) { + break; + } + } + assertEquals(0, processor.errorShutdownLatch.getCount()); + + // Wait for a retry of shutdown terminate that should succeed + for (int i = 0; i < 100; i++) { + consumer.consumeShard(); + if (processor.getShutdownLatch().await(50, TimeUnit.MILLISECONDS)) { + break; + } + } + assertEquals(0, processor.getShutdownLatch().getCount()); + + // Wait for shutdown complete now that terminate shutdown is successful + for (int i = 0; i < 100; i++) { + consumer.consumeShard(); + if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) { + break; + } + Thread.sleep(50L); + } + assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); + + assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); + + verify(getRecordsCache).shutdown(); + + executorService.shutdown(); + executorService.awaitTermination(60, TimeUnit.SECONDS); + + String iterator = fileBasedProxy.getIterator(streamShardId, ShardIteratorType.TRIM_HORIZON.toString()); + List expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords()); + verifyConsumedRecords(expectedRecords, processor.getProcessedRecords()); + file.delete(); + } + + + + /** + * Test method for {@link ShardConsumer#consumeShard()} that ensures the shardConsumer gets shutdown with shutdown + * reason TERMINATE when the shard end is reached. + */ + @Test + public final void testConsumeShardWithShardEnd() throws Exception { + int numRecs = 10; + BigInteger startSeqNum = BigInteger.ONE; + String streamShardId = "kinesis-0-0"; + String testConcurrencyToken = "testToken"; + List shardList = KinesisLocalFileDataCreator.createShardList(3, "kinesis-0-", startSeqNum); + // Close the shard so that shutdown is called with reason terminate + shardList.get(0).getSequenceNumberRange().setEndingSequenceNumber( + KinesisLocalFileProxy.MAX_SEQUENCE_NUMBER.subtract(BigInteger.ONE).toString()); + shardList.get(1).setParentShardId("kinesis-0-0"); + shardList.get(2).setAdjacentParentShardId("kinesis-0-0"); + File file = KinesisLocalFileDataCreator.generateTempDataFile(shardList, numRecs, "unitTestSCT002"); + + IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath()); + + final int maxRecords = 2; + final int idleTimeMS = 0; // keep unit tests fast + ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); + checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); + when(leaseManager.getLease(anyString())).thenReturn(null); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); + + TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet(); + + StreamConfig streamConfig = + new StreamConfig(fileBasedProxy, + maxRecords, + idleTimeMS, + callProcessRecordsForEmptyRecordList, + skipCheckpointValidationValue, INITIAL_POSITION_LATEST); + + ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null); + + dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); + + getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, + new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); + when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(), + any(IMetricsFactory.class), anyInt())) + .thenReturn(getRecordsCache); + + RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( + shardInfo, + checkpoint, + new SequenceNumberValidator( + streamConfig.getStreamProxy(), + shardInfo.getShardId(), + streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() + ), + metricsFactory + ); + + ShardConsumer consumer = + new ShardConsumer(shardInfo, + streamConfig, + checkpoint, + processor, + recordProcessorCheckpointer, + leaseCoordinator, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + executorService, + metricsFactory, + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + dataFetcher, + Optional.empty(), + Optional.empty(), + config, + shardSyncer, + shardSyncStrategy); + + when(shardSyncStrategy.onShardConsumerShutDown(shardList)).thenReturn(new TaskResult(null)); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards @@ -618,6 +771,7 @@ public class ShardConsumerTest { ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken); when(leaseManager.getLease(anyString())).thenReturn(null); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); TestStreamlet processor = new TestStreamlet(); StreamConfig streamConfig = @@ -655,7 +809,7 @@ public class ShardConsumerTest { checkpoint, processor, recordProcessorCheckpointer, - leaseManager, + leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, @@ -730,7 +884,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, - null, + leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, @@ -746,6 +900,7 @@ public class ShardConsumerTest { final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999"); when(leaseManager.getLease(anyString())).thenReturn(null); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory()); when(checkpoint.getCheckpointObject(anyString())).thenReturn( new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber)); @@ -783,7 +938,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, - null, + leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, @@ -815,7 +970,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, - null, + leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, @@ -858,7 +1013,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, - null, + leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, mockExecutorService, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java index 26a47079..04741f45 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java @@ -127,7 +127,8 @@ public class ShardSyncTaskIntegrationTest { false, false, 0L, - shardSyncer); + shardSyncer, + null); syncTask.call(); List leases = leaseManager.listLeases(); Set leaseKeys = new HashSet(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java index 9c156f58..fd34be76 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java @@ -25,6 +25,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,7 +37,6 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ExceptionThrowingLeaseManager.ExceptionThrowingLeaseManagerMethods; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; @@ -230,7 +232,7 @@ public class ShardSyncerTest { IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST, - cleanupLeasesOfCompletedShards, false); + cleanupLeasesOfCompletedShards, false, shards); List newLeases = leaseManager.listLeases(); Set expectedLeaseShardIds = new HashSet(); expectedLeaseShardIds.add("shardId-4"); @@ -262,7 +264,7 @@ public class ShardSyncerTest { IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON, - cleanupLeasesOfCompletedShards, false); + cleanupLeasesOfCompletedShards, false, shards); List newLeases = leaseManager.listLeases(); Set expectedLeaseShardIds = new HashSet(); for (int i = 0; i < 11; i++) { @@ -293,7 +295,7 @@ public class ShardSyncerTest { IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_AT_TIMESTAMP, - cleanupLeasesOfCompletedShards, false); + cleanupLeasesOfCompletedShards, false, shards); List newLeases = leaseManager.listLeases(); Set expectedLeaseShardIds = new HashSet(); for (int i = 0; i < 11; i++) { @@ -327,7 +329,7 @@ public class ShardSyncerTest { IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON, - cleanupLeasesOfCompletedShards, false); + cleanupLeasesOfCompletedShards, false, shards); dataFile.delete(); } @@ -352,7 +354,7 @@ public class ShardSyncerTest { dataFile.deleteOnExit(); IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST, - cleanupLeasesOfCompletedShards, true); + cleanupLeasesOfCompletedShards, true, shards); List newLeases = leaseManager.listLeases(); Set expectedLeaseShardIds = new HashSet(); expectedLeaseShardIds.add("shardId-4"); @@ -467,7 +469,7 @@ public class ShardSyncerTest { exceptionThrowingLeaseManager, position, cleanupLeasesOfCompletedShards, - false); + false, null); return; } catch (LeasingException e) { LOG.debug("Catch leasing exception", e); @@ -480,7 +482,7 @@ public class ShardSyncerTest { leaseManager, position, cleanupLeasesOfCompletedShards, - false); + false, null); } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 029a1efe..cd82e475 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -14,16 +14,22 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutorService; -import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.amazonaws.services.kinesis.model.HashKeyRange; +import com.amazonaws.services.kinesis.model.SequenceNumberRange; +import com.amazonaws.services.kinesis.model.Shard; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -53,7 +59,7 @@ public class ShutdownTaskTest { Set defaultParentShardIds = new HashSet<>(); String defaultConcurrencyToken = "testToken4398"; - String defaultShardId = "shardId-0000397840"; + String defaultShardId = "shardId-0"; ShardInfo defaultShardInfo = new ShardInfo(defaultShardId, defaultConcurrencyToken, defaultParentShardIds, @@ -104,7 +110,11 @@ public class ShutdownTaskTest { RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class); when(checkpointer.getLastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298")); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); + List shards = constructShardListForGraphA(); + when(kinesisProxy.getShardList()).thenReturn(shards); + KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; ShutdownTask task = new ShutdownTask(defaultShardInfo, @@ -115,7 +125,7 @@ public class ShutdownTaskTest { INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, - leaseManager, + leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, getRecordsCache, shardSyncer, @@ -132,12 +142,17 @@ public class ShutdownTaskTest { public final void testCallWhenSyncingShardsThrows() { RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class); when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); + List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); - when(kinesisProxy.getShardList()).thenReturn(null); + when(kinesisProxy.getShardList()).thenReturn(shards); + KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; - when(shardSyncStrategy.onShardConsumerShutDown()).thenReturn(new TaskResult(new KinesisClientLibIOException(""))); + + when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(new KinesisClientLibIOException(""))); ShutdownTask task = new ShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, @@ -146,25 +161,187 @@ public class ShutdownTaskTest { INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, - leaseManager, + leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, getRecordsCache, shardSyncer, shardSyncStrategy); TaskResult result = task.call(); - verify(shardSyncStrategy).onShardConsumerShutDown(); + verify(shardSyncStrategy).onShardConsumerShutDown(shards); Assert.assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException); verify(getRecordsCache).shutdown(); } + @Test + public final void testCallWhenShardEnd() { + RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class); + when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); + List shards = constructShardListForGraphA(); + IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); + when(kinesisProxy.getShardList()).thenReturn(shards); + KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); + ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); + boolean cleanupLeasesOfCompletedShards = false; + boolean ignoreUnexpectedChildShards = false; + + when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(null)); + ShutdownTask task = new ShutdownTask(defaultShardInfo, + defaultRecordProcessor, + checkpointer, + ShutdownReason.TERMINATE, + kinesisProxy, + INITIAL_POSITION_TRIM_HORIZON, + cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, + leaseCoordinator, + TASK_BACKOFF_TIME_MILLIS, + getRecordsCache, + shardSyncer, + shardSyncStrategy); + TaskResult result = task.call(); + verify(shardSyncStrategy).onShardConsumerShutDown(shards); + verify(kinesisProxy, times(1)).getShardList(); + Assert.assertNull(result.getException()); + verify(getRecordsCache).shutdown(); + verify(leaseCoordinator, never()).dropLease(any()); + } + + @Test + public final void testCallWhenFalseShardEnd() { + ShardInfo shardInfo = new ShardInfo("shardId-4", + defaultConcurrencyToken, + defaultParentShardIds, + ExtendedSequenceNumber.LATEST); + RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class); + when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); + List shards = constructShardListForGraphA(); + IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); + when(kinesisProxy.getShardList()).thenReturn(shards); + KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); + ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); + when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(new KinesisClientLease()); + boolean cleanupLeasesOfCompletedShards = false; + boolean ignoreUnexpectedChildShards = false; + + when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(null)); + + ShutdownTask task = new ShutdownTask(shardInfo, + defaultRecordProcessor, + checkpointer, + ShutdownReason.TERMINATE, + kinesisProxy, + INITIAL_POSITION_TRIM_HORIZON, + cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, + leaseCoordinator, + TASK_BACKOFF_TIME_MILLIS, + getRecordsCache, + shardSyncer, + shardSyncStrategy); + TaskResult result = task.call(); + verify(shardSyncStrategy, never()).onShardConsumerShutDown(shards); + verify(kinesisProxy, times(1)).getShardList(); + Assert.assertNull(result.getException()); + verify(getRecordsCache).shutdown(); + verify(leaseCoordinator).dropLease(any()); + } + + @Test + public final void testCallWhenLeaseLost() { + RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class); + when(checkpointer.getLastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298")); + List shards = constructShardListForGraphA(); + IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); + when(kinesisProxy.getShardList()).thenReturn(shards); + KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); + ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); + boolean cleanupLeasesOfCompletedShards = false; + boolean ignoreUnexpectedChildShards = false; + + when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(null)); + ShutdownTask task = new ShutdownTask(defaultShardInfo, + defaultRecordProcessor, + checkpointer, + ShutdownReason.ZOMBIE, + kinesisProxy, + INITIAL_POSITION_TRIM_HORIZON, + cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, + leaseCoordinator, + TASK_BACKOFF_TIME_MILLIS, + getRecordsCache, + shardSyncer, + shardSyncStrategy); + TaskResult result = task.call(); + verify(shardSyncStrategy, never()).onShardConsumerShutDown(shards); + verify(kinesisProxy, never()).getShardList(); + Assert.assertNull(result.getException()); + verify(getRecordsCache).shutdown(); + verify(leaseCoordinator, never()).dropLease(any()); + } + /** * Test method for {@link ShutdownTask#getTaskType()}. */ @Test public final void testGetTaskType() { - ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, null, 0, getRecordsCache, shardSyncer, shardSyncStrategy); + KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); + ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, leaseCoordinator, 0, getRecordsCache, shardSyncer, shardSyncStrategy); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); } + + /* + * Helper method to construct a shard list for graph A. Graph A is defined below. + * Shard structure (y-axis is epochs): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + */ + private List constructShardListForGraphA() { + List shards = new ArrayList(); + + SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("11", "102"); + SequenceNumberRange range1 = ShardObjectHelper.newSequenceNumberRange("11", null); + SequenceNumberRange range2 = ShardObjectHelper.newSequenceNumberRange("11", "210"); + SequenceNumberRange range3 = ShardObjectHelper.newSequenceNumberRange("103", "210"); + SequenceNumberRange range4 = ShardObjectHelper.newSequenceNumberRange("211", null); + + HashKeyRange hashRange0 = ShardObjectHelper.newHashKeyRange("0", "99"); + HashKeyRange hashRange1 = ShardObjectHelper.newHashKeyRange("100", "199"); + HashKeyRange hashRange2 = ShardObjectHelper.newHashKeyRange("200", "299"); + HashKeyRange hashRange3 = ShardObjectHelper.newHashKeyRange("300", "399"); + HashKeyRange hashRange4 = ShardObjectHelper.newHashKeyRange("400", "499"); + HashKeyRange hashRange5 = ShardObjectHelper.newHashKeyRange("500", ShardObjectHelper.MAX_HASH_KEY); + HashKeyRange hashRange6 = ShardObjectHelper.newHashKeyRange("0", "199"); + HashKeyRange hashRange7 = ShardObjectHelper.newHashKeyRange("200", "399"); + HashKeyRange hashRange8 = ShardObjectHelper.newHashKeyRange("0", "399"); + HashKeyRange hashRange9 = ShardObjectHelper.newHashKeyRange("500", "799"); + HashKeyRange hashRange10 = ShardObjectHelper.newHashKeyRange("800", ShardObjectHelper.MAX_HASH_KEY); + + shards.add(ShardObjectHelper.newShard("shardId-0", null, null, range0, hashRange0)); + shards.add(ShardObjectHelper.newShard("shardId-1", null, null, range0, hashRange1)); + shards.add(ShardObjectHelper.newShard("shardId-2", null, null, range0, hashRange2)); + shards.add(ShardObjectHelper.newShard("shardId-3", null, null, range0, hashRange3)); + shards.add(ShardObjectHelper.newShard("shardId-4", null, null, range1, hashRange4)); + shards.add(ShardObjectHelper.newShard("shardId-5", null, null, range2, hashRange5)); + + shards.add(ShardObjectHelper.newShard("shardId-6", "shardId-0", "shardId-1", range3, hashRange6)); + shards.add(ShardObjectHelper.newShard("shardId-7", "shardId-2", "shardId-3", range3, hashRange7)); + + shards.add(ShardObjectHelper.newShard("shardId-8", "shardId-6", "shardId-7", range4, hashRange8)); + shards.add(ShardObjectHelper.newShard("shardId-9", "shardId-5", null, range4, hashRange9)); + shards.add(ShardObjectHelper.newShard("shardId-10", null, "shardId-5", range4, hashRange10)); + + return shards; + } + } \ No newline at end of file