diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index d3ccb911..f96d622b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -21,7 +21,7 @@ import java.util.Optional; * and state transitions is contained within the {@link ConsumerState} objects. * *
* +-------------------+
* | Waiting on Parent | +------------------+
@@ -96,14 +96,14 @@ class ConsumerStates {
/**
* Represents a the current state of the consumer. This handles the creation of tasks for the consumer, and what to
* do when a transition occurs.
- *
+ *
*/
interface ConsumerState {
/**
* Creates a new task for this state using the passed in consumer to build the task. If there is no task
* required for this state it may return a null value. {@link ConsumerState}'s are allowed to modify the
* consumer during the execution of this method.
- *
+ *
* @param consumer
* the consumer to use build the task, or execute state.
* @return a valid task for this state or null if there is no task required.
@@ -113,7 +113,7 @@ class ConsumerStates {
/**
* Provides the next state of the consumer upon success of the task return by
* {@link ConsumerState#createTask(ShardConsumer)}.
- *
+ *
* @return the next state that the consumer should transition to, this may be the same object as the current
* state.
*/
@@ -122,7 +122,7 @@ class ConsumerStates {
/**
* Provides the next state of the consumer when a shutdown has been requested. The returned state is dependent
* on the current state, and the shutdown reason.
- *
+ *
* @param shutdownReason
* the reason that a shutdown was requested
* @return the next state that the consumer should transition to, this may be the same object as the current
@@ -133,7 +133,7 @@ class ConsumerStates {
/**
* The type of task that {@link ConsumerState#createTask(ShardConsumer)} would return. This is always a valid state
* even if createTask would return a null value.
- *
+ *
* @return the type of task that this state represents.
*/
TaskType getTaskType();
@@ -141,7 +141,7 @@ class ConsumerStates {
/**
* An enumeration represent the type of this state. Different consumer states may return the same
* {@link ShardConsumerState}.
- *
+ *
* @return the type of consumer state this represents.
*/
ShardConsumerState getState();
@@ -515,7 +515,9 @@ class ConsumerStates {
consumer.getRecordProcessorCheckpointer(), consumer.getShutdownReason(),
consumer.getStreamConfig().getStreamProxy(),
consumer.getStreamConfig().getInitialPositionInStream(),
- consumer.isCleanupLeasesOfCompletedShards(), consumer.getLeaseManager(),
+ consumer.isCleanupLeasesOfCompletedShards(),
+ consumer.isIgnoreUnexpectedChildShards(),
+ consumer.getLeaseManager(),
consumer.getTaskBackoffTimeMillis(),
consumer.getGetRecordsRetrievalStrategy());
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
index 7d6dac5a..8779f47a 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
@@ -86,6 +86,11 @@ public class KinesisClientLibConfiguration {
*/
public static final boolean DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION = true;
+ /**
+ * Ignore child shards with open parents.
+ */
+ public static final boolean DEFAULT_IGNORE_UNEXPECTED_CHILD_SHARDS = false;
+
/**
* Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
*/
@@ -200,6 +205,7 @@ public class KinesisClientLibConfiguration {
private boolean callProcessRecordsEvenForEmptyRecordList;
private long parentShardPollIntervalMillis;
private boolean cleanupLeasesUponShardCompletion;
+ private boolean ignoreUnexpectedChildShards;
private ClientConfiguration kinesisClientConfig;
private ClientConfiguration dynamoDBClientConfig;
private ClientConfiguration cloudWatchClientConfig;
@@ -272,6 +278,7 @@ public class KinesisClientLibConfiguration {
DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
+ DEFAULT_IGNORE_UNEXPECTED_CHILD_SHARDS,
new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null,
@@ -300,6 +307,7 @@ public class KinesisClientLibConfiguration {
* @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
* @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration
* in Kinesis)
+ * @param ignoreUnexpectedChildShards Ignore child shards with open parents
* @param kinesisClientConfig Client Configuration used by Kinesis client
* @param dynamoDBClientConfig Client Configuration used by DynamoDB client
* @param cloudWatchClientConfig Client Configuration used by CloudWatch client
@@ -329,6 +337,7 @@ public class KinesisClientLibConfiguration {
long parentShardPollIntervalMillis,
long shardSyncIntervalMillis,
boolean cleanupTerminatedShardsBeforeExpiry,
+ boolean ignoreUnexpectedChildShards,
ClientConfiguration kinesisClientConfig,
ClientConfiguration dynamoDBClientConfig,
ClientConfiguration cloudWatchClientConfig,
@@ -342,7 +351,7 @@ public class KinesisClientLibConfiguration {
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId,
maxRecords, idleTimeBetweenReadsInMillis,
callProcessRecordsEvenForEmptyRecordList, parentShardPollIntervalMillis,
- shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry,
+ shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, ignoreUnexpectedChildShards,
kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig,
taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize,
validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis);
@@ -371,6 +380,7 @@ public class KinesisClientLibConfiguration {
* @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
* @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration
* in Kinesis)
+ * @param ignoreUnexpectedChildShards Ignore child shards with open parents
* @param kinesisClientConfig Client Configuration used by Kinesis client
* @param dynamoDBClientConfig Client Configuration used by DynamoDB client
* @param cloudWatchClientConfig Client Configuration used by CloudWatch client
@@ -400,6 +410,7 @@ public class KinesisClientLibConfiguration {
long parentShardPollIntervalMillis,
long shardSyncIntervalMillis,
boolean cleanupTerminatedShardsBeforeExpiry,
+ boolean ignoreUnexpectedChildShards,
ClientConfiguration kinesisClientConfig,
ClientConfiguration dynamoDBClientConfig,
ClientConfiguration cloudWatchClientConfig,
@@ -436,6 +447,7 @@ public class KinesisClientLibConfiguration {
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.shardSyncIntervalMillis = shardSyncIntervalMillis;
this.cleanupLeasesUponShardCompletion = cleanupTerminatedShardsBeforeExpiry;
+ this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.workerIdentifier = workerId;
this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig);
this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig);
@@ -670,6 +682,13 @@ public class KinesisClientLibConfiguration {
return cleanupLeasesUponShardCompletion;
}
+ /**
+ * @return true if we should ignore child shards which have open parents
+ */
+ public boolean shouldIgnoreUnexpectedChildShards() {
+ return ignoreUnexpectedChildShards;
+ }
+
/**
* @return true if KCL should validate client provided sequence numbers with a call to Amazon Kinesis before
* checkpointing for calls to {@link RecordProcessorCheckpointer#checkpoint(String)}
@@ -890,6 +909,16 @@ public class KinesisClientLibConfiguration {
return this;
}
+ /**
+ * @param ignoreUnexpectedChildShards Ignore child shards with open parents.
+ * @return KinesisClientLibConfiguration
+ */
+ public KinesisClientLibConfiguration withIgnoreUnexpectedChildShards(
+ boolean ignoreUnexpectedChildShards) {
+ this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
+ return this;
+ }
+
/**
* @param clientConfig Common client configuration used by Kinesis/DynamoDB/CloudWatch client
* @return KinesisClientLibConfiguration
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java
index 4bbe1939..246c2aa9 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java
@@ -53,6 +53,7 @@ class ShardConsumer {
// Backoff time when polling to check if application has finished processing parent shards
private final long parentShardPollIntervalMillis;
private final boolean cleanupLeasesOfCompletedShards;
+ private final boolean ignoreUnexpectedChildShards;
private final long taskBackoffTimeMillis;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
@@ -97,7 +98,7 @@ class ShardConsumer {
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
* @param backoffTimeMillis backoff interval when we encounter exceptions
*/
- // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
+ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 11 LINES
ShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
@@ -105,12 +106,13 @@ class ShardConsumer {
ILeaseManager leaseManager,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
+ boolean ignoreUnexpectedChildShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
this(shardInfo, streamConfig, checkpoint,recordProcessor, leaseManager, parentShardPollIntervalMillis,
- cleanupLeasesOfCompletedShards, executorService, metricsFactory, backoffTimeMillis,
+ cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, executorService, metricsFactory, backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty());
}
@@ -135,6 +137,7 @@ class ShardConsumer {
ILeaseManager leaseManager,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
+ boolean ignoreUnexpectedChildShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis,
@@ -157,6 +160,7 @@ class ShardConsumer {
this.metricsFactory = metricsFactory;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
+ this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.taskBackoffTimeMillis = backoffTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
this.getRecordsRetrievalStrategy = makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo);
@@ -165,7 +169,7 @@ class ShardConsumer {
/**
* No-op if current task is pending, otherwise submits next task for this shard.
* This method should NOT be called if the ShardConsumer is already in SHUTDOWN_COMPLETED state.
- *
+ *
* @return true if a new process task was submitted, false otherwise
*/
synchronized boolean consumeShard() {
@@ -260,7 +264,7 @@ class ShardConsumer {
/**
* Requests the shutdown of the this ShardConsumer. This should give the record processor a chance to checkpoint
* before being shutdown.
- *
+ *
* @param shutdownNotification used to signal that the record processor has been given the chance to shutdown.
*/
void notifyShutdownRequested(ShutdownNotification shutdownNotification) {
@@ -271,7 +275,7 @@ class ShardConsumer {
/**
* Shutdown this ShardConsumer (including invoking the RecordProcessor shutdown API).
* This is called by Worker when it loses responsibility for a shard.
- *
+ *
* @return true if shutdown is complete (false if shutdown is still in progress)
*/
synchronized boolean beginShutdown() {
@@ -291,7 +295,7 @@ class ShardConsumer {
/**
* Used (by Worker) to check if this ShardConsumer instance has been shutdown
* RecordProcessor shutdown() has been invoked, as appropriate.
- *
+ *
* @return true if shutdown is complete
*/
boolean isShutdown() {
@@ -307,7 +311,7 @@ class ShardConsumer {
/**
* Figure out next task to run based on current state, task, and shutdown context.
- *
+ *
* @return Return next task to run
*/
private ITask getNextTask() {
@@ -323,7 +327,7 @@ class ShardConsumer {
/**
* Note: This is a private/internal method with package level access solely for testing purposes.
* Update state based on information about: task success, current state, and shutdown info.
- *
+ *
* @param taskOutcome The outcome of the last task
*/
void updateState(TaskOutcome taskOutcome) {
@@ -355,7 +359,7 @@ class ShardConsumer {
/**
* Private/Internal method - has package level access solely for testing purposes.
- *
+ *
* @return the currentState
*/
ConsumerStates.ShardConsumerState getCurrentState() {
@@ -402,6 +406,10 @@ class ShardConsumer {
return cleanupLeasesOfCompletedShards;
}
+ boolean isIgnoreUnexpectedChildShards() {
+ return ignoreUnexpectedChildShards;
+ }
+
long getTaskBackoffTimeMillis() {
return taskBackoffTimeMillis;
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java
index ddfb8459..5a0c3d5a 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java
@@ -35,6 +35,7 @@ class ShardSyncTask implements ITask {
private final ILeaseManager leaseManager;
private InitialPositionInStreamExtended initialPosition;
private final boolean cleanupLeasesUponShardCompletion;
+ private final boolean ignoreUnexpectedChildShards;
private final long shardSyncTaskIdleTimeMillis;
private final TaskType taskType = TaskType.SHARDSYNC;
@@ -49,11 +50,13 @@ class ShardSyncTask implements ITask {
ILeaseManager leaseManager,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesUponShardCompletion,
+ boolean ignoreUnexpectedChildShards,
long shardSyncTaskIdleTimeMillis) {
this.kinesisProxy = kinesisProxy;
this.leaseManager = leaseManager;
this.initialPosition = initialPositionInStream;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
+ this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncTaskIdleTimeMillis = shardSyncTaskIdleTimeMillis;
}
@@ -68,7 +71,8 @@ class ShardSyncTask implements ITask {
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
leaseManager,
initialPosition,
- cleanupLeasesUponShardCompletion);
+ cleanupLeasesUponShardCompletion,
+ ignoreUnexpectedChildShards);
if (shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis);
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java
index c1bfae76..65b7fad6 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java
@@ -44,17 +44,19 @@ class ShardSyncTaskManager {
private final ExecutorService executorService;
private final InitialPositionInStreamExtended initialPositionInStream;
private boolean cleanupLeasesUponShardCompletion;
+ private boolean ignoreUnexpectedChildShards;
private final long shardSyncIdleTimeMillis;
/**
* Constructor.
- *
+ *
* @param kinesisProxy Proxy used to fetch streamInfo (shards)
* @param leaseManager Lease manager (used to list and create leases for shards)
* @param initialPositionInStream Initial position in stream
* @param cleanupLeasesUponShardCompletion Clean up leases for shards that we've finished processing (don't wait
* until they expire)
+ * @param ignoreUnexpectedChildShards Ignore child shards with open parents
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
* @param metricsFactory Metrics factory
* @param executorService ExecutorService to execute the shard sync tasks
@@ -63,6 +65,7 @@ class ShardSyncTaskManager {
final ILeaseManager leaseManager,
final InitialPositionInStreamExtended initialPositionInStream,
final boolean cleanupLeasesUponShardCompletion,
+ final boolean ignoreUnexpectedChildShards,
final long shardSyncIdleTimeMillis,
final IMetricsFactory metricsFactory,
ExecutorService executorService) {
@@ -70,6 +73,7 @@ class ShardSyncTaskManager {
this.leaseManager = leaseManager;
this.metricsFactory = metricsFactory;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
+ this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
this.executorService = executorService;
this.initialPositionInStream = initialPositionInStream;
@@ -99,6 +103,7 @@ class ShardSyncTaskManager {
leaseManager,
initialPositionInStream,
cleanupLeasesUponShardCompletion,
+ ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis), metricsFactory);
future = executorService.submit(currentTask);
submittedNewTask = true;
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java
index 52944200..aa8a5841 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java
@@ -62,42 +62,47 @@ class ShardSyncer {
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
- syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards);
+ syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, false);
}
/**
* Check and create leases for any new shards (e.g. following a reshard operation).
- *
+ *
* @param kinesisProxy
* @param leaseManager
* @param initialPositionInStream
- * @param expectedClosedShardId If this is not null, we will assert that the shard list we get from Kinesis
- * shows this shard to be closed (e.g. parent shard must be closed after a reshard operation).
- * If it is open, we assume this is an race condition around a reshard event and throw
- * a KinesisClientLibIOException so client can backoff and retry later.
+ * @param cleanupLeasesOfCompletedShards
+ * @param ignoreUnexpectedChildShards
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
+ static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
+ ILeaseManager leaseManager,
+ InitialPositionInStreamExtended initialPositionInStream,
+ boolean cleanupLeasesOfCompletedShards,
+ boolean ignoreUnexpectedChildShards)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
+ syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards);
+ }
+
static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
ILeaseManager leaseManager,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
- syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards);
+ checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, false);
}
/**
* Sync leases with Kinesis shards (e.g. at startup, or when we reach end of a shard).
- *
+ *
* @param kinesisProxy
* @param leaseManager
- * @param expectedClosedShardId If this is not null, we will assert that the shard list we get from Kinesis
- * does not show this shard to be open (e.g. parent shard must be closed after a reshard operation).
- * If it is still open, we assume this is a race condition around a reshard event and
- * throw a KinesisClientLibIOException so client can backoff and retry later. If the shard doesn't exist in
- * Kinesis at all, we assume this is an old/expired shard and continue with the sync operation.
+ * @param initialPosition
+ * @param cleanupLeasesOfCompletedShards
+ * @param ignoreUnexpectedChildShards
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
@@ -107,18 +112,23 @@ class ShardSyncer {
private static synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager leaseManager,
InitialPositionInStreamExtended initialPosition,
- boolean cleanupLeasesOfCompletedShards)
+ boolean cleanupLeasesOfCompletedShards,
+ boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
List shards = getShardList(kinesisProxy);
LOG.debug("Num shards: " + shards.size());
Map shardIdToShardMap = constructShardIdToShardMap(shards);
Map> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(shardIdToShardMap);
- assertAllParentShardsAreClosed(shardIdToChildShardIdsMap, shardIdToShardMap);
-
+ Set inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap);
+ if (!ignoreUnexpectedChildShards) {
+ assertAllParentShardsAreClosed(inconsistentShardIds);
+ }
+
List currentLeases = leaseManager.listLeases();
-
- List newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition);
+
+ List newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition,
+ inconsistentShardIds);
LOG.debug("Num new leases to create: " + newLeasesToCreate.size());
for (KinesisClientLease lease : newLeasesToCreate) {
long startTimeMillis = System.currentTimeMillis();
@@ -130,10 +140,10 @@ class ShardSyncer {
MetricsHelper.addSuccessAndLatency("CreateLease", startTimeMillis, success, MetricsLevel.DETAILED);
}
}
-
+
List trackedLeases = new ArrayList<>();
if (currentLeases != null) {
- trackedLeases.addAll(currentLeases);
+ trackedLeases.addAll(currentLeases);
}
trackedLeases.addAll(newLeasesToCreate);
cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager);
@@ -149,19 +159,39 @@ class ShardSyncer {
/** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls
* and a reshard operation.
- * @param shardIdToChildShardIdsMap
- * @param shardIdToShardMap
+ * @param inconsistentShardIds
* @throws KinesisClientLibIOException
*/
- private static void assertAllParentShardsAreClosed(Map> shardIdToChildShardIdsMap,
- Map shardIdToShardMap) throws KinesisClientLibIOException {
+ private static void assertAllParentShardsAreClosed(Set inconsistentShardIds)
+ throws KinesisClientLibIOException {
+ if (!inconsistentShardIds.isEmpty()) {
+ String ids = "";
+ for (String id : inconsistentShardIds) {
+ ids += " " + id;
+ }
+ throw new KinesisClientLibIOException(String.valueOf(inconsistentShardIds.size()) + " open child shards (" + ids + ") are inconsistent."
+ + "This can happen due to a race condition between describeStream and a reshard operation.");
+ }
+ }
+
+ /**
+ * Helper method to construct the list of inconsistent shards, which are open shards with non-closed ancestor
+ * parent(s).
+ * @param shardIdToChildShardIdsMap
+ * @param shardIdToShardMap
+ * @return Set of inconsistent open shard ids for shards having open parents.
+ */
+ private static Set findInconsistentShardIds(Map> shardIdToChildShardIdsMap,
+ Map shardIdToShardMap) {
+ Set result = new HashSet();
for (String parentShardId : shardIdToChildShardIdsMap.keySet()) {
Shard parentShard = shardIdToShardMap.get(parentShardId);
if ((parentShardId == null) || (parentShard.getSequenceNumberRange().getEndingSequenceNumber() == null)) {
- throw new KinesisClientLibIOException("Parent shardId " + parentShardId + " is not closed. "
- + "This can happen due to a race condition between describeStream and a reshard operation.");
+ Set childShardIdsMap = shardIdToChildShardIdsMap.get(parentShardId);
+ result.addAll(childShardIdsMap);
}
}
+ return result;
}
/**
@@ -179,7 +209,7 @@ class ShardSyncer {
}
/**
- * Note: this has package level access for testing purposes.
+ * Note: this has package level access for testing purposes.
* Useful for asserting that we don't have an incomplete shard list following a reshard operation.
* We verify that if the shard is present in the shard list, it is closed and its hash key range
* is covered by its child shards.
@@ -190,17 +220,17 @@ class ShardSyncer {
*/
static synchronized void assertClosedShardsAreCoveredOrAbsent(Map shardIdToShardMap,
Map> shardIdToChildShardIdsMap,
- Set shardIdsOfClosedShards) throws KinesisClientLibIOException {
+ Set shardIdsOfClosedShards) throws KinesisClientLibIOException {
String exceptionMessageSuffix = "This can happen if we constructed the list of shards "
+ " while a reshard operation was in progress.";
-
+
for (String shardId : shardIdsOfClosedShards) {
Shard shard = shardIdToShardMap.get(shardId);
if (shard == null) {
LOG.info("Shard " + shardId + " is not present in Kinesis anymore.");
continue;
}
-
+
String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
if (endingSequenceNumber == null) {
throw new KinesisClientLibIOException("Shard " + shardIdsOfClosedShards
@@ -220,7 +250,7 @@ class ShardSyncer {
private static synchronized void assertHashRangeOfClosedShardIsCovered(Shard closedShard,
Map shardIdToShardMap,
Set childShardIds) throws KinesisClientLibIOException {
-
+
BigInteger startingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getStartingHashKey());
BigInteger endingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getEndingHashKey());
BigInteger minStartingHashKeyOfChildren = null;
@@ -239,16 +269,16 @@ class ShardSyncer {
maxEndingHashKeyOfChildren = endingHashKey;
}
}
-
+
if ((minStartingHashKeyOfChildren == null) || (maxEndingHashKeyOfChildren == null)
|| (minStartingHashKeyOfChildren.compareTo(startingHashKeyOfClosedShard) > 0)
|| (maxEndingHashKeyOfChildren.compareTo(endingHashKeyOfClosedShard) < 0)) {
throw new KinesisClientLibIOException("Incomplete shard list: hash key range of shard "
+ closedShard.getShardId() + " is not covered by its child shards.");
}
-
+
}
-
+
/**
* Helper method to construct shardId->setOfChildShardIds map.
* Note: This has package access for testing purposes only.
@@ -270,7 +300,7 @@ class ShardSyncer {
}
childShardIds.add(shardId);
}
-
+
String adjacentParentShardId = shard.getAdjacentParentShardId();
if ((adjacentParentShardId != null) && (shardIdToShardMap.containsKey(adjacentParentShardId))) {
Set childShardIds = shardIdToChildShardIdsMap.get(adjacentParentShardId);
@@ -296,8 +326,8 @@ class ShardSyncer {
/**
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
- *
- * For each open (no ending sequence number) shard that doesn't already have a lease,
+ *
+ * For each open (no ending sequence number) shard without open parents that doesn't already have a lease,
* determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists):
* If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed.
* If not, set checkpoint of the shard to the initial position specified by the client.
@@ -306,36 +336,43 @@ class ShardSyncer {
* we begin processing data from any of its descendants.
* * A shard does not start processing data until data from all its parents has been processed.
* Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create
- * leases corresponding to both the parents - the parent shard which is not a descendant will have
+ * leases corresponding to both the parents - the parent shard which is not a descendant will have
* its checkpoint set to Latest.
- *
+ *
* We assume that if there is an existing lease for a shard, then either:
* * we have previously created a lease for its parent (if it was needed), or
* * the parent shard has expired.
- *
+ *
* For example:
* Shard structure (each level depicts a stream segment):
- * 0 1 2 3 4 5- shards till epoch 102
- * \ / \ / | |
- * 6 7 4 5- shards from epoch 103 - 205
- * \ / | /\
- * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
+ * 0 1 2 3 4 5 - shards till epoch 102
+ * \ / \ / | |
+ * 6 7 4 5 - shards from epoch 103 - 205
+ * \ / | / \
+ * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Current leases: (3, 4, 5)
* New leases to create: (2, 6, 7, 8, 9, 10)
- *
+ *
* The leases returned are sorted by the starting sequence number - following the same order
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
* before creating all the leases.
- *
+ *
+ * If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it
+ * here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very
+ * high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only
+ * currently happen here if ignoreUnexpectedChildShards was true in syncShardleases.
+ *
* @param shards List of all shards in Kinesis (we'll create new leases based on this set)
* @param currentLeases List of current leases
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
+ * @param inconsistentShardIds Set of child shard ids having open parents.
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
static List determineNewLeasesToCreate(List shards,
List currentLeases,
- InitialPositionInStreamExtended initialPosition) {
+ InitialPositionInStreamExtended initialPosition,
+ Set inconsistentShardIds) {
Map shardIdToNewLeaseMap = new HashMap();
Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
@@ -354,6 +391,8 @@ class ShardSyncer {
LOG.debug("Evaluating leases for open shard " + shardId + " and its ancestors.");
if (shardIdsOfCurrentLeases.contains(shardId)) {
LOG.debug("Lease for shardId " + shardId + " already exists. Not creating a lease");
+ } else if (inconsistentShardIds.contains(shardId)) {
+ LOG.info("shardId " + shardId + " is an inconsistent child. Not creating a lease");
} else {
LOG.debug("Need to create a lease for shardId " + shardId);
KinesisClientLease newLease = newKCLLease(shard);
@@ -407,12 +446,23 @@ class ShardSyncer {
return newLeasesToCreate;
}
+ /**
+ * Determine new leases to create and their initial checkpoint.
+ * Note: Package level access only for testing purposes.
+ */
+ static List determineNewLeasesToCreate(List shards,
+ List currentLeases,
+ InitialPositionInStreamExtended initialPosition) {
+ Set inconsistentShardIds = new HashSet();
+ return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds);
+ }
+
/**
* Note: Package level access for testing purposes only.
* Check if this shard is a descendant of a shard that is (or will be) processed.
* Create leases for the ancestors of this shard as required.
* See javadoc of determineNewLeasesToCreate() for rules and example.
- *
+ *
* @param shardId The shardId to check.
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
@@ -429,7 +479,7 @@ class ShardSyncer {
Map shardIdToShardMapOfAllKinesisShards,
Map shardIdToLeaseMapOfNewShards,
Map memoizationContext) {
-
+
Boolean previousValue = memoizationContext.get(shardId);
if (previousValue != null) {
return previousValue;
@@ -509,7 +559,7 @@ class ShardSyncer {
* Helper method to get parent shardIds of the current shard - includes the parent shardIds if:
* a/ they are not null
* b/ if they exist in the current shard map (i.e. haven't expired)
- *
+ *
* @param shard Will return parents of this shard
* @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream.
* @return Set of parentShardIds
@@ -528,18 +578,18 @@ class ShardSyncer {
}
/**
- * Delete leases corresponding to shards that no longer exist in the stream.
+ * Delete leases corresponding to shards that no longer exist in the stream.
* Current scheme: Delete a lease if:
* * the corresponding shard is not present in the list of Kinesis shards, AND
* * the parentShardIds listed in the lease are also not present in the list of Kinesis shards.
* @param shards List of all Kinesis shards (assumed to be a consistent snapshot - when stream is in Active state).
- * @param trackedLeases List of
+ * @param trackedLeases List of
* @param kinesisProxy Kinesis proxy (used to get shard list)
- * @param leaseManager
+ * @param leaseManager
* @throws KinesisClientLibIOException Thrown if we couldn't get a fresh shard list from Kinesis.
- * @throws ProvisionedThroughputException
- * @throws InvalidStateException
- * @throws DependencyException
+ * @throws ProvisionedThroughputException
+ * @throws InvalidStateException
+ * @throws DependencyException
*/
private static void cleanupGarbageLeases(List shards,
List trackedLeases,
@@ -550,7 +600,7 @@ class ShardSyncer {
for (Shard shard : shards) {
kinesisShards.add(shard.getShardId());
}
-
+
// Check if there are leases for non-existent shards
List garbageLeases = new ArrayList<>();
for (KinesisClientLease lease : trackedLeases) {
@@ -558,10 +608,10 @@ class ShardSyncer {
garbageLeases.add(lease);
}
}
-
+
if (!garbageLeases.isEmpty()) {
LOG.info("Found " + garbageLeases.size()
- + " candidate leases for cleanup. Refreshing list of"
+ + " candidate leases for cleanup. Refreshing list of"
+ " Kinesis shards to pick up recent/latest shards");
List currentShardList = getShardList(kinesisProxy);
Set currentKinesisShardIds = new HashSet<>();
@@ -577,12 +627,12 @@ class ShardSyncer {
}
}
}
-
+
}
/**
* Note: This method has package level access, solely for testing purposes.
- *
+ *
* @param lease Candidate shard we are considering for deletion.
* @param currentKinesisShardIds
* @return true if neither the shard (corresponding to the lease), nor its parents are present in
@@ -593,16 +643,16 @@ class ShardSyncer {
static boolean isCandidateForCleanup(KinesisClientLease lease, Set currentKinesisShardIds)
throws KinesisClientLibIOException {
boolean isCandidateForCleanup = true;
-
+
if (currentKinesisShardIds.contains(lease.getLeaseKey())) {
isCandidateForCleanup = false;
} else {
LOG.info("Found lease for non-existent shard: " + lease.getLeaseKey() + ". Checking its parent shards");
Set parentShardIds = lease.getParentShardIds();
for (String parentShardId : parentShardIds) {
-
+
// Throw an exception if the parent shard exists (but the child does not).
- // This may be a (rare) race condition between fetching the shard list and Kinesis expiring shards.
+ // This may be a (rare) race condition between fetching the shard list and Kinesis expiring shards.
if (currentKinesisShardIds.contains(parentShardId)) {
String message =
"Parent shard " + parentShardId + " exists but not the child shard "
@@ -615,14 +665,14 @@ class ShardSyncer {
return isCandidateForCleanup;
}
-
+
/**
* Private helper method.
* Clean up leases for shards that meet the following criteria:
* a/ the shard has been fully processed (checkpoint is set to SHARD_END)
* b/ we've begun processing all the child shards: we have leases for all child shards and their checkpoint is not
* TRIM_HORIZON.
- *
+ *
* @param currentLeases List of leases we evaluate for clean up
* @param shardIdToShardMap Map of shardId->Shard (assumed to include all Kinesis shards)
* @param shardIdToChildShardIdsMap Map of shardId->childShardIds (assumed to include all Kinesis shards)
@@ -664,22 +714,22 @@ class ShardSyncer {
cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
}
}
- }
+ }
}
- /**
+ /**
* Delete lease for the closed shard. Rules for deletion are:
* a/ the checkpoint for the closed shard is SHARD_END,
* b/ there are leases for all the childShardIds and their checkpoint is NOT TRIM_HORIZON
* Note: This method has package level access solely for testing purposes.
- *
+ *
* @param closedShardId Identifies the closed shard
* @param childShardIds ShardIds of children of the closed shard
* @param trackedLeases shardId->KinesisClientLease map with all leases we are tracking (should not be null)
- * @param leaseManager
- * @throws ProvisionedThroughputException
- * @throws InvalidStateException
- * @throws DependencyException
+ * @param leaseManager
+ * @throws ProvisionedThroughputException
+ * @throws InvalidStateException
+ * @throws DependencyException
*/
static synchronized void cleanupLeaseForClosedShard(String closedShardId,
Set childShardIds,
@@ -688,14 +738,14 @@ class ShardSyncer {
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
KinesisClientLease leaseForClosedShard = trackedLeases.get(closedShardId);
List childShardLeases = new ArrayList<>();
-
+
for (String childShardId : childShardIds) {
KinesisClientLease childLease = trackedLeases.get(childShardId);
if (childLease != null) {
childShardLeases.add(childLease);
}
}
-
+
if ((leaseForClosedShard != null)
&& (leaseForClosedShard.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END))
&& (childShardLeases.size() == childShardIds.size())) {
@@ -706,7 +756,7 @@ class ShardSyncer {
break;
}
}
-
+
if (okayToDelete) {
LOG.info("Deleting lease for shard " + leaseForClosedShard.getLeaseKey()
+ " as it has been completely processed and processing of child shards has begun.");
@@ -718,7 +768,7 @@ class ShardSyncer {
/**
* Helper method to create a new KinesisClientLease POJO for a shard.
* Note: Package level access only for testing purposes
- *
+ *
* @param shard
* @return
*/
@@ -740,7 +790,7 @@ class ShardSyncer {
/**
* Helper method to construct a shardId->Shard map for the specified list of shards.
- *
+ *
* @param shards List of shards
* @return ShardId->Shard map
*/
@@ -755,7 +805,7 @@ class ShardSyncer {
/**
* Helper method to return all the open shards for a stream.
* Note: Package level access only for testing purposes.
- *
+ *
* @param allShards All shards returved via DescribeStream. We assume this to represent a consistent shard list.
* @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active.
*/
@@ -773,7 +823,7 @@ class ShardSyncer {
private static ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) {
ExtendedSequenceNumber checkpoint = null;
-
+
if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) {
checkpoint = ExtendedSequenceNumber.TRIM_HORIZON;
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.LATEST)) {
@@ -781,10 +831,10 @@ class ShardSyncer {
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
checkpoint = ExtendedSequenceNumber.AT_TIMESTAMP;
}
-
+
return checkpoint;
}
-
+
/** Helper class to compare leases based on starting sequence number of the corresponding shards.
*
*/
@@ -794,7 +844,7 @@ class ShardSyncer {
private static final long serialVersionUID = 1L;
private final Map shardIdToShardMap;
-
+
/**
* @param shardIdToShardMapOfAllKinesisShards
*/
@@ -808,7 +858,7 @@ class ShardSyncer {
* We assume that lease1 and lease2 are:
* a/ not null,
* b/ shards (if found) have non-null starting sequence numbers
- *
+ *
* {@inheritDoc}
*/
@Override
@@ -818,23 +868,23 @@ class ShardSyncer {
String shardId2 = lease2.getLeaseKey();
Shard shard1 = shardIdToShardMap.get(shardId1);
Shard shard2 = shardIdToShardMap.get(shardId2);
-
+
// If we found shards for the two leases, use comparison of the starting sequence numbers
if ((shard1 != null) && (shard2 != null)) {
BigInteger sequenceNumber1 =
new BigInteger(shard1.getSequenceNumberRange().getStartingSequenceNumber());
BigInteger sequenceNumber2 =
new BigInteger(shard2.getSequenceNumberRange().getStartingSequenceNumber());
- result = sequenceNumber1.compareTo(sequenceNumber2);
+ result = sequenceNumber1.compareTo(sequenceNumber2);
}
-
+
if (result == 0) {
result = shardId1.compareTo(shardId2);
}
-
+
return result;
}
-
+
}
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java
index f56033a8..1d49817e 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java
@@ -44,6 +44,7 @@ class ShutdownTask implements ITask {
private final ILeaseManager leaseManager;
private final InitialPositionInStreamExtended initialPositionInStream;
private final boolean cleanupLeasesOfCompletedShards;
+ private final boolean ignoreUnexpectedChildShards;
private final TaskType taskType = TaskType.SHUTDOWN;
private final long backoffTimeMillis;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
@@ -59,8 +60,9 @@ class ShutdownTask implements ITask {
IKinesisProxy kinesisProxy,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards,
+ boolean ignoreUnexpectedChildShards,
ILeaseManager leaseManager,
- long backoffTimeMillis,
+ long backoffTimeMillis,
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor;
@@ -69,6 +71,7 @@ class ShutdownTask implements ITask {
this.kinesisProxy = kinesisProxy;
this.initialPositionInStream = initialPositionInStream;
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
+ this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.leaseManager = leaseManager;
this.backoffTimeMillis = backoffTimeMillis;
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
@@ -77,7 +80,7 @@ class ShutdownTask implements ITask {
/*
* Invokes RecordProcessor shutdown() API.
* (non-Javadoc)
- *
+ *
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call()
*/
@Override
@@ -127,7 +130,8 @@ class ShutdownTask implements ITask {
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
leaseManager,
initialPositionInStream,
- cleanupLeasesOfCompletedShards);
+ cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards);
LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId());
}
@@ -152,7 +156,7 @@ class ShutdownTask implements ITask {
/*
* (non-Javadoc)
- *
+ *
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#getTaskType()
*/
@Override
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
index 3cfb9f2f..0906bbb0 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
@@ -103,6 +103,7 @@ public class Worker implements Runnable {
// info, value is ShardConsumer.
private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap();
private final boolean cleanupLeasesUponShardCompletion;
+ private final boolean ignoreUnexpectedChildShards;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
@@ -253,7 +254,8 @@ public class Worker implements Runnable {
config.shouldValidateSequenceNumberBeforeCheckpointing(),
config.getInitialPositionInStreamExtended()),
config.getInitialPositionInStreamExtended(), config.getParentShardPollIntervalMillis(),
- config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null,
+ config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(),
+ config.shouldIgnoreUnexpectedChildShards(), null,
new KinesisClientLibLeaseCoordinator(
new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient),
config.getWorkerIdentifier(),
@@ -318,6 +320,8 @@ public class Worker implements Runnable {
* Time between tasks to sync leases and Kinesis shards
* @param cleanupLeasesUponShardCompletion
* Clean up shards we've finished processing (don't wait till they expire in Kinesis)
+ * @param ignoreUnexpectedChildShards
+ * Ignore child shards with open parents
* @param checkpoint
* Used to get/set checkpoints
* @param leaseCoordinator
@@ -335,14 +339,14 @@ public class Worker implements Runnable {
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig,
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
- long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
- KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
+ long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards,
+ ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
this(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
- shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
- metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
- shardPrioritization, Optional.empty(), Optional.empty());
+ shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, checkpoint,
+ leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
+ skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, Optional.empty(), Optional.empty());
}
@@ -363,6 +367,8 @@ public class Worker implements Runnable {
* Time between tasks to sync leases and Kinesis shards
* @param cleanupLeasesUponShardCompletion
* Clean up shards we've finished processing (don't wait till they expire in Kinesis)
+ * @param ignoreUnexpectedChildShards
+ * Ignore child shards with open parents
* @param checkpoint
* Used to get/set checkpoints
* @param leaseCoordinator
@@ -384,8 +390,8 @@ public class Worker implements Runnable {
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig,
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
- long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
- KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
+ long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards,
+ ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool) {
@@ -395,14 +401,15 @@ public class Worker implements Runnable {
this.initialPosition = initialPositionInStream;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
+ this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.checkpointTracker = checkpoint != null ? checkpoint : leaseCoordinator;
this.idleTimeInMilliseconds = streamConfig.getIdleTimeInMilliseconds();
this.executorService = execService;
this.leaseCoordinator = leaseCoordinator;
this.metricsFactory = metricsFactory;
this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
- initialPositionInStream, cleanupLeasesUponShardCompletion, shardSyncIdleTimeMillis, metricsFactory,
- executorService);
+ initialPositionInStream, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis,
+ metricsFactory, executorService);
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
this.failoverTimeMillis = failoverTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
@@ -494,7 +501,8 @@ public class Worker implements Runnable {
|| leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) {
LOG.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(),
- leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, 0L);
+ leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion,
+ ignoreUnexpectedChildShards, 0L);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else {
LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
@@ -792,7 +800,7 @@ public class Worker implements Runnable {
/**
* Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()}
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
- *
+ *
* @return Whether worker should shutdown immediately.
*/
@VisibleForTesting
@@ -844,7 +852,7 @@ public class Worker implements Runnable {
return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor,
leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion,
- executorService, metricsFactory, taskBackoffTimeMillis,
+ ignoreUnexpectedChildShards, executorService, metricsFactory, taskBackoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool);
}
@@ -1164,10 +1172,10 @@ public class Worker implements Runnable {
/**
* Provides logic how to prioritize shard processing.
- *
+ *
* @param shardPrioritization
* shardPrioritization is responsible to order shards before processing
- *
+ *
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder shardPrioritization(ShardPrioritization shardPrioritization) {
@@ -1255,6 +1263,7 @@ public class Worker implements Runnable {
config.getParentShardPollIntervalMillis(),
config.getShardSyncIntervalMillis(),
config.shouldCleanupLeasesUponShardCompletion(),
+ config.shouldIgnoreUnexpectedChildShards(),
null,
new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getTableName(),
dynamoDBClient),
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java
index cfa8be10..30e6ce87 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java
@@ -77,6 +77,7 @@ public class KinesisClientLibConfigurationTest {
TEST_VALUE_LONG,
TEST_VALUE_LONG,
true,
+ false,
new ClientConfiguration(),
new ClientConfiguration(),
new ClientConfiguration(),
@@ -116,6 +117,7 @@ public class KinesisClientLibConfigurationTest {
longValues[2],
longValues[3],
true,
+ false,
new ClientConfiguration(),
new ClientConfiguration(),
new ClientConfiguration(),
@@ -151,6 +153,7 @@ public class KinesisClientLibConfigurationTest {
TEST_VALUE_LONG,
TEST_VALUE_LONG,
true,
+ false,
new ClientConfiguration(),
new ClientConfiguration(),
new ClientConfiguration(),
@@ -315,6 +318,7 @@ public class KinesisClientLibConfigurationTest {
TEST_VALUE_LONG,
TEST_VALUE_LONG,
true,
+ false,
new ClientConfiguration(),
new ClientConfiguration(),
new ClientConfiguration(),
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java
index a3f786a6..3eef97da 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java
@@ -87,6 +87,7 @@ public class ShardConsumerTest {
private final long taskBackoffTimeMillis = 500L;
private final long parentShardPollIntervalMillis = 50L;
private final boolean cleanupLeasesOfCompletedShards = true;
+ private final boolean ignoreUnexpectedChildShards = false;
// We don't want any of these tests to run checkpoint validation
private final boolean skipCheckpointValidationValue = false;
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
@@ -134,6 +135,7 @@ public class ShardConsumerTest {
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
@@ -182,6 +184,7 @@ public class ShardConsumerTest {
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards,
spyExecutorService,
metricsFactory,
taskBackoffTimeMillis,
@@ -223,6 +226,7 @@ public class ShardConsumerTest {
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
@@ -318,6 +322,7 @@ public class ShardConsumerTest {
leaseManager,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
@@ -421,6 +426,7 @@ public class ShardConsumerTest {
leaseManager,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
@@ -483,6 +489,7 @@ public class ShardConsumerTest {
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
@@ -530,6 +537,7 @@ public class ShardConsumerTest {
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
@@ -558,6 +566,7 @@ public class ShardConsumerTest {
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java
index 307596e3..5fab6375 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java
@@ -105,7 +105,7 @@ public class ShardSyncTaskIntegrationTest {
/**
* Test method for call().
- *
+ *
* @throws CapacityExceededException
* @throws DependencyException
* @throws InvalidStateException
@@ -123,7 +123,7 @@ public class ShardSyncTaskIntegrationTest {
ShardSyncTask syncTask = new ShardSyncTask(kinesisProxy,
leaseManager,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST),
- false,
+ false, false,
0L);
syncTask.call();
List leases = leaseManager.listLeases();
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java
index 5d91c698..5781826e 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java
@@ -100,6 +100,7 @@ public class ShutdownTaskTest {
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class);
boolean cleanupLeasesOfCompletedShards = false;
+ boolean ignoreUnexpectedChildShards = false;
ShutdownTask task = new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
@@ -107,6 +108,7 @@ public class ShutdownTaskTest {
kinesisProxy,
INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards,
leaseManager,
TASK_BACKOFF_TIME_MILLIS,
getRecordsRetrievalStrategy);
@@ -126,6 +128,7 @@ public class ShutdownTaskTest {
when(kinesisProxy.getShardList()).thenReturn(null);
ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class);
boolean cleanupLeasesOfCompletedShards = false;
+ boolean ignoreUnexpectedChildShards = false;
ShutdownTask task = new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
@@ -133,6 +136,7 @@ public class ShutdownTaskTest {
kinesisProxy,
INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards,
leaseManager,
TASK_BACKOFF_TIME_MILLIS,
getRecordsRetrievalStrategy);
@@ -147,7 +151,7 @@ public class ShutdownTaskTest {
*/
@Test
public final void testGetTaskType() {
- ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0, getRecordsRetrievalStrategy);
+ ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, null, 0, getRecordsRetrievalStrategy);
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
}
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java
index 5913bf0d..17444fee 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java
@@ -119,6 +119,7 @@ public class WorkerTest {
private final long parentShardPollIntervalMillis = 5L;
private final long shardSyncIntervalMillis = 5L;
private final boolean cleanupLeasesUponShardCompletion = true;
+ private final boolean ignoreUnexpectedChildShards = false;
// We don't want any of these tests to run checkpoint validation
private final boolean skipCheckpointValidationValue = false;
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
@@ -156,7 +157,7 @@ public class WorkerTest {
private TaskResult taskResult;
// CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES
- private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY =
+ private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY =
new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory() {
@Override
@@ -189,8 +190,8 @@ public class WorkerTest {
};
}
};
-
- private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 =
+
+ private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 =
new V1ToV2RecordProcessorFactoryAdapter(SAMPLE_RECORD_PROCESSOR_FACTORY);
@@ -232,6 +233,7 @@ public class WorkerTest {
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
+ ignoreUnexpectedChildShards,
checkpoint,
leaseCoordinator,
execService,
@@ -276,8 +278,9 @@ public class WorkerTest {
.thenReturn(secondCheckpoint);
Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST,
- parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint,
- leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
+ parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion,
+ ignoreUnexpectedChildShards, checkpoint, leaseCoordinator, execService, nullMetricsFactory,
+ taskBackoffTimeMillis, failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization);
Worker workerSpy = spy(worker);
@@ -336,6 +339,7 @@ public class WorkerTest {
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
+ ignoreUnexpectedChildShards,
checkpoint,
leaseCoordinator,
execService,
@@ -390,6 +394,7 @@ public class WorkerTest {
shardPollInterval,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
+ ignoreUnexpectedChildShards,
leaseCoordinator,
leaseCoordinator,
execService,
@@ -603,7 +608,7 @@ public class WorkerTest {
* This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of
* {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads.
* This behavior makes the test a bit racy, since we need to ensure a specific order of events.
- *
+ *
* @throws Exception
*/
@Test
@@ -744,8 +749,9 @@ public class WorkerTest {
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
- cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
- taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
+ cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator,
+ executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false,
+ shardPrioritization);
when(executorService.submit(Matchers.> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@@ -818,8 +824,8 @@ public class WorkerTest {
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
- cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
- taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
+ cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator,
+ executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
@Override
void postConstruct() {
this.gracefuleShutdownStarted = true;
@@ -890,8 +896,8 @@ public class WorkerTest {
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
- cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
- taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
+ cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator,
+ executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
@Override
void postConstruct() {
this.gracefulShutdownCoordinator = coordinator;
@@ -952,8 +958,9 @@ public class WorkerTest {
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
- cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
- taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
+ cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator,
+ executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false,
+ shardPrioritization);
when(executorService.submit(Matchers.> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@@ -1022,8 +1029,9 @@ public class WorkerTest {
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
- cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
- taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
+ cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator,
+ executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false,
+ shardPrioritization);
when(executorService.submit(Matchers.> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@@ -1123,8 +1131,9 @@ public class WorkerTest {
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
- cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
- taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
+ cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator,
+ executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false,
+ shardPrioritization);
when(executorService.submit(Matchers.> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@@ -1228,8 +1237,9 @@ public class WorkerTest {
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
- cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
- taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
+ cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator,
+ executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false,
+ shardPrioritization);
when(executorService.submit(Matchers.> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@@ -1300,8 +1310,9 @@ public class WorkerTest {
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
- cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
- taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
+ cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator,
+ executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false,
+ shardPrioritization);
when(executorService.submit(Matchers.> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@@ -1338,14 +1349,15 @@ public class WorkerTest {
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream,
long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis,
- boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
+ boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
super(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream,
parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion,
- checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis,
- failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization);
+ ignoreUnexpectedChildShards, checkpoint, leaseCoordinator, execService, metricsFactory,
+ taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
+ shardPrioritization);
postConstruct();
}
@@ -1657,6 +1669,7 @@ public class WorkerTest {
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
+ ignoreUnexpectedChildShards,
leaseCoordinator,
leaseCoordinator,
executorService,
@@ -1665,7 +1678,7 @@ public class WorkerTest {
failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
shardPrioritization);
-
+
WorkerThread workerThread = new WorkerThread(worker);
workerThread.start();
return workerThread;