diff --git a/CHANGELOG.md b/CHANGELOG.md
index e1d39d7e..1853a9a2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,17 @@
# Changelog
+## Release 1.12.0 (October 17, 2019)
+[Milestone#41](https://github.com/awslabs/amazon-kinesis-client/milestone/41)
+* Adding logging around shard end codepaths.
+ * [PR #585](https://github.com/awslabs/amazon-kinesis-client/pull/585)
+* Updating checkpointing failure message to refer to javadocs.
+ * [PR #590](https://github.com/awslabs/amazon-kinesis-client/pull/590)
+* Updating Sonatype to dedicated AWS endpoint.
+ * [PR #618](https://github.com/awslabs/amazon-kinesis-client/pull/618)
+* Introducing a validation step to verify if ShardEnd is reached, to prevent shard consumer stuck scenarios in the event of malformed response from service.
+ * [PR #623](https://github.com/awslabs/amazon-kinesis-client/pull/623)
+* Updating AWS SDK to 1.11.655
+ * [PR #626](https://github.com/awslabs/amazon-kinesis-client/pull/626)
+
## Release 1.11.2 (August 15, 2019)
[Milestone#35](https://github.com/awslabs/amazon-kinesis-client/milestone/35)
* Added support for metrics emission in `PeriodicShardSyncer`.
diff --git a/README.md b/README.md
index e43d02bb..11ef45fd 100644
--- a/README.md
+++ b/README.md
@@ -31,7 +31,20 @@ To make it easier for developers to write record processors in other languages,
## Release Notes
-## Latest Release (1.11.2 - August 15, 2019)
+## Latest Release (1.12.0 October 17, 2019)
+[Milestone#41](https://github.com/awslabs/amazon-kinesis-client/milestone/41)
+* Adding logging around shard end codepaths
+ * [PR #585](https://github.com/awslabs/amazon-kinesis-client/pull/585)
+* Updating checkpointing failure message to refer to javadocs
+ * [PR #590](https://github.com/awslabs/amazon-kinesis-client/pull/590)
+* Updating Sonatype to dedicated AWS endpoint.
+ * [PR #618](https://github.com/awslabs/amazon-kinesis-client/pull/618)
+* Introducing a validation step to verify if ShardEnd is reached, to prevent shard consumer stuck scenarios in the event of malformed response from service.
+ * [PR #623](https://github.com/awslabs/amazon-kinesis-client/pull/623)
+* Updating AWS SDK to 1.11.655
+ * [PR #626](https://github.com/awslabs/amazon-kinesis-client/pull/626)
+
+## Release 1.11.2 (August 15, 2019)
[Milestone#35](https://github.com/awslabs/amazon-kinesis-client/milestone/35)
* Added support for metrics emission in `PeriodicShardSyncer`.
* [PR #592](https://github.com/awslabs/amazon-kinesis-client/pull/592)
diff --git a/pom.xml b/pom.xml
index d4b4cda7..37d74f66 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
amazon-kinesis-client
jar
Amazon Kinesis Client Library for Java
- 1.11.3-SNAPSHOT
+ 1.13.0-SNAPSHOT
The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.
@@ -25,7 +25,7 @@
- 1.11.603
+ 1.11.655
1.0.392
libsqlite4java
${project.build.directory}/test-lib
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java
index 3ca940ee..47b1239f 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java
@@ -528,7 +528,7 @@ class ConsumerStates {
consumer.getStreamConfig().getInitialPositionInStream(),
consumer.isCleanupLeasesOfCompletedShards(),
consumer.isIgnoreUnexpectedChildShards(),
- consumer.getLeaseManager(),
+ consumer.getLeaseCoordinator(),
consumer.getTaskBackoffTimeMillis(),
consumer.getGetRecordsCache(), consumer.getShardSyncer(), consumer.getShardSyncStrategy());
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
index 5effe6ee..f7a7fa36 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
@@ -125,7 +125,7 @@ public class KinesisClientLibConfiguration {
/**
* User agent set when Amazon Kinesis Client Library makes AWS requests.
*/
- public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.11.3-SNAPSHOT";
+ public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.13.0-SNAPSHOT";
/**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java
index 5d8d0f82..c23fd678 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.amazonaws.util.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.lang3.StringUtils;
@@ -79,14 +80,38 @@ class KinesisShardSyncer implements ShardSyncer {
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
- public synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
- ILeaseManager leaseManager, InitialPositionInStreamExtended initialPositionInStream,
- boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards)
+ @Override
+ public synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, ILeaseManager leaseManager,
+ InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards,
+ boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards);
}
+ /**
+ * Check and create leases for any new shards (e.g. following a reshard operation).
+ *
+ * @param kinesisProxy
+ * @param leaseManager
+ * @param initialPositionInStream
+ * @param cleanupLeasesOfCompletedShards
+ * @param ignoreUnexpectedChildShards
+ * @param latestShards latest snapshot of shards to reuse
+ * @throws DependencyException
+ * @throws InvalidStateException
+ * @throws ProvisionedThroughputException
+ * @throws KinesisClientLibIOException
+ */
+ @Override
+ public synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
+ ILeaseManager leaseManager, InitialPositionInStreamExtended initialPositionInStream,
+ boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, List latestShards)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException,
+ KinesisClientLibIOException {
+ syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, latestShards);
+ }
+
/**
* Sync leases with Kinesis shards (e.g. at startup, or when we reach end of a shard).
*
@@ -100,14 +125,42 @@ class KinesisShardSyncer implements ShardSyncer {
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
- // CHECKSTYLE:OFF CyclomaticComplexity
private synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager leaseManager, InitialPositionInStreamExtended initialPosition,
boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
- List shards = getShardList(kinesisProxy);
- LOG.debug("Num shards: " + shards.size());
+ List latestShards = getShardList(kinesisProxy);
+ syncShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, latestShards);
+ }
+
+ /**
+ * Sync leases with Kinesis shards (e.g. at startup, or when we reach end of a shard).
+ *
+ * @param kinesisProxy
+ * @param leaseManager
+ * @param initialPosition
+ * @param cleanupLeasesOfCompletedShards
+ * @param ignoreUnexpectedChildShards
+ * @param latestShards latest snapshot of shards to reuse
+ * @throws DependencyException
+ * @throws InvalidStateException
+ * @throws ProvisionedThroughputException
+ * @throws KinesisClientLibIOException
+ */
+ // CHECKSTYLE:OFF CyclomaticComplexity
+ private synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
+ ILeaseManager leaseManager, InitialPositionInStreamExtended initialPosition,
+ boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, List latestShards)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException,
+ KinesisClientLibIOException {
+ List shards;
+ if(CollectionUtils.isNullOrEmpty(latestShards)) {
+ shards = getShardList(kinesisProxy);
+ } else {
+ shards = latestShards;
+ }
+ LOG.debug("Num Shards: " + shards.size());
Map shardIdToShardMap = constructShardIdToShardMap(shards);
Map> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(shardIdToShardMap);
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncStrategy.java
index c39803ae..c85fbbef 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncStrategy.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncStrategy.java
@@ -1,5 +1,6 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+
/**
* An implementation of ShardSyncStrategy.
*/
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java
index caf939b2..a30412ce 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java
@@ -50,7 +50,7 @@ class ShardConsumer {
private final ShardInfo shardInfo;
private final KinesisDataFetcher dataFetcher;
private final IMetricsFactory metricsFactory;
- private final ILeaseManager leaseManager;
+ private final KinesisClientLibLeaseCoordinator leaseCoordinator;
private ICheckpoint checkpoint;
// Backoff time when polling to check if application has finished processing parent shards
private final long parentShardPollIntervalMillis;
@@ -98,7 +98,7 @@ class ShardConsumer {
* @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard
* @param config Kinesis library configuration
- * @param leaseManager Used to create leases for new shards
+ * @param leaseCoordinator Used to manage leases for current worker
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param executorService ExecutorService used to execute process tasks for this shard
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
@@ -110,7 +110,7 @@ class ShardConsumer {
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
- ILeaseManager leaseManager,
+ KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
@@ -122,7 +122,7 @@ class ShardConsumer {
streamConfig,
checkpoint,
recordProcessor,
- leaseManager,
+ leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@@ -139,7 +139,7 @@ class ShardConsumer {
* @param streamConfig Stream configuration to use
* @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard
- * @param leaseManager Used to create leases for new shards
+ * @param leaseCoordinator Used to manage leases for current worker
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param executorService ExecutorService used to execute process tasks for this shard
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
@@ -154,7 +154,7 @@ class ShardConsumer {
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
- ILeaseManager leaseManager,
+ KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
@@ -177,7 +177,7 @@ class ShardConsumer {
shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()),
metricsFactory),
- leaseManager,
+ leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@@ -197,7 +197,7 @@ class ShardConsumer {
* @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard
* @param recordProcessorCheckpointer RecordProcessorCheckpointer to use to checkpoint progress
- * @param leaseManager Used to create leases for new shards
+ * @param leaseCoordinator Used to manage leases for current worker
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param cleanupLeasesOfCompletedShards clean up the leases of completed shards
* @param executorService ExecutorService used to execute process tasks for this shard
@@ -215,7 +215,7 @@ class ShardConsumer {
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
- ILeaseManager leaseManager,
+ KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
@@ -231,7 +231,7 @@ class ShardConsumer {
this.checkpoint = checkpoint;
this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
- this.leaseManager = leaseManager;
+ this.leaseCoordinator = leaseCoordinator;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.executorService = executorService;
@@ -480,7 +480,11 @@ class ShardConsumer {
}
ILeaseManager getLeaseManager() {
- return leaseManager;
+ return leaseCoordinator.getLeaseManager();
+ }
+
+ KinesisClientLibLeaseCoordinator getLeaseCoordinator() {
+ return leaseCoordinator;
}
ICheckpoint getCheckpoint() {
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java
index dc620aec..8077efcc 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java
@@ -1,8 +1,10 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import com.amazonaws.services.kinesis.model.Shard;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -55,6 +57,12 @@ class ShardEndShardSyncStrategy implements ShardSyncStrategy {
return onFoundCompletedShard();
}
+ @Override
+ public TaskResult onShardConsumerShutDown(List latestShards) {
+ shardSyncTaskManager.syncShardAndLeaseInfo(latestShards);
+ return new TaskResult(null);
+ }
+
@Override
public void onWorkerShutDown() {
LOG.debug(String.format("Stop is NoOp for ShardSyncStrategyType %s", getStrategyType().toString()));
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncStrategy.java
index 6738d2e9..0303f188 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncStrategy.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncStrategy.java
@@ -1,5 +1,9 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+import com.amazonaws.services.kinesis.model.Shard;
+
+import java.util.List;
+
/**
* Facade of methods that can be invoked at different points
* in KCL application execution to perform certain actions related to shard-sync.
@@ -41,6 +45,16 @@ public interface ShardSyncStrategy {
*/
TaskResult onShardConsumerShutDown();
+ /**
+ * Invoked when ShardConsumer is shutdown and all shards are provided.
+ *
+ * @param latestShards latest snapshot of shards to reuse
+ * @return
+ */
+ default TaskResult onShardConsumerShutDown(List latestShards) {
+ return onShardConsumerShutDown();
+ }
+
/**
* Invoked when worker is shutdown.
*/
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java
index f6e2a87d..13c43b0e 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java
@@ -14,6 +14,7 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+import com.amazonaws.services.kinesis.model.Shard;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -21,6 +22,8 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
+import java.util.List;
+
/**
* This task syncs leases/activies with shards of the stream.
* It will create new leases/activites when it discovers new shards (e.g. setup/resharding).
@@ -39,6 +42,7 @@ class ShardSyncTask implements ITask {
private final long shardSyncTaskIdleTimeMillis;
private final TaskType taskType = TaskType.SHARDSYNC;
private final ShardSyncer shardSyncer;
+ private final List latestShards;
/**
* @param kinesisProxy Used to fetch information about the stream (e.g. shard list)
@@ -50,6 +54,7 @@ class ShardSyncTask implements ITask {
* in Kinesis)
* @param shardSyncTaskIdleTimeMillis shardSync task idle time in millis
* @param shardSyncer shardSyncer instance used to check and create new leases
+ * @param latestShards latest snapshot of shards to reuse
*/
ShardSyncTask(IKinesisProxy kinesisProxy,
ILeaseManager leaseManager,
@@ -57,7 +62,8 @@ class ShardSyncTask implements ITask {
boolean cleanupLeasesUponShardCompletion,
boolean ignoreUnexpectedChildShards,
long shardSyncTaskIdleTimeMillis,
- ShardSyncer shardSyncer) {
+ ShardSyncer shardSyncer, List latestShards) {
+ this.latestShards = latestShards;
this.kinesisProxy = kinesisProxy;
this.leaseManager = leaseManager;
this.initialPosition = initialPositionInStream;
@@ -79,7 +85,8 @@ class ShardSyncTask implements ITask {
leaseManager,
initialPosition,
cleanupLeasesUponShardCompletion,
- ignoreUnexpectedChildShards);
+ ignoreUnexpectedChildShards,
+ latestShards);
if (shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis);
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java
index 9601cf64..0fabbbcd 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java
@@ -14,11 +14,13 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import com.amazonaws.services.kinesis.model.Shard;
import lombok.Getter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -85,11 +87,11 @@ class ShardSyncTaskManager {
this.shardSyncer = shardSyncer;
}
- synchronized Future syncShardAndLeaseInfo(Set closedShardIds) {
- return checkAndSubmitNextTask(closedShardIds);
+ synchronized Future syncShardAndLeaseInfo(List latestShards) {
+ return checkAndSubmitNextTask(latestShards);
}
- private synchronized Future checkAndSubmitNextTask(Set closedShardIds) {
+ private synchronized Future checkAndSubmitNextTask(List latestShards) {
Future submittedTaskFuture = null;
if ((future == null) || future.isCancelled() || future.isDone()) {
if ((future != null) && future.isDone()) {
@@ -111,7 +113,7 @@ class ShardSyncTaskManager {
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis,
- shardSyncer), metricsFactory);
+ shardSyncer, latestShards), metricsFactory);
future = executorService.submit(currentTask);
if (LOG.isDebugEnabled()) {
LOG.debug("Submitted new " + currentTask.getTaskType() + " task.");
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java
index 2821dd2d..ca8511ce 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java
@@ -21,12 +21,23 @@ import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
+import com.amazonaws.services.kinesis.model.Shard;
+
+import java.util.List;
public interface ShardSyncer {
void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, ILeaseManager leaseManager,
- InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards,
- boolean ignoreUnexpectedChildShards)
+ InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards,
+ boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException;
+
+ default void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, ILeaseManager leaseManager,
+ InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards,
+ boolean ignoreUnexpectedChildShards, List latestShards)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException,
+ KinesisClientLibIOException {
+ checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards);
+ }
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java
index f062d745..abbc7bb1 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java
@@ -14,6 +14,9 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.util.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -22,11 +25,14 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
-import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
/**
* Task for invoking the RecordProcessor shutdown() callback.
*/
@@ -41,7 +47,7 @@ class ShutdownTask implements ITask {
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
private final ShutdownReason reason;
private final IKinesisProxy kinesisProxy;
- private final ILeaseManager leaseManager;
+ private final KinesisClientLibLeaseCoordinator leaseCoordinator;
private final InitialPositionInStreamExtended initialPositionInStream;
private final boolean cleanupLeasesOfCompletedShards;
private final boolean ignoreUnexpectedChildShards;
@@ -63,7 +69,7 @@ class ShutdownTask implements ITask {
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards,
- ILeaseManager leaseManager,
+ KinesisClientLibLeaseCoordinator leaseCoordinator,
long backoffTimeMillis,
GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
this.shardInfo = shardInfo;
@@ -74,7 +80,7 @@ class ShutdownTask implements ITask {
this.initialPositionInStream = initialPositionInStream;
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
- this.leaseManager = leaseManager;
+ this.leaseCoordinator = leaseCoordinator;
this.backoffTimeMillis = backoffTimeMillis;
this.getRecordsCache = getRecordsCache;
this.shardSyncer = shardSyncer;
@@ -93,24 +99,44 @@ class ShutdownTask implements ITask {
boolean applicationException = false;
try {
+ ShutdownReason localReason = reason;
+ List latestShards = null;
+ /*
+ * Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END
+ * If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows active
+ * workers to contend for the lease of this shard.
+ */
+ if(localReason == ShutdownReason.TERMINATE) {
+ latestShards = kinesisProxy.getShardList();
+
+ //If latestShards is null or empty, we should still shut down the ShardConsumer with Zombie state which avoid
+ // checking point with SHARD_END sequence number.
+ if(CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) {
+ localReason = ShutdownReason.ZOMBIE;
+ dropLease();
+ LOG.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.getShardId());
+ }
+ }
+
+
// If we reached end of the shard, set sequence number to SHARD_END.
- if (reason == ShutdownReason.TERMINATE) {
+ if (localReason == ShutdownReason.TERMINATE) {
recordProcessorCheckpointer.setSequenceNumberAtShardEnd(
recordProcessorCheckpointer.getLargestPermittedCheckpointValue());
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
}
LOG.debug("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken "
- + shardInfo.getConcurrencyToken() + ". Shutdown reason: " + reason);
+ + shardInfo.getConcurrencyToken() + ". Shutdown reason: " + localReason);
final ShutdownInput shutdownInput = new ShutdownInput()
- .withShutdownReason(reason)
+ .withShutdownReason(localReason)
.withCheckpointer(recordProcessorCheckpointer);
final long recordProcessorStartTimeMillis = System.currentTimeMillis();
try {
recordProcessor.shutdown(shutdownInput);
ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
- if (reason == ShutdownReason.TERMINATE) {
+ if (localReason == ShutdownReason.TERMINATE) {
if ((lastCheckpointValue == null)
|| (!lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END))) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
@@ -129,10 +155,10 @@ class ShutdownTask implements ITask {
MetricsLevel.SUMMARY);
}
- if (reason == ShutdownReason.TERMINATE) {
+ if (localReason == ShutdownReason.TERMINATE) {
LOG.debug("Looking for child shards of shard " + shardInfo.getShardId());
// create leases for the child shards
- TaskResult result = shardSyncStrategy.onShardConsumerShutDown();
+ TaskResult result = shardSyncStrategy.onShardConsumerShutDown(latestShards);
if (result.getException() != null) {
LOG.debug("Exception while trying to sync shards on the shutdown of shard: " + shardInfo
.getShardId());
@@ -175,4 +201,23 @@ class ShutdownTask implements ITask {
return reason;
}
+ private boolean isShardInContextParentOfAny(List shards) {
+ for(Shard shard : shards) {
+ if (isChildShardOfShardInContext(shard)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isChildShardOfShardInContext(Shard shard) {
+ return (StringUtils.equals(shard.getParentShardId(), shardInfo.getShardId())
+ || StringUtils.equals(shard.getAdjacentParentShardId(), shardInfo.getShardId()));
+ }
+
+ private void dropLease() {
+ KinesisClientLease lease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId());
+ leaseCoordinator.dropLease(lease);
+ LOG.warn("Dropped lease for shutting down ShardConsumer: " + lease.getLeaseKey());
+ }
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
index b231da73..185113fc 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
@@ -679,7 +679,7 @@ public class Worker implements Runnable {
LOG.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(),
leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion,
- config.shouldIgnoreUnexpectedChildShards(), 0L, shardSyncer);
+ config.shouldIgnoreUnexpectedChildShards(), 0L, shardSyncer, null);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else {
LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
@@ -1045,7 +1045,7 @@ public class Worker implements Runnable {
streamConfig,
checkpointTracker,
recordProcessor,
- leaseCoordinator.getLeaseManager(),
+ leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesUponShardCompletion,
executorService,
@@ -1170,7 +1170,7 @@ public class Worker implements Runnable {
new ShardSyncTask(kinesisProxy, leaseManager, config.getInitialPositionInStreamExtended(),
config.shouldCleanupLeasesUponShardCompletion(),
config.shouldIgnoreUnexpectedChildShards(), SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC,
- shardSyncer), metricsFactory));
+ shardSyncer, null), metricsFactory));
}
private ShardEndShardSyncStrategy createShardEndShardSyncStrategy(ShardSyncTaskManager shardSyncTaskManager) {
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java
index 8c85b546..d9160f0f 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java
@@ -35,6 +35,7 @@ import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
@@ -66,6 +67,8 @@ public class ConsumerStatesTest {
private KinesisDataFetcher dataFetcher;
@Mock
private ILeaseManager leaseManager;
+ @InjectMocks
+ private KinesisClientLibLeaseCoordinator leaseCoordinator = new KinesisClientLibLeaseCoordinator(leaseManager, "testCoordinator", 1000, 1000);
@Mock
private ICheckpoint checkpoint;
@Mock
@@ -93,6 +96,7 @@ public class ConsumerStatesTest {
when(consumer.getShardInfo()).thenReturn(shardInfo);
when(consumer.getDataFetcher()).thenReturn(dataFetcher);
when(consumer.getLeaseManager()).thenReturn(leaseManager);
+ when(consumer.getLeaseCoordinator()).thenReturn(leaseCoordinator);
when(consumer.getCheckpoint()).thenReturn(checkpoint);
when(consumer.getFuture()).thenReturn(future);
when(consumer.getShutdownNotification()).thenReturn(shutdownNotification);
@@ -294,7 +298,7 @@ public class ConsumerStatesTest {
equalTo(recordProcessorCheckpointer)));
assertThat(task, shutdownTask(ShutdownReason.class, "reason", equalTo(reason)));
assertThat(task, shutdownTask(IKinesisProxy.class, "kinesisProxy", equalTo(kinesisProxy)));
- assertThat(task, shutdownTask(LEASE_MANAGER_CLASS, "leaseManager", equalTo(leaseManager)));
+ assertThat(task, shutdownTask(KinesisClientLibLeaseCoordinator.class, "leaseCoordinator", equalTo(leaseCoordinator)));
assertThat(task, shutdownTask(InitialPositionInStreamExtended.class, "initialPositionInStream",
equalTo(initialPositionInStream)));
assertThat(task,
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java
index d0cf98db..90512d01 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java
@@ -61,6 +61,7 @@ import org.hamcrest.TypeSafeMatcher;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
@@ -120,6 +121,8 @@ public class ShardConsumerTest {
@Mock
private ILeaseManager leaseManager;
@Mock
+ private KinesisClientLibLeaseCoordinator leaseCoordinator;
+ @Mock
private ICheckpoint checkpoint;
@Mock
private ShutdownNotification shutdownNotification;
@@ -148,6 +151,7 @@ public class ShardConsumerTest {
when(checkpoint.getCheckpointObject(anyString())).thenThrow(NullPointerException.class);
when(leaseManager.getLease(anyString())).thenReturn(null);
+ when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
StreamConfig streamConfig =
new StreamConfig(streamProxy,
1,
@@ -160,7 +164,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
- null,
+ leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@@ -197,6 +201,7 @@ public class ShardConsumerTest {
when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class);
when(checkpoint.getCheckpointObject(anyString())).thenThrow(NullPointerException.class);
when(leaseManager.getLease(anyString())).thenReturn(null);
+ when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
StreamConfig streamConfig =
new StreamConfig(streamProxy,
1,
@@ -209,7 +214,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
- null,
+ leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
spyExecutorService,
@@ -253,7 +258,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
- null,
+ leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@@ -267,6 +272,7 @@ public class ShardConsumerTest {
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null;
when(leaseManager.getLease(anyString())).thenReturn(null);
+ when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(checkpoint.getCheckpointObject(anyString())).thenReturn(
new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber));
@@ -335,6 +341,7 @@ public class ShardConsumerTest {
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken);
when(leaseManager.getLease(anyString())).thenReturn(null);
+ when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
TestStreamlet processor = new TestStreamlet();
StreamConfig streamConfig =
@@ -371,7 +378,7 @@ public class ShardConsumerTest {
checkpoint,
processor,
recordProcessorCheckpointer,
- leaseManager,
+ leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@@ -511,7 +518,7 @@ public class ShardConsumerTest {
@Override
public void shutdown(ShutdownInput input) {
ShutdownReason reason = input.getShutdownReason();
- if (reason.equals(ShutdownReason.TERMINATE) && errorShutdownLatch.getCount() > 0) {
+ if ((reason.equals(ShutdownReason.ZOMBIE) || reason.equals(ShutdownReason.TERMINATE)) && errorShutdownLatch.getCount() > 0) {
errorShutdownLatch.countDown();
throw new RuntimeException("test");
} else {
@@ -522,7 +529,7 @@ public class ShardConsumerTest {
/**
* Test method for {@link ShardConsumer#consumeShard()} that ensures a transient error thrown from the record
- * processor's shutdown method with reason terminate will be retried.
+ * processor's shutdown method with reason zombie will be retried.
*/
@Test
public final void testConsumeShardWithTransientTerminateError() throws Exception {
@@ -542,7 +549,7 @@ public class ShardConsumerTest {
final int idleTimeMS = 0; // keep unit tests fast
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken);
- when(leaseManager.getLease(anyString())).thenReturn(null);
+
TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet();
@@ -562,6 +569,9 @@ public class ShardConsumerTest {
when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(),
any(IMetricsFactory.class), anyInt()))
.thenReturn(getRecordsCache);
+ when(leaseManager.getLease(anyString())).thenReturn(null);
+ when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
+ when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(new KinesisClientLease());
RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
shardInfo,
@@ -580,7 +590,7 @@ public class ShardConsumerTest {
checkpoint,
processor,
recordProcessorCheckpointer,
- leaseManager,
+ leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@@ -594,7 +604,150 @@ public class ShardConsumerTest {
shardSyncer,
shardSyncStrategy);
- when(shardSyncStrategy.onShardConsumerShutDown()).thenReturn(new TaskResult(null));
+ when(shardSyncStrategy.onShardConsumerShutDown(shardList)).thenReturn(new TaskResult(null));
+
+ assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
+ consumer.consumeShard(); // check on parent shards
+ Thread.sleep(50L);
+ consumer.consumeShard(); // start initialization
+ assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
+ consumer.consumeShard(); // initialize
+ processor.getInitializeLatch().await(5, TimeUnit.SECONDS);
+ verify(getRecordsCache).start();
+
+ // We expect to process all records in numRecs calls
+ for (int i = 0; i < numRecs;) {
+ boolean newTaskSubmitted = consumer.consumeShard();
+ if (newTaskSubmitted) {
+ LOG.debug("New processing task was submitted, call # " + i);
+ assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
+ // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES
+ i += maxRecords;
+ }
+ Thread.sleep(50L);
+ }
+
+ // Consume shards until shutdown terminate is called and it has thrown an exception
+ for (int i = 0; i < 100; i++) {
+ consumer.consumeShard();
+ if (processor.errorShutdownLatch.await(50, TimeUnit.MILLISECONDS)) {
+ break;
+ }
+ }
+ assertEquals(0, processor.errorShutdownLatch.getCount());
+
+ // Wait for a retry of shutdown terminate that should succeed
+ for (int i = 0; i < 100; i++) {
+ consumer.consumeShard();
+ if (processor.getShutdownLatch().await(50, TimeUnit.MILLISECONDS)) {
+ break;
+ }
+ }
+ assertEquals(0, processor.getShutdownLatch().getCount());
+
+ // Wait for shutdown complete now that terminate shutdown is successful
+ for (int i = 0; i < 100; i++) {
+ consumer.consumeShard();
+ if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) {
+ break;
+ }
+ Thread.sleep(50L);
+ }
+ assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE));
+
+ assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE)));
+
+ verify(getRecordsCache).shutdown();
+
+ executorService.shutdown();
+ executorService.awaitTermination(60, TimeUnit.SECONDS);
+
+ String iterator = fileBasedProxy.getIterator(streamShardId, ShardIteratorType.TRIM_HORIZON.toString());
+ List expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords());
+ verifyConsumedRecords(expectedRecords, processor.getProcessedRecords());
+ file.delete();
+ }
+
+
+
+ /**
+ * Test method for {@link ShardConsumer#consumeShard()} that ensures the shardConsumer gets shutdown with shutdown
+ * reason TERMINATE when the shard end is reached.
+ */
+ @Test
+ public final void testConsumeShardWithShardEnd() throws Exception {
+ int numRecs = 10;
+ BigInteger startSeqNum = BigInteger.ONE;
+ String streamShardId = "kinesis-0-0";
+ String testConcurrencyToken = "testToken";
+ List shardList = KinesisLocalFileDataCreator.createShardList(3, "kinesis-0-", startSeqNum);
+ // Close the shard so that shutdown is called with reason terminate
+ shardList.get(0).getSequenceNumberRange().setEndingSequenceNumber(
+ KinesisLocalFileProxy.MAX_SEQUENCE_NUMBER.subtract(BigInteger.ONE).toString());
+ shardList.get(1).setParentShardId("kinesis-0-0");
+ shardList.get(2).setAdjacentParentShardId("kinesis-0-0");
+ File file = KinesisLocalFileDataCreator.generateTempDataFile(shardList, numRecs, "unitTestSCT002");
+
+ IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath());
+
+ final int maxRecords = 2;
+ final int idleTimeMS = 0; // keep unit tests fast
+ ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
+ checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken);
+ when(leaseManager.getLease(anyString())).thenReturn(null);
+ when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
+
+ TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet();
+
+ StreamConfig streamConfig =
+ new StreamConfig(fileBasedProxy,
+ maxRecords,
+ idleTimeMS,
+ callProcessRecordsForEmptyRecordList,
+ skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
+
+ ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null);
+
+ dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
+
+ getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
+ new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
+ when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(),
+ any(IMetricsFactory.class), anyInt()))
+ .thenReturn(getRecordsCache);
+
+ RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
+ shardInfo,
+ checkpoint,
+ new SequenceNumberValidator(
+ streamConfig.getStreamProxy(),
+ shardInfo.getShardId(),
+ streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()
+ ),
+ metricsFactory
+ );
+
+ ShardConsumer consumer =
+ new ShardConsumer(shardInfo,
+ streamConfig,
+ checkpoint,
+ processor,
+ recordProcessorCheckpointer,
+ leaseCoordinator,
+ parentShardPollIntervalMillis,
+ cleanupLeasesOfCompletedShards,
+ executorService,
+ metricsFactory,
+ taskBackoffTimeMillis,
+ KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
+ dataFetcher,
+ Optional.empty(),
+ Optional.empty(),
+ config,
+ shardSyncer,
+ shardSyncStrategy);
+
+ when(shardSyncStrategy.onShardConsumerShutDown(shardList)).thenReturn(new TaskResult(null));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
@@ -684,6 +837,7 @@ public class ShardConsumerTest {
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken);
when(leaseManager.getLease(anyString())).thenReturn(null);
+ when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
TestStreamlet processor = new TestStreamlet();
StreamConfig streamConfig =
@@ -721,7 +875,7 @@ public class ShardConsumerTest {
checkpoint,
processor,
recordProcessorCheckpointer,
- leaseManager,
+ leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@@ -796,7 +950,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
- null,
+ leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@@ -812,6 +966,7 @@ public class ShardConsumerTest {
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999");
when(leaseManager.getLease(anyString())).thenReturn(null);
+ when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory());
when(checkpoint.getCheckpointObject(anyString())).thenReturn(
new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber));
@@ -849,7 +1004,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
- null,
+ leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@@ -881,7 +1036,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
- null,
+ leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@@ -924,7 +1079,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
- null,
+ leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
mockExecutorService,
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java
index 26a47079..04741f45 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java
@@ -127,7 +127,8 @@ public class ShardSyncTaskIntegrationTest {
false,
false,
0L,
- shardSyncer);
+ shardSyncer,
+ null);
syncTask.call();
List leases = leaseManager.listLeases();
Set leaseKeys = new HashSet();
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java
index 9c156f58..fd34be76 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java
@@ -25,6 +25,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,7 +37,6 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ExceptionThrowingLeaseManager.ExceptionThrowingLeaseManagerMethods;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
@@ -230,7 +232,7 @@ public class ShardSyncerTest {
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST,
- cleanupLeasesOfCompletedShards, false);
+ cleanupLeasesOfCompletedShards, false, shards);
List newLeases = leaseManager.listLeases();
Set expectedLeaseShardIds = new HashSet();
expectedLeaseShardIds.add("shardId-4");
@@ -262,7 +264,7 @@ public class ShardSyncerTest {
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON,
- cleanupLeasesOfCompletedShards, false);
+ cleanupLeasesOfCompletedShards, false, shards);
List newLeases = leaseManager.listLeases();
Set expectedLeaseShardIds = new HashSet();
for (int i = 0; i < 11; i++) {
@@ -293,7 +295,7 @@ public class ShardSyncerTest {
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_AT_TIMESTAMP,
- cleanupLeasesOfCompletedShards, false);
+ cleanupLeasesOfCompletedShards, false, shards);
List newLeases = leaseManager.listLeases();
Set expectedLeaseShardIds = new HashSet();
for (int i = 0; i < 11; i++) {
@@ -327,7 +329,7 @@ public class ShardSyncerTest {
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON,
- cleanupLeasesOfCompletedShards, false);
+ cleanupLeasesOfCompletedShards, false, shards);
dataFile.delete();
}
@@ -352,7 +354,7 @@ public class ShardSyncerTest {
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST,
- cleanupLeasesOfCompletedShards, true);
+ cleanupLeasesOfCompletedShards, true, shards);
List newLeases = leaseManager.listLeases();
Set expectedLeaseShardIds = new HashSet();
expectedLeaseShardIds.add("shardId-4");
@@ -467,7 +469,7 @@ public class ShardSyncerTest {
exceptionThrowingLeaseManager,
position,
cleanupLeasesOfCompletedShards,
- false);
+ false, null);
return;
} catch (LeasingException e) {
LOG.debug("Catch leasing exception", e);
@@ -480,7 +482,7 @@ public class ShardSyncerTest {
leaseManager,
position,
cleanupLeasesOfCompletedShards,
- false);
+ false, null);
}
}
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java
index 029a1efe..cd82e475 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java
@@ -14,16 +14,22 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -53,7 +59,7 @@ public class ShutdownTaskTest {
Set defaultParentShardIds = new HashSet<>();
String defaultConcurrencyToken = "testToken4398";
- String defaultShardId = "shardId-0000397840";
+ String defaultShardId = "shardId-0";
ShardInfo defaultShardInfo = new ShardInfo(defaultShardId,
defaultConcurrencyToken,
defaultParentShardIds,
@@ -104,7 +110,11 @@ public class ShutdownTaskTest {
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
when(checkpointer.getLastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298"));
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
+ List shards = constructShardListForGraphA();
+ when(kinesisProxy.getShardList()).thenReturn(shards);
+ KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class);
+ when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
ShutdownTask task = new ShutdownTask(defaultShardInfo,
@@ -115,7 +125,7 @@ public class ShutdownTaskTest {
INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
- leaseManager,
+ leaseCoordinator,
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer,
@@ -132,12 +142,17 @@ public class ShutdownTaskTest {
public final void testCallWhenSyncingShardsThrows() {
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
+ List shards = constructShardListForGraphA();
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
- when(kinesisProxy.getShardList()).thenReturn(null);
+ when(kinesisProxy.getShardList()).thenReturn(shards);
+ KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class);
+ when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
+ when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
- when(shardSyncStrategy.onShardConsumerShutDown()).thenReturn(new TaskResult(new KinesisClientLibIOException("")));
+
+ when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(new KinesisClientLibIOException("")));
ShutdownTask task = new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
@@ -146,25 +161,187 @@ public class ShutdownTaskTest {
INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
- leaseManager,
+ leaseCoordinator,
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer,
shardSyncStrategy);
TaskResult result = task.call();
- verify(shardSyncStrategy).onShardConsumerShutDown();
+ verify(shardSyncStrategy).onShardConsumerShutDown(shards);
Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException);
verify(getRecordsCache).shutdown();
}
+ @Test
+ public final void testCallWhenShardEnd() {
+ RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
+ when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
+ List shards = constructShardListForGraphA();
+ IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
+ when(kinesisProxy.getShardList()).thenReturn(shards);
+ KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
+ ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class);
+ when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
+ when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
+ boolean cleanupLeasesOfCompletedShards = false;
+ boolean ignoreUnexpectedChildShards = false;
+
+ when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(null));
+ ShutdownTask task = new ShutdownTask(defaultShardInfo,
+ defaultRecordProcessor,
+ checkpointer,
+ ShutdownReason.TERMINATE,
+ kinesisProxy,
+ INITIAL_POSITION_TRIM_HORIZON,
+ cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards,
+ leaseCoordinator,
+ TASK_BACKOFF_TIME_MILLIS,
+ getRecordsCache,
+ shardSyncer,
+ shardSyncStrategy);
+ TaskResult result = task.call();
+ verify(shardSyncStrategy).onShardConsumerShutDown(shards);
+ verify(kinesisProxy, times(1)).getShardList();
+ Assert.assertNull(result.getException());
+ verify(getRecordsCache).shutdown();
+ verify(leaseCoordinator, never()).dropLease(any());
+ }
+
+ @Test
+ public final void testCallWhenFalseShardEnd() {
+ ShardInfo shardInfo = new ShardInfo("shardId-4",
+ defaultConcurrencyToken,
+ defaultParentShardIds,
+ ExtendedSequenceNumber.LATEST);
+ RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
+ when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
+ List shards = constructShardListForGraphA();
+ IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
+ when(kinesisProxy.getShardList()).thenReturn(shards);
+ KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
+ ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class);
+ when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
+ when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(new KinesisClientLease());
+ boolean cleanupLeasesOfCompletedShards = false;
+ boolean ignoreUnexpectedChildShards = false;
+
+ when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(null));
+
+ ShutdownTask task = new ShutdownTask(shardInfo,
+ defaultRecordProcessor,
+ checkpointer,
+ ShutdownReason.TERMINATE,
+ kinesisProxy,
+ INITIAL_POSITION_TRIM_HORIZON,
+ cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards,
+ leaseCoordinator,
+ TASK_BACKOFF_TIME_MILLIS,
+ getRecordsCache,
+ shardSyncer,
+ shardSyncStrategy);
+ TaskResult result = task.call();
+ verify(shardSyncStrategy, never()).onShardConsumerShutDown(shards);
+ verify(kinesisProxy, times(1)).getShardList();
+ Assert.assertNull(result.getException());
+ verify(getRecordsCache).shutdown();
+ verify(leaseCoordinator).dropLease(any());
+ }
+
+ @Test
+ public final void testCallWhenLeaseLost() {
+ RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
+ when(checkpointer.getLastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298"));
+ List shards = constructShardListForGraphA();
+ IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
+ when(kinesisProxy.getShardList()).thenReturn(shards);
+ KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
+ ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class);
+ when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
+ when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
+ boolean cleanupLeasesOfCompletedShards = false;
+ boolean ignoreUnexpectedChildShards = false;
+
+ when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(null));
+ ShutdownTask task = new ShutdownTask(defaultShardInfo,
+ defaultRecordProcessor,
+ checkpointer,
+ ShutdownReason.ZOMBIE,
+ kinesisProxy,
+ INITIAL_POSITION_TRIM_HORIZON,
+ cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards,
+ leaseCoordinator,
+ TASK_BACKOFF_TIME_MILLIS,
+ getRecordsCache,
+ shardSyncer,
+ shardSyncStrategy);
+ TaskResult result = task.call();
+ verify(shardSyncStrategy, never()).onShardConsumerShutDown(shards);
+ verify(kinesisProxy, never()).getShardList();
+ Assert.assertNull(result.getException());
+ verify(getRecordsCache).shutdown();
+ verify(leaseCoordinator, never()).dropLease(any());
+ }
+
/**
* Test method for {@link ShutdownTask#getTaskType()}.
*/
@Test
public final void testGetTaskType() {
- ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, null, 0, getRecordsCache, shardSyncer, shardSyncStrategy);
+ KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
+ ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, leaseCoordinator, 0, getRecordsCache, shardSyncer, shardSyncStrategy);
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
}
+
+ /*
+ * Helper method to construct a shard list for graph A. Graph A is defined below.
+ * Shard structure (y-axis is epochs):
+ * 0 1 2 3 4 5- shards till epoch 102
+ * \ / \ / | |
+ * 6 7 4 5- shards from epoch 103 - 205
+ * \ / | /\
+ * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
+ */
+ private List constructShardListForGraphA() {
+ List shards = new ArrayList();
+
+ SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("11", "102");
+ SequenceNumberRange range1 = ShardObjectHelper.newSequenceNumberRange("11", null);
+ SequenceNumberRange range2 = ShardObjectHelper.newSequenceNumberRange("11", "210");
+ SequenceNumberRange range3 = ShardObjectHelper.newSequenceNumberRange("103", "210");
+ SequenceNumberRange range4 = ShardObjectHelper.newSequenceNumberRange("211", null);
+
+ HashKeyRange hashRange0 = ShardObjectHelper.newHashKeyRange("0", "99");
+ HashKeyRange hashRange1 = ShardObjectHelper.newHashKeyRange("100", "199");
+ HashKeyRange hashRange2 = ShardObjectHelper.newHashKeyRange("200", "299");
+ HashKeyRange hashRange3 = ShardObjectHelper.newHashKeyRange("300", "399");
+ HashKeyRange hashRange4 = ShardObjectHelper.newHashKeyRange("400", "499");
+ HashKeyRange hashRange5 = ShardObjectHelper.newHashKeyRange("500", ShardObjectHelper.MAX_HASH_KEY);
+ HashKeyRange hashRange6 = ShardObjectHelper.newHashKeyRange("0", "199");
+ HashKeyRange hashRange7 = ShardObjectHelper.newHashKeyRange("200", "399");
+ HashKeyRange hashRange8 = ShardObjectHelper.newHashKeyRange("0", "399");
+ HashKeyRange hashRange9 = ShardObjectHelper.newHashKeyRange("500", "799");
+ HashKeyRange hashRange10 = ShardObjectHelper.newHashKeyRange("800", ShardObjectHelper.MAX_HASH_KEY);
+
+ shards.add(ShardObjectHelper.newShard("shardId-0", null, null, range0, hashRange0));
+ shards.add(ShardObjectHelper.newShard("shardId-1", null, null, range0, hashRange1));
+ shards.add(ShardObjectHelper.newShard("shardId-2", null, null, range0, hashRange2));
+ shards.add(ShardObjectHelper.newShard("shardId-3", null, null, range0, hashRange3));
+ shards.add(ShardObjectHelper.newShard("shardId-4", null, null, range1, hashRange4));
+ shards.add(ShardObjectHelper.newShard("shardId-5", null, null, range2, hashRange5));
+
+ shards.add(ShardObjectHelper.newShard("shardId-6", "shardId-0", "shardId-1", range3, hashRange6));
+ shards.add(ShardObjectHelper.newShard("shardId-7", "shardId-2", "shardId-3", range3, hashRange7));
+
+ shards.add(ShardObjectHelper.newShard("shardId-8", "shardId-6", "shardId-7", range4, hashRange8));
+ shards.add(ShardObjectHelper.newShard("shardId-9", "shardId-5", null, range4, hashRange9));
+ shards.add(ShardObjectHelper.newShard("shardId-10", null, "shardId-5", range4, hashRange10));
+
+ return shards;
+ }
+
}
\ No newline at end of file