Allow unexpected child shards to be ignored
Now, instead of always throwing an assertion when a child shard has an open parent, the worker configuration is considered first. If configured to ignore such shards, leases are not created for them during shard sync. This is intended to mitigate worker initialization failures when processing DynamoDB Streams with many thousands of shards (which can happen for tables with thousands of partitions). This new behavior can be enabled by adding the following to a configuration/properties file: ``` ignoreUnexpectedChildShards = true ```
This commit is contained in:
parent
9720b1b249
commit
2afb0bd00b
13 changed files with 299 additions and 158 deletions
|
|
@ -21,7 +21,7 @@ import java.util.Optional;
|
||||||
* and state transitions is contained within the {@link ConsumerState} objects.
|
* and state transitions is contained within the {@link ConsumerState} objects.
|
||||||
*
|
*
|
||||||
* <h2>State Diagram</h2>
|
* <h2>State Diagram</h2>
|
||||||
*
|
*
|
||||||
* <pre>
|
* <pre>
|
||||||
* +-------------------+
|
* +-------------------+
|
||||||
* | Waiting on Parent | +------------------+
|
* | Waiting on Parent | +------------------+
|
||||||
|
|
@ -96,14 +96,14 @@ class ConsumerStates {
|
||||||
/**
|
/**
|
||||||
* Represents the current state of the consumer. This handles the creation of tasks for the consumer, and what to
|
* Represents the current state of the consumer. This handles the creation of tasks for the consumer, and what to
|
||||||
* do when a transition occurs.
|
* do when a transition occurs.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
interface ConsumerState {
|
interface ConsumerState {
|
||||||
/**
|
/**
|
||||||
* Creates a new task for this state using the passed in consumer to build the task. If there is no task
|
* Creates a new task for this state using the passed in consumer to build the task. If there is no task
|
||||||
* required for this state it may return a null value. {@link ConsumerState}'s are allowed to modify the
|
* required for this state it may return a null value. {@link ConsumerState}'s are allowed to modify the
|
||||||
* consumer during the execution of this method.
|
* consumer during the execution of this method.
|
||||||
*
|
*
|
||||||
* @param consumer
|
* @param consumer
|
||||||
* the consumer to use to build the task, or execute state.
|
* the consumer to use to build the task, or execute state.
|
||||||
* @return a valid task for this state or null if there is no task required.
|
* @return a valid task for this state or null if there is no task required.
|
||||||
|
|
@ -113,7 +113,7 @@ class ConsumerStates {
|
||||||
/**
|
/**
|
||||||
* Provides the next state of the consumer upon success of the task return by
|
* Provides the next state of the consumer upon success of the task return by
|
||||||
* {@link ConsumerState#createTask(ShardConsumer)}.
|
* {@link ConsumerState#createTask(ShardConsumer)}.
|
||||||
*
|
*
|
||||||
* @return the next state that the consumer should transition to, this may be the same object as the current
|
* @return the next state that the consumer should transition to, this may be the same object as the current
|
||||||
* state.
|
* state.
|
||||||
*/
|
*/
|
||||||
|
|
@ -122,7 +122,7 @@ class ConsumerStates {
|
||||||
/**
|
/**
|
||||||
* Provides the next state of the consumer when a shutdown has been requested. The returned state is dependent
|
* Provides the next state of the consumer when a shutdown has been requested. The returned state is dependent
|
||||||
* on the current state, and the shutdown reason.
|
* on the current state, and the shutdown reason.
|
||||||
*
|
*
|
||||||
* @param shutdownReason
|
* @param shutdownReason
|
||||||
* the reason that a shutdown was requested
|
* the reason that a shutdown was requested
|
||||||
* @return the next state that the consumer should transition to, this may be the same object as the current
|
* @return the next state that the consumer should transition to, this may be the same object as the current
|
||||||
|
|
@ -133,7 +133,7 @@ class ConsumerStates {
|
||||||
/**
|
/**
|
||||||
* The type of task that {@link ConsumerState#createTask(ShardConsumer)} would return. This is always a valid state
|
* The type of task that {@link ConsumerState#createTask(ShardConsumer)} would return. This is always a valid state
|
||||||
* even if createTask would return a null value.
|
* even if createTask would return a null value.
|
||||||
*
|
*
|
||||||
* @return the type of task that this state represents.
|
* @return the type of task that this state represents.
|
||||||
*/
|
*/
|
||||||
TaskType getTaskType();
|
TaskType getTaskType();
|
||||||
|
|
@ -141,7 +141,7 @@ class ConsumerStates {
|
||||||
/**
|
/**
|
||||||
* An enumeration representing the type of this state. Different consumer states may return the same
|
* An enumeration representing the type of this state. Different consumer states may return the same
|
||||||
* {@link ShardConsumerState}.
|
* {@link ShardConsumerState}.
|
||||||
*
|
*
|
||||||
* @return the type of consumer state this represents.
|
* @return the type of consumer state this represents.
|
||||||
*/
|
*/
|
||||||
ShardConsumerState getState();
|
ShardConsumerState getState();
|
||||||
|
|
@ -515,7 +515,9 @@ class ConsumerStates {
|
||||||
consumer.getRecordProcessorCheckpointer(), consumer.getShutdownReason(),
|
consumer.getRecordProcessorCheckpointer(), consumer.getShutdownReason(),
|
||||||
consumer.getStreamConfig().getStreamProxy(),
|
consumer.getStreamConfig().getStreamProxy(),
|
||||||
consumer.getStreamConfig().getInitialPositionInStream(),
|
consumer.getStreamConfig().getInitialPositionInStream(),
|
||||||
consumer.isCleanupLeasesOfCompletedShards(), consumer.getLeaseManager(),
|
consumer.isCleanupLeasesOfCompletedShards(),
|
||||||
|
consumer.isIgnoreUnexpectedChildShards(),
|
||||||
|
consumer.getLeaseManager(),
|
||||||
consumer.getTaskBackoffTimeMillis(),
|
consumer.getTaskBackoffTimeMillis(),
|
||||||
consumer.getGetRecordsRetrievalStrategy());
|
consumer.getGetRecordsRetrievalStrategy());
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -86,6 +86,11 @@ public class KinesisClientLibConfiguration {
|
||||||
*/
|
*/
|
||||||
public static final boolean DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION = true;
|
public static final boolean DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ignore child shards with open parents.
|
||||||
|
*/
|
||||||
|
public static final boolean DEFAULT_IGNORE_UNEXPECTED_CHILD_SHARDS = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
|
* Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
|
||||||
*/
|
*/
|
||||||
|
|
@ -200,6 +205,7 @@ public class KinesisClientLibConfiguration {
|
||||||
private boolean callProcessRecordsEvenForEmptyRecordList;
|
private boolean callProcessRecordsEvenForEmptyRecordList;
|
||||||
private long parentShardPollIntervalMillis;
|
private long parentShardPollIntervalMillis;
|
||||||
private boolean cleanupLeasesUponShardCompletion;
|
private boolean cleanupLeasesUponShardCompletion;
|
||||||
|
private boolean ignoreUnexpectedChildShards;
|
||||||
private ClientConfiguration kinesisClientConfig;
|
private ClientConfiguration kinesisClientConfig;
|
||||||
private ClientConfiguration dynamoDBClientConfig;
|
private ClientConfiguration dynamoDBClientConfig;
|
||||||
private ClientConfiguration cloudWatchClientConfig;
|
private ClientConfiguration cloudWatchClientConfig;
|
||||||
|
|
@ -272,6 +278,7 @@ public class KinesisClientLibConfiguration {
|
||||||
DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
|
DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
|
||||||
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
|
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
|
||||||
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
|
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
|
||||||
|
DEFAULT_IGNORE_UNEXPECTED_CHILD_SHARDS,
|
||||||
new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
|
new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
|
||||||
DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
|
DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
|
||||||
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null,
|
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null,
|
||||||
|
|
@ -300,6 +307,7 @@ public class KinesisClientLibConfiguration {
|
||||||
* @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
|
* @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
|
||||||
* @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration
|
* @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration
|
||||||
* in Kinesis)
|
* in Kinesis)
|
||||||
|
* @param ignoreUnexpectedChildShards Ignore child shards with open parents
|
||||||
* @param kinesisClientConfig Client Configuration used by Kinesis client
|
* @param kinesisClientConfig Client Configuration used by Kinesis client
|
||||||
* @param dynamoDBClientConfig Client Configuration used by DynamoDB client
|
* @param dynamoDBClientConfig Client Configuration used by DynamoDB client
|
||||||
* @param cloudWatchClientConfig Client Configuration used by CloudWatch client
|
* @param cloudWatchClientConfig Client Configuration used by CloudWatch client
|
||||||
|
|
@ -329,6 +337,7 @@ public class KinesisClientLibConfiguration {
|
||||||
long parentShardPollIntervalMillis,
|
long parentShardPollIntervalMillis,
|
||||||
long shardSyncIntervalMillis,
|
long shardSyncIntervalMillis,
|
||||||
boolean cleanupTerminatedShardsBeforeExpiry,
|
boolean cleanupTerminatedShardsBeforeExpiry,
|
||||||
|
boolean ignoreUnexpectedChildShards,
|
||||||
ClientConfiguration kinesisClientConfig,
|
ClientConfiguration kinesisClientConfig,
|
||||||
ClientConfiguration dynamoDBClientConfig,
|
ClientConfiguration dynamoDBClientConfig,
|
||||||
ClientConfiguration cloudWatchClientConfig,
|
ClientConfiguration cloudWatchClientConfig,
|
||||||
|
|
@ -342,7 +351,7 @@ public class KinesisClientLibConfiguration {
|
||||||
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId,
|
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId,
|
||||||
maxRecords, idleTimeBetweenReadsInMillis,
|
maxRecords, idleTimeBetweenReadsInMillis,
|
||||||
callProcessRecordsEvenForEmptyRecordList, parentShardPollIntervalMillis,
|
callProcessRecordsEvenForEmptyRecordList, parentShardPollIntervalMillis,
|
||||||
shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry,
|
shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, ignoreUnexpectedChildShards,
|
||||||
kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig,
|
kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig,
|
||||||
taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize,
|
taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize,
|
||||||
validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis);
|
validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis);
|
||||||
|
|
@ -371,6 +380,7 @@ public class KinesisClientLibConfiguration {
|
||||||
* @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
|
* @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
|
||||||
* @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration
|
* @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration
|
||||||
* in Kinesis)
|
* in Kinesis)
|
||||||
|
* @param ignoreUnexpectedChildShards Ignore child shards with open parents
|
||||||
* @param kinesisClientConfig Client Configuration used by Kinesis client
|
* @param kinesisClientConfig Client Configuration used by Kinesis client
|
||||||
* @param dynamoDBClientConfig Client Configuration used by DynamoDB client
|
* @param dynamoDBClientConfig Client Configuration used by DynamoDB client
|
||||||
* @param cloudWatchClientConfig Client Configuration used by CloudWatch client
|
* @param cloudWatchClientConfig Client Configuration used by CloudWatch client
|
||||||
|
|
@ -400,6 +410,7 @@ public class KinesisClientLibConfiguration {
|
||||||
long parentShardPollIntervalMillis,
|
long parentShardPollIntervalMillis,
|
||||||
long shardSyncIntervalMillis,
|
long shardSyncIntervalMillis,
|
||||||
boolean cleanupTerminatedShardsBeforeExpiry,
|
boolean cleanupTerminatedShardsBeforeExpiry,
|
||||||
|
boolean ignoreUnexpectedChildShards,
|
||||||
ClientConfiguration kinesisClientConfig,
|
ClientConfiguration kinesisClientConfig,
|
||||||
ClientConfiguration dynamoDBClientConfig,
|
ClientConfiguration dynamoDBClientConfig,
|
||||||
ClientConfiguration cloudWatchClientConfig,
|
ClientConfiguration cloudWatchClientConfig,
|
||||||
|
|
@ -436,6 +447,7 @@ public class KinesisClientLibConfiguration {
|
||||||
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
|
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
|
||||||
this.shardSyncIntervalMillis = shardSyncIntervalMillis;
|
this.shardSyncIntervalMillis = shardSyncIntervalMillis;
|
||||||
this.cleanupLeasesUponShardCompletion = cleanupTerminatedShardsBeforeExpiry;
|
this.cleanupLeasesUponShardCompletion = cleanupTerminatedShardsBeforeExpiry;
|
||||||
|
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
||||||
this.workerIdentifier = workerId;
|
this.workerIdentifier = workerId;
|
||||||
this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig);
|
this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig);
|
||||||
this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig);
|
this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig);
|
||||||
|
|
@ -670,6 +682,13 @@ public class KinesisClientLibConfiguration {
|
||||||
return cleanupLeasesUponShardCompletion;
|
return cleanupLeasesUponShardCompletion;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if we should ignore child shards which have open parents
|
||||||
|
*/
|
||||||
|
public boolean shouldIgnoreUnexpectedChildShards() {
|
||||||
|
return ignoreUnexpectedChildShards;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return true if KCL should validate client provided sequence numbers with a call to Amazon Kinesis before
|
* @return true if KCL should validate client provided sequence numbers with a call to Amazon Kinesis before
|
||||||
* checkpointing for calls to {@link RecordProcessorCheckpointer#checkpoint(String)}
|
* checkpointing for calls to {@link RecordProcessorCheckpointer#checkpoint(String)}
|
||||||
|
|
@ -890,6 +909,16 @@ public class KinesisClientLibConfiguration {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param ignoreUnexpectedChildShards Ignore child shards with open parents.
|
||||||
|
* @return KinesisClientLibConfiguration
|
||||||
|
*/
|
||||||
|
public KinesisClientLibConfiguration withIgnoreUnexpectedChildShards(
|
||||||
|
boolean ignoreUnexpectedChildShards) {
|
||||||
|
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param clientConfig Common client configuration used by Kinesis/DynamoDB/CloudWatch client
|
* @param clientConfig Common client configuration used by Kinesis/DynamoDB/CloudWatch client
|
||||||
* @return KinesisClientLibConfiguration
|
* @return KinesisClientLibConfiguration
|
||||||
|
|
|
||||||
|
|
@ -53,6 +53,7 @@ class ShardConsumer {
|
||||||
// Backoff time when polling to check if application has finished processing parent shards
|
// Backoff time when polling to check if application has finished processing parent shards
|
||||||
private final long parentShardPollIntervalMillis;
|
private final long parentShardPollIntervalMillis;
|
||||||
private final boolean cleanupLeasesOfCompletedShards;
|
private final boolean cleanupLeasesOfCompletedShards;
|
||||||
|
private final boolean ignoreUnexpectedChildShards;
|
||||||
private final long taskBackoffTimeMillis;
|
private final long taskBackoffTimeMillis;
|
||||||
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
|
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||||
|
|
||||||
|
|
@ -97,7 +98,7 @@ class ShardConsumer {
|
||||||
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
|
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
|
||||||
* @param backoffTimeMillis backoff interval when we encounter exceptions
|
* @param backoffTimeMillis backoff interval when we encounter exceptions
|
||||||
*/
|
*/
|
||||||
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
|
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 11 LINES
|
||||||
ShardConsumer(ShardInfo shardInfo,
|
ShardConsumer(ShardInfo shardInfo,
|
||||||
StreamConfig streamConfig,
|
StreamConfig streamConfig,
|
||||||
ICheckpoint checkpoint,
|
ICheckpoint checkpoint,
|
||||||
|
|
@ -105,12 +106,13 @@ class ShardConsumer {
|
||||||
ILeaseManager<KinesisClientLease> leaseManager,
|
ILeaseManager<KinesisClientLease> leaseManager,
|
||||||
long parentShardPollIntervalMillis,
|
long parentShardPollIntervalMillis,
|
||||||
boolean cleanupLeasesOfCompletedShards,
|
boolean cleanupLeasesOfCompletedShards,
|
||||||
|
boolean ignoreUnexpectedChildShards,
|
||||||
ExecutorService executorService,
|
ExecutorService executorService,
|
||||||
IMetricsFactory metricsFactory,
|
IMetricsFactory metricsFactory,
|
||||||
long backoffTimeMillis,
|
long backoffTimeMillis,
|
||||||
boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
|
boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
|
||||||
this(shardInfo, streamConfig, checkpoint,recordProcessor, leaseManager, parentShardPollIntervalMillis,
|
this(shardInfo, streamConfig, checkpoint,recordProcessor, leaseManager, parentShardPollIntervalMillis,
|
||||||
cleanupLeasesOfCompletedShards, executorService, metricsFactory, backoffTimeMillis,
|
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, executorService, metricsFactory, backoffTimeMillis,
|
||||||
skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty());
|
skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -135,6 +137,7 @@ class ShardConsumer {
|
||||||
ILeaseManager<KinesisClientLease> leaseManager,
|
ILeaseManager<KinesisClientLease> leaseManager,
|
||||||
long parentShardPollIntervalMillis,
|
long parentShardPollIntervalMillis,
|
||||||
boolean cleanupLeasesOfCompletedShards,
|
boolean cleanupLeasesOfCompletedShards,
|
||||||
|
boolean ignoreUnexpectedChildShards,
|
||||||
ExecutorService executorService,
|
ExecutorService executorService,
|
||||||
IMetricsFactory metricsFactory,
|
IMetricsFactory metricsFactory,
|
||||||
long backoffTimeMillis,
|
long backoffTimeMillis,
|
||||||
|
|
@ -157,6 +160,7 @@ class ShardConsumer {
|
||||||
this.metricsFactory = metricsFactory;
|
this.metricsFactory = metricsFactory;
|
||||||
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
|
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
|
||||||
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
|
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
|
||||||
|
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
||||||
this.taskBackoffTimeMillis = backoffTimeMillis;
|
this.taskBackoffTimeMillis = backoffTimeMillis;
|
||||||
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
|
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||||
this.getRecordsRetrievalStrategy = makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo);
|
this.getRecordsRetrievalStrategy = makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo);
|
||||||
|
|
@ -165,7 +169,7 @@ class ShardConsumer {
|
||||||
/**
|
/**
|
||||||
* No-op if current task is pending, otherwise submits next task for this shard.
|
* No-op if current task is pending, otherwise submits next task for this shard.
|
||||||
* This method should NOT be called if the ShardConsumer is already in SHUTDOWN_COMPLETED state.
|
* This method should NOT be called if the ShardConsumer is already in SHUTDOWN_COMPLETED state.
|
||||||
*
|
*
|
||||||
* @return true if a new process task was submitted, false otherwise
|
* @return true if a new process task was submitted, false otherwise
|
||||||
*/
|
*/
|
||||||
synchronized boolean consumeShard() {
|
synchronized boolean consumeShard() {
|
||||||
|
|
@ -260,7 +264,7 @@ class ShardConsumer {
|
||||||
/**
|
/**
|
||||||
* Requests the shutdown of this ShardConsumer. This should give the record processor a chance to checkpoint
|
* Requests the shutdown of this ShardConsumer. This should give the record processor a chance to checkpoint
|
||||||
* before being shutdown.
|
* before being shutdown.
|
||||||
*
|
*
|
||||||
* @param shutdownNotification used to signal that the record processor has been given the chance to shutdown.
|
* @param shutdownNotification used to signal that the record processor has been given the chance to shutdown.
|
||||||
*/
|
*/
|
||||||
void notifyShutdownRequested(ShutdownNotification shutdownNotification) {
|
void notifyShutdownRequested(ShutdownNotification shutdownNotification) {
|
||||||
|
|
@ -271,7 +275,7 @@ class ShardConsumer {
|
||||||
/**
|
/**
|
||||||
* Shutdown this ShardConsumer (including invoking the RecordProcessor shutdown API).
|
* Shutdown this ShardConsumer (including invoking the RecordProcessor shutdown API).
|
||||||
* This is called by Worker when it loses responsibility for a shard.
|
* This is called by Worker when it loses responsibility for a shard.
|
||||||
*
|
*
|
||||||
* @return true if shutdown is complete (false if shutdown is still in progress)
|
* @return true if shutdown is complete (false if shutdown is still in progress)
|
||||||
*/
|
*/
|
||||||
synchronized boolean beginShutdown() {
|
synchronized boolean beginShutdown() {
|
||||||
|
|
@ -291,7 +295,7 @@ class ShardConsumer {
|
||||||
/**
|
/**
|
||||||
* Used (by Worker) to check if this ShardConsumer instance has been shutdown
|
* Used (by Worker) to check if this ShardConsumer instance has been shutdown
|
||||||
* RecordProcessor shutdown() has been invoked, as appropriate.
|
* RecordProcessor shutdown() has been invoked, as appropriate.
|
||||||
*
|
*
|
||||||
* @return true if shutdown is complete
|
* @return true if shutdown is complete
|
||||||
*/
|
*/
|
||||||
boolean isShutdown() {
|
boolean isShutdown() {
|
||||||
|
|
@ -307,7 +311,7 @@ class ShardConsumer {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Figure out next task to run based on current state, task, and shutdown context.
|
* Figure out next task to run based on current state, task, and shutdown context.
|
||||||
*
|
*
|
||||||
* @return Return next task to run
|
* @return Return next task to run
|
||||||
*/
|
*/
|
||||||
private ITask getNextTask() {
|
private ITask getNextTask() {
|
||||||
|
|
@ -323,7 +327,7 @@ class ShardConsumer {
|
||||||
/**
|
/**
|
||||||
* Note: This is a private/internal method with package level access solely for testing purposes.
|
* Note: This is a private/internal method with package level access solely for testing purposes.
|
||||||
* Update state based on information about: task success, current state, and shutdown info.
|
* Update state based on information about: task success, current state, and shutdown info.
|
||||||
*
|
*
|
||||||
* @param taskOutcome The outcome of the last task
|
* @param taskOutcome The outcome of the last task
|
||||||
*/
|
*/
|
||||||
void updateState(TaskOutcome taskOutcome) {
|
void updateState(TaskOutcome taskOutcome) {
|
||||||
|
|
@ -355,7 +359,7 @@ class ShardConsumer {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Private/Internal method - has package level access solely for testing purposes.
|
* Private/Internal method - has package level access solely for testing purposes.
|
||||||
*
|
*
|
||||||
* @return the currentState
|
* @return the currentState
|
||||||
*/
|
*/
|
||||||
ConsumerStates.ShardConsumerState getCurrentState() {
|
ConsumerStates.ShardConsumerState getCurrentState() {
|
||||||
|
|
@ -402,6 +406,10 @@ class ShardConsumer {
|
||||||
return cleanupLeasesOfCompletedShards;
|
return cleanupLeasesOfCompletedShards;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean isIgnoreUnexpectedChildShards() {
|
||||||
|
return ignoreUnexpectedChildShards;
|
||||||
|
}
|
||||||
|
|
||||||
long getTaskBackoffTimeMillis() {
|
long getTaskBackoffTimeMillis() {
|
||||||
return taskBackoffTimeMillis;
|
return taskBackoffTimeMillis;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,7 @@ class ShardSyncTask implements ITask {
|
||||||
private final ILeaseManager<KinesisClientLease> leaseManager;
|
private final ILeaseManager<KinesisClientLease> leaseManager;
|
||||||
private InitialPositionInStreamExtended initialPosition;
|
private InitialPositionInStreamExtended initialPosition;
|
||||||
private final boolean cleanupLeasesUponShardCompletion;
|
private final boolean cleanupLeasesUponShardCompletion;
|
||||||
|
private final boolean ignoreUnexpectedChildShards;
|
||||||
private final long shardSyncTaskIdleTimeMillis;
|
private final long shardSyncTaskIdleTimeMillis;
|
||||||
private final TaskType taskType = TaskType.SHARDSYNC;
|
private final TaskType taskType = TaskType.SHARDSYNC;
|
||||||
|
|
||||||
|
|
@ -49,11 +50,13 @@ class ShardSyncTask implements ITask {
|
||||||
ILeaseManager<KinesisClientLease> leaseManager,
|
ILeaseManager<KinesisClientLease> leaseManager,
|
||||||
InitialPositionInStreamExtended initialPositionInStream,
|
InitialPositionInStreamExtended initialPositionInStream,
|
||||||
boolean cleanupLeasesUponShardCompletion,
|
boolean cleanupLeasesUponShardCompletion,
|
||||||
|
boolean ignoreUnexpectedChildShards,
|
||||||
long shardSyncTaskIdleTimeMillis) {
|
long shardSyncTaskIdleTimeMillis) {
|
||||||
this.kinesisProxy = kinesisProxy;
|
this.kinesisProxy = kinesisProxy;
|
||||||
this.leaseManager = leaseManager;
|
this.leaseManager = leaseManager;
|
||||||
this.initialPosition = initialPositionInStream;
|
this.initialPosition = initialPositionInStream;
|
||||||
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
||||||
|
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
||||||
this.shardSyncTaskIdleTimeMillis = shardSyncTaskIdleTimeMillis;
|
this.shardSyncTaskIdleTimeMillis = shardSyncTaskIdleTimeMillis;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -68,7 +71,8 @@ class ShardSyncTask implements ITask {
|
||||||
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
|
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
|
||||||
leaseManager,
|
leaseManager,
|
||||||
initialPosition,
|
initialPosition,
|
||||||
cleanupLeasesUponShardCompletion);
|
cleanupLeasesUponShardCompletion,
|
||||||
|
ignoreUnexpectedChildShards);
|
||||||
if (shardSyncTaskIdleTimeMillis > 0) {
|
if (shardSyncTaskIdleTimeMillis > 0) {
|
||||||
Thread.sleep(shardSyncTaskIdleTimeMillis);
|
Thread.sleep(shardSyncTaskIdleTimeMillis);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -44,17 +44,19 @@ class ShardSyncTaskManager {
|
||||||
private final ExecutorService executorService;
|
private final ExecutorService executorService;
|
||||||
private final InitialPositionInStreamExtended initialPositionInStream;
|
private final InitialPositionInStreamExtended initialPositionInStream;
|
||||||
private boolean cleanupLeasesUponShardCompletion;
|
private boolean cleanupLeasesUponShardCompletion;
|
||||||
|
private boolean ignoreUnexpectedChildShards;
|
||||||
private final long shardSyncIdleTimeMillis;
|
private final long shardSyncIdleTimeMillis;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Constructor.
|
||||||
*
|
*
|
||||||
* @param kinesisProxy Proxy used to fetch streamInfo (shards)
|
* @param kinesisProxy Proxy used to fetch streamInfo (shards)
|
||||||
* @param leaseManager Lease manager (used to list and create leases for shards)
|
* @param leaseManager Lease manager (used to list and create leases for shards)
|
||||||
* @param initialPositionInStream Initial position in stream
|
* @param initialPositionInStream Initial position in stream
|
||||||
* @param cleanupLeasesUponShardCompletion Clean up leases for shards that we've finished processing (don't wait
|
* @param cleanupLeasesUponShardCompletion Clean up leases for shards that we've finished processing (don't wait
|
||||||
* until they expire)
|
* until they expire)
|
||||||
|
* @param ignoreUnexpectedChildShards Ignore child shards with open parents
|
||||||
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
|
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
|
||||||
* @param metricsFactory Metrics factory
|
* @param metricsFactory Metrics factory
|
||||||
* @param executorService ExecutorService to execute the shard sync tasks
|
* @param executorService ExecutorService to execute the shard sync tasks
|
||||||
|
|
@ -63,6 +65,7 @@ class ShardSyncTaskManager {
|
||||||
final ILeaseManager<KinesisClientLease> leaseManager,
|
final ILeaseManager<KinesisClientLease> leaseManager,
|
||||||
final InitialPositionInStreamExtended initialPositionInStream,
|
final InitialPositionInStreamExtended initialPositionInStream,
|
||||||
final boolean cleanupLeasesUponShardCompletion,
|
final boolean cleanupLeasesUponShardCompletion,
|
||||||
|
final boolean ignoreUnexpectedChildShards,
|
||||||
final long shardSyncIdleTimeMillis,
|
final long shardSyncIdleTimeMillis,
|
||||||
final IMetricsFactory metricsFactory,
|
final IMetricsFactory metricsFactory,
|
||||||
ExecutorService executorService) {
|
ExecutorService executorService) {
|
||||||
|
|
@ -70,6 +73,7 @@ class ShardSyncTaskManager {
|
||||||
this.leaseManager = leaseManager;
|
this.leaseManager = leaseManager;
|
||||||
this.metricsFactory = metricsFactory;
|
this.metricsFactory = metricsFactory;
|
||||||
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
||||||
|
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
||||||
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
|
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
|
||||||
this.executorService = executorService;
|
this.executorService = executorService;
|
||||||
this.initialPositionInStream = initialPositionInStream;
|
this.initialPositionInStream = initialPositionInStream;
|
||||||
|
|
@ -99,6 +103,7 @@ class ShardSyncTaskManager {
|
||||||
leaseManager,
|
leaseManager,
|
||||||
initialPositionInStream,
|
initialPositionInStream,
|
||||||
cleanupLeasesUponShardCompletion,
|
cleanupLeasesUponShardCompletion,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
shardSyncIdleTimeMillis), metricsFactory);
|
shardSyncIdleTimeMillis), metricsFactory);
|
||||||
future = executorService.submit(currentTask);
|
future = executorService.submit(currentTask);
|
||||||
submittedNewTask = true;
|
submittedNewTask = true;
|
||||||
|
|
|
||||||
|
|
@ -62,42 +62,47 @@ class ShardSyncer {
|
||||||
InitialPositionInStreamExtended initialPositionInStream,
|
InitialPositionInStreamExtended initialPositionInStream,
|
||||||
boolean cleanupLeasesOfCompletedShards)
|
boolean cleanupLeasesOfCompletedShards)
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
||||||
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards);
|
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check and create leases for any new shards (e.g. following a reshard operation).
|
* Check and create leases for any new shards (e.g. following a reshard operation).
|
||||||
*
|
*
|
||||||
* @param kinesisProxy
|
* @param kinesisProxy
|
||||||
* @param leaseManager
|
* @param leaseManager
|
||||||
* @param initialPositionInStream
|
* @param initialPositionInStream
|
||||||
* @param expectedClosedShardId If this is not null, we will assert that the shard list we get from Kinesis
|
* @param cleanupLeasesOfCompletedShards
|
||||||
* shows this shard to be closed (e.g. parent shard must be closed after a reshard operation).
|
* @param ignoreUnexpectedChildShards
|
||||||
* If it is open, we assume this is an race condition around a reshard event and throw
|
|
||||||
* a KinesisClientLibIOException so client can backoff and retry later.
|
|
||||||
* @throws DependencyException
|
* @throws DependencyException
|
||||||
* @throws InvalidStateException
|
* @throws InvalidStateException
|
||||||
* @throws ProvisionedThroughputException
|
* @throws ProvisionedThroughputException
|
||||||
* @throws KinesisClientLibIOException
|
* @throws KinesisClientLibIOException
|
||||||
*/
|
*/
|
||||||
|
static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
|
||||||
|
ILeaseManager<KinesisClientLease> leaseManager,
|
||||||
|
InitialPositionInStreamExtended initialPositionInStream,
|
||||||
|
boolean cleanupLeasesOfCompletedShards,
|
||||||
|
boolean ignoreUnexpectedChildShards)
|
||||||
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
||||||
|
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards);
|
||||||
|
}
|
||||||
|
|
||||||
static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
|
static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
|
||||||
ILeaseManager<KinesisClientLease> leaseManager,
|
ILeaseManager<KinesisClientLease> leaseManager,
|
||||||
InitialPositionInStreamExtended initialPositionInStream,
|
InitialPositionInStreamExtended initialPositionInStream,
|
||||||
boolean cleanupLeasesOfCompletedShards)
|
boolean cleanupLeasesOfCompletedShards)
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
||||||
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards);
|
checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sync leases with Kinesis shards (e.g. at startup, or when we reach end of a shard).
|
* Sync leases with Kinesis shards (e.g. at startup, or when we reach end of a shard).
|
||||||
*
|
*
|
||||||
* @param kinesisProxy
|
* @param kinesisProxy
|
||||||
* @param leaseManager
|
* @param leaseManager
|
||||||
* @param expectedClosedShardId If this is not null, we will assert that the shard list we get from Kinesis
|
* @param initialPosition
|
||||||
* does not show this shard to be open (e.g. parent shard must be closed after a reshard operation).
|
* @param cleanupLeasesOfCompletedShards
|
||||||
* If it is still open, we assume this is a race condition around a reshard event and
|
* @param ignoreUnexpectedChildShards
|
||||||
* throw a KinesisClientLibIOException so client can backoff and retry later. If the shard doesn't exist in
|
|
||||||
* Kinesis at all, we assume this is an old/expired shard and continue with the sync operation.
|
|
||||||
* @throws DependencyException
|
* @throws DependencyException
|
||||||
* @throws InvalidStateException
|
* @throws InvalidStateException
|
||||||
* @throws ProvisionedThroughputException
|
* @throws ProvisionedThroughputException
|
||||||
|
|
@ -107,18 +112,23 @@ class ShardSyncer {
|
||||||
private static synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
|
private static synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
|
||||||
ILeaseManager<KinesisClientLease> leaseManager,
|
ILeaseManager<KinesisClientLease> leaseManager,
|
||||||
InitialPositionInStreamExtended initialPosition,
|
InitialPositionInStreamExtended initialPosition,
|
||||||
boolean cleanupLeasesOfCompletedShards)
|
boolean cleanupLeasesOfCompletedShards,
|
||||||
|
boolean ignoreUnexpectedChildShards)
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
||||||
List<Shard> shards = getShardList(kinesisProxy);
|
List<Shard> shards = getShardList(kinesisProxy);
|
||||||
LOG.debug("Num shards: " + shards.size());
|
LOG.debug("Num shards: " + shards.size());
|
||||||
|
|
||||||
Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(shards);
|
Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(shards);
|
||||||
Map<String, Set<String>> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(shardIdToShardMap);
|
Map<String, Set<String>> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(shardIdToShardMap);
|
||||||
assertAllParentShardsAreClosed(shardIdToChildShardIdsMap, shardIdToShardMap);
|
Set<String> inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap);
|
||||||
|
if (!ignoreUnexpectedChildShards) {
|
||||||
|
assertAllParentShardsAreClosed(inconsistentShardIds);
|
||||||
|
}
|
||||||
|
|
||||||
List<KinesisClientLease> currentLeases = leaseManager.listLeases();
|
List<KinesisClientLease> currentLeases = leaseManager.listLeases();
|
||||||
|
|
||||||
List<KinesisClientLease> newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition);
|
List<KinesisClientLease> newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition,
|
||||||
|
inconsistentShardIds);
|
||||||
LOG.debug("Num new leases to create: " + newLeasesToCreate.size());
|
LOG.debug("Num new leases to create: " + newLeasesToCreate.size());
|
||||||
for (KinesisClientLease lease : newLeasesToCreate) {
|
for (KinesisClientLease lease : newLeasesToCreate) {
|
||||||
long startTimeMillis = System.currentTimeMillis();
|
long startTimeMillis = System.currentTimeMillis();
|
||||||
|
|
@ -130,10 +140,10 @@ class ShardSyncer {
|
||||||
MetricsHelper.addSuccessAndLatency("CreateLease", startTimeMillis, success, MetricsLevel.DETAILED);
|
MetricsHelper.addSuccessAndLatency("CreateLease", startTimeMillis, success, MetricsLevel.DETAILED);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
List<KinesisClientLease> trackedLeases = new ArrayList<>();
|
List<KinesisClientLease> trackedLeases = new ArrayList<>();
|
||||||
if (currentLeases != null) {
|
if (currentLeases != null) {
|
||||||
trackedLeases.addAll(currentLeases);
|
trackedLeases.addAll(currentLeases);
|
||||||
}
|
}
|
||||||
trackedLeases.addAll(newLeasesToCreate);
|
trackedLeases.addAll(newLeasesToCreate);
|
||||||
cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager);
|
cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager);
|
||||||
|
|
@ -149,19 +159,39 @@ class ShardSyncer {
|
||||||
|
|
||||||
/** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls
|
/** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls
|
||||||
* and a reshard operation.
|
* and a reshard operation.
|
||||||
* @param shardIdToChildShardIdsMap
|
* @param inconsistentShardIds
|
||||||
* @param shardIdToShardMap
|
|
||||||
* @throws KinesisClientLibIOException
|
* @throws KinesisClientLibIOException
|
||||||
*/
|
*/
|
||||||
private static void assertAllParentShardsAreClosed(Map<String, Set<String>> shardIdToChildShardIdsMap,
|
private static void assertAllParentShardsAreClosed(Set<String> inconsistentShardIds)
|
||||||
Map<String, Shard> shardIdToShardMap) throws KinesisClientLibIOException {
|
throws KinesisClientLibIOException {
|
||||||
|
if (!inconsistentShardIds.isEmpty()) {
|
||||||
|
String ids = "";
|
||||||
|
for (String id : inconsistentShardIds) {
|
||||||
|
ids += " " + id;
|
||||||
|
}
|
||||||
|
throw new KinesisClientLibIOException(String.valueOf(inconsistentShardIds.size()) + " open child shards (" + ids + ") are inconsistent."
|
||||||
|
+ "This can happen due to a race condition between describeStream and a reshard operation.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method to construct the list of inconsistent shards, which are open shards with non-closed ancestor
|
||||||
|
* parent(s).
|
||||||
|
* @param shardIdToChildShardIdsMap
|
||||||
|
* @param shardIdToShardMap
|
||||||
|
* @return Set of inconsistent open shard ids for shards having open parents.
|
||||||
|
*/
|
||||||
|
private static Set<String> findInconsistentShardIds(Map<String, Set<String>> shardIdToChildShardIdsMap,
|
||||||
|
Map<String, Shard> shardIdToShardMap) {
|
||||||
|
Set<String> result = new HashSet<String>();
|
||||||
for (String parentShardId : shardIdToChildShardIdsMap.keySet()) {
|
for (String parentShardId : shardIdToChildShardIdsMap.keySet()) {
|
||||||
Shard parentShard = shardIdToShardMap.get(parentShardId);
|
Shard parentShard = shardIdToShardMap.get(parentShardId);
|
||||||
if ((parentShardId == null) || (parentShard.getSequenceNumberRange().getEndingSequenceNumber() == null)) {
|
if ((parentShardId == null) || (parentShard.getSequenceNumberRange().getEndingSequenceNumber() == null)) {
|
||||||
throw new KinesisClientLibIOException("Parent shardId " + parentShardId + " is not closed. "
|
Set<String> childShardIdsMap = shardIdToChildShardIdsMap.get(parentShardId);
|
||||||
+ "This can happen due to a race condition between describeStream and a reshard operation.");
|
result.addAll(childShardIdsMap);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -179,7 +209,7 @@ class ShardSyncer {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Note: this has package level access for testing purposes.
|
* Note: this has package level access for testing purposes.
|
||||||
* Useful for asserting that we don't have an incomplete shard list following a reshard operation.
|
* Useful for asserting that we don't have an incomplete shard list following a reshard operation.
|
||||||
* We verify that if the shard is present in the shard list, it is closed and its hash key range
|
* We verify that if the shard is present in the shard list, it is closed and its hash key range
|
||||||
* is covered by its child shards.
|
* is covered by its child shards.
|
||||||
|
|
@ -190,17 +220,17 @@ class ShardSyncer {
|
||||||
*/
|
*/
|
||||||
static synchronized void assertClosedShardsAreCoveredOrAbsent(Map<String, Shard> shardIdToShardMap,
|
static synchronized void assertClosedShardsAreCoveredOrAbsent(Map<String, Shard> shardIdToShardMap,
|
||||||
Map<String, Set<String>> shardIdToChildShardIdsMap,
|
Map<String, Set<String>> shardIdToChildShardIdsMap,
|
||||||
Set<String> shardIdsOfClosedShards) throws KinesisClientLibIOException {
|
Set<String> shardIdsOfClosedShards) throws KinesisClientLibIOException {
|
||||||
String exceptionMessageSuffix = "This can happen if we constructed the list of shards "
|
String exceptionMessageSuffix = "This can happen if we constructed the list of shards "
|
||||||
+ " while a reshard operation was in progress.";
|
+ " while a reshard operation was in progress.";
|
||||||
|
|
||||||
for (String shardId : shardIdsOfClosedShards) {
|
for (String shardId : shardIdsOfClosedShards) {
|
||||||
Shard shard = shardIdToShardMap.get(shardId);
|
Shard shard = shardIdToShardMap.get(shardId);
|
||||||
if (shard == null) {
|
if (shard == null) {
|
||||||
LOG.info("Shard " + shardId + " is not present in Kinesis anymore.");
|
LOG.info("Shard " + shardId + " is not present in Kinesis anymore.");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
|
String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
|
||||||
if (endingSequenceNumber == null) {
|
if (endingSequenceNumber == null) {
|
||||||
throw new KinesisClientLibIOException("Shard " + shardIdsOfClosedShards
|
throw new KinesisClientLibIOException("Shard " + shardIdsOfClosedShards
|
||||||
|
|
@ -220,7 +250,7 @@ class ShardSyncer {
|
||||||
private static synchronized void assertHashRangeOfClosedShardIsCovered(Shard closedShard,
|
private static synchronized void assertHashRangeOfClosedShardIsCovered(Shard closedShard,
|
||||||
Map<String, Shard> shardIdToShardMap,
|
Map<String, Shard> shardIdToShardMap,
|
||||||
Set<String> childShardIds) throws KinesisClientLibIOException {
|
Set<String> childShardIds) throws KinesisClientLibIOException {
|
||||||
|
|
||||||
BigInteger startingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getStartingHashKey());
|
BigInteger startingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getStartingHashKey());
|
||||||
BigInteger endingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getEndingHashKey());
|
BigInteger endingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getEndingHashKey());
|
||||||
BigInteger minStartingHashKeyOfChildren = null;
|
BigInteger minStartingHashKeyOfChildren = null;
|
||||||
|
|
@ -239,16 +269,16 @@ class ShardSyncer {
|
||||||
maxEndingHashKeyOfChildren = endingHashKey;
|
maxEndingHashKeyOfChildren = endingHashKey;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((minStartingHashKeyOfChildren == null) || (maxEndingHashKeyOfChildren == null)
|
if ((minStartingHashKeyOfChildren == null) || (maxEndingHashKeyOfChildren == null)
|
||||||
|| (minStartingHashKeyOfChildren.compareTo(startingHashKeyOfClosedShard) > 0)
|
|| (minStartingHashKeyOfChildren.compareTo(startingHashKeyOfClosedShard) > 0)
|
||||||
|| (maxEndingHashKeyOfChildren.compareTo(endingHashKeyOfClosedShard) < 0)) {
|
|| (maxEndingHashKeyOfChildren.compareTo(endingHashKeyOfClosedShard) < 0)) {
|
||||||
throw new KinesisClientLibIOException("Incomplete shard list: hash key range of shard "
|
throw new KinesisClientLibIOException("Incomplete shard list: hash key range of shard "
|
||||||
+ closedShard.getShardId() + " is not covered by its child shards.");
|
+ closedShard.getShardId() + " is not covered by its child shards.");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper method to construct shardId->setOfChildShardIds map.
|
* Helper method to construct shardId->setOfChildShardIds map.
|
||||||
* Note: This has package access for testing purposes only.
|
* Note: This has package access for testing purposes only.
|
||||||
|
|
@ -270,7 +300,7 @@ class ShardSyncer {
|
||||||
}
|
}
|
||||||
childShardIds.add(shardId);
|
childShardIds.add(shardId);
|
||||||
}
|
}
|
||||||
|
|
||||||
String adjacentParentShardId = shard.getAdjacentParentShardId();
|
String adjacentParentShardId = shard.getAdjacentParentShardId();
|
||||||
if ((adjacentParentShardId != null) && (shardIdToShardMap.containsKey(adjacentParentShardId))) {
|
if ((adjacentParentShardId != null) && (shardIdToShardMap.containsKey(adjacentParentShardId))) {
|
||||||
Set<String> childShardIds = shardIdToChildShardIdsMap.get(adjacentParentShardId);
|
Set<String> childShardIds = shardIdToChildShardIdsMap.get(adjacentParentShardId);
|
||||||
|
|
@ -296,8 +326,8 @@ class ShardSyncer {
|
||||||
/**
|
/**
|
||||||
* Determine new leases to create and their initial checkpoint.
|
* Determine new leases to create and their initial checkpoint.
|
||||||
* Note: Package level access only for testing purposes.
|
* Note: Package level access only for testing purposes.
|
||||||
*
|
*
|
||||||
* For each open (no ending sequence number) shard that doesn't already have a lease,
|
* For each open (no ending sequence number) shard without open parents that doesn't already have a lease,
|
||||||
* determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists):
|
* determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists):
|
||||||
* If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed.
|
* If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed.
|
||||||
* If not, set checkpoint of the shard to the initial position specified by the client.
|
* If not, set checkpoint of the shard to the initial position specified by the client.
|
||||||
|
|
@ -306,36 +336,43 @@ class ShardSyncer {
|
||||||
* we begin processing data from any of its descendants.
|
* we begin processing data from any of its descendants.
|
||||||
* * A shard does not start processing data until data from all its parents has been processed.
|
* * A shard does not start processing data until data from all its parents has been processed.
|
||||||
* Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create
|
* Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create
|
||||||
* leases corresponding to both the parents - the parent shard which is not a descendant will have
|
* leases corresponding to both the parents - the parent shard which is not a descendant will have
|
||||||
* its checkpoint set to Latest.
|
* its checkpoint set to Latest.
|
||||||
*
|
*
|
||||||
* We assume that if there is an existing lease for a shard, then either:
|
* We assume that if there is an existing lease for a shard, then either:
|
||||||
* * we have previously created a lease for its parent (if it was needed), or
|
* * we have previously created a lease for its parent (if it was needed), or
|
||||||
* * the parent shard has expired.
|
* * the parent shard has expired.
|
||||||
*
|
*
|
||||||
* For example:
|
* For example:
|
||||||
* Shard structure (each level depicts a stream segment):
|
* Shard structure (each level depicts a stream segment):
|
||||||
* 0 1 2 3 4 5- shards till epoch 102
|
* 0 1 2 3 4 5 - shards till epoch 102
|
||||||
* \ / \ / | |
|
* \ / \ / | |
|
||||||
* 6 7 4 5- shards from epoch 103 - 205
|
* 6 7 4 5 - shards from epoch 103 - 205
|
||||||
* \ / | /\
|
* \ / | / \
|
||||||
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
|
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
|
||||||
* Current leases: (3, 4, 5)
|
* Current leases: (3, 4, 5)
|
||||||
* New leases to create: (2, 6, 7, 8, 9, 10)
|
* New leases to create: (2, 6, 7, 8, 9, 10)
|
||||||
*
|
*
|
||||||
* The leases returned are sorted by the starting sequence number - following the same order
|
* The leases returned are sorted by the starting sequence number - following the same order
|
||||||
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
|
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
|
||||||
* before creating all the leases.
|
* before creating all the leases.
|
||||||
*
|
*
|
||||||
|
* If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it
|
||||||
|
* here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very
|
||||||
|
* high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only
|
||||||
|
* currently happen here if ignoreUnexpectedChildShards was true in syncShardleases.
|
||||||
|
*
|
||||||
* @param shards List of all shards in Kinesis (we'll create new leases based on this set)
|
* @param shards List of all shards in Kinesis (we'll create new leases based on this set)
|
||||||
* @param currentLeases List of current leases
|
* @param currentLeases List of current leases
|
||||||
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
|
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
|
||||||
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
|
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
|
||||||
|
* @param inconsistentShardIds Set of child shard ids having open parents.
|
||||||
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
|
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
|
||||||
*/
|
*/
|
||||||
static List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
|
static List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
|
||||||
List<KinesisClientLease> currentLeases,
|
List<KinesisClientLease> currentLeases,
|
||||||
InitialPositionInStreamExtended initialPosition) {
|
InitialPositionInStreamExtended initialPosition,
|
||||||
|
Set<String> inconsistentShardIds) {
|
||||||
Map<String, KinesisClientLease> shardIdToNewLeaseMap = new HashMap<String, KinesisClientLease>();
|
Map<String, KinesisClientLease> shardIdToNewLeaseMap = new HashMap<String, KinesisClientLease>();
|
||||||
Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
|
Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
|
||||||
|
|
||||||
|
|
@ -354,6 +391,8 @@ class ShardSyncer {
|
||||||
LOG.debug("Evaluating leases for open shard " + shardId + " and its ancestors.");
|
LOG.debug("Evaluating leases for open shard " + shardId + " and its ancestors.");
|
||||||
if (shardIdsOfCurrentLeases.contains(shardId)) {
|
if (shardIdsOfCurrentLeases.contains(shardId)) {
|
||||||
LOG.debug("Lease for shardId " + shardId + " already exists. Not creating a lease");
|
LOG.debug("Lease for shardId " + shardId + " already exists. Not creating a lease");
|
||||||
|
} else if (inconsistentShardIds.contains(shardId)) {
|
||||||
|
LOG.info("shardId " + shardId + " is an inconsistent child. Not creating a lease");
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("Need to create a lease for shardId " + shardId);
|
LOG.debug("Need to create a lease for shardId " + shardId);
|
||||||
KinesisClientLease newLease = newKCLLease(shard);
|
KinesisClientLease newLease = newKCLLease(shard);
|
||||||
|
|
@ -407,12 +446,23 @@ class ShardSyncer {
|
||||||
return newLeasesToCreate;
|
return newLeasesToCreate;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determine new leases to create and their initial checkpoint.
|
||||||
|
* Note: Package level access only for testing purposes.
|
||||||
|
*/
|
||||||
|
static List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
|
||||||
|
List<KinesisClientLease> currentLeases,
|
||||||
|
InitialPositionInStreamExtended initialPosition) {
|
||||||
|
Set<String> inconsistentShardIds = new HashSet<String>();
|
||||||
|
return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Note: Package level access for testing purposes only.
|
* Note: Package level access for testing purposes only.
|
||||||
* Check if this shard is a descendant of a shard that is (or will be) processed.
|
* Check if this shard is a descendant of a shard that is (or will be) processed.
|
||||||
* Create leases for the ancestors of this shard as required.
|
* Create leases for the ancestors of this shard as required.
|
||||||
* See javadoc of determineNewLeasesToCreate() for rules and example.
|
* See javadoc of determineNewLeasesToCreate() for rules and example.
|
||||||
*
|
*
|
||||||
* @param shardId The shardId to check.
|
* @param shardId The shardId to check.
|
||||||
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
|
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
|
||||||
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
|
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
|
||||||
|
|
@ -429,7 +479,7 @@ class ShardSyncer {
|
||||||
Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
|
Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
|
||||||
Map<String, KinesisClientLease> shardIdToLeaseMapOfNewShards,
|
Map<String, KinesisClientLease> shardIdToLeaseMapOfNewShards,
|
||||||
Map<String, Boolean> memoizationContext) {
|
Map<String, Boolean> memoizationContext) {
|
||||||
|
|
||||||
Boolean previousValue = memoizationContext.get(shardId);
|
Boolean previousValue = memoizationContext.get(shardId);
|
||||||
if (previousValue != null) {
|
if (previousValue != null) {
|
||||||
return previousValue;
|
return previousValue;
|
||||||
|
|
@ -509,7 +559,7 @@ class ShardSyncer {
|
||||||
* Helper method to get parent shardIds of the current shard - includes the parent shardIds if:
|
* Helper method to get parent shardIds of the current shard - includes the parent shardIds if:
|
||||||
* a/ they are not null
|
* a/ they are not null
|
||||||
* b/ if they exist in the current shard map (i.e. haven't expired)
|
* b/ if they exist in the current shard map (i.e. haven't expired)
|
||||||
*
|
*
|
||||||
* @param shard Will return parents of this shard
|
* @param shard Will return parents of this shard
|
||||||
* @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream.
|
* @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream.
|
||||||
* @return Set of parentShardIds
|
* @return Set of parentShardIds
|
||||||
|
|
@ -528,18 +578,18 @@ class ShardSyncer {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete leases corresponding to shards that no longer exist in the stream.
|
* Delete leases corresponding to shards that no longer exist in the stream.
|
||||||
* Current scheme: Delete a lease if:
|
* Current scheme: Delete a lease if:
|
||||||
* * the corresponding shard is not present in the list of Kinesis shards, AND
|
* * the corresponding shard is not present in the list of Kinesis shards, AND
|
||||||
* * the parentShardIds listed in the lease are also not present in the list of Kinesis shards.
|
* * the parentShardIds listed in the lease are also not present in the list of Kinesis shards.
|
||||||
* @param shards List of all Kinesis shards (assumed to be a consistent snapshot - when stream is in Active state).
|
* @param shards List of all Kinesis shards (assumed to be a consistent snapshot - when stream is in Active state).
|
||||||
* @param trackedLeases List of
|
* @param trackedLeases List of
|
||||||
* @param kinesisProxy Kinesis proxy (used to get shard list)
|
* @param kinesisProxy Kinesis proxy (used to get shard list)
|
||||||
* @param leaseManager
|
* @param leaseManager
|
||||||
* @throws KinesisClientLibIOException Thrown if we couldn't get a fresh shard list from Kinesis.
|
* @throws KinesisClientLibIOException Thrown if we couldn't get a fresh shard list from Kinesis.
|
||||||
* @throws ProvisionedThroughputException
|
* @throws ProvisionedThroughputException
|
||||||
* @throws InvalidStateException
|
* @throws InvalidStateException
|
||||||
* @throws DependencyException
|
* @throws DependencyException
|
||||||
*/
|
*/
|
||||||
private static void cleanupGarbageLeases(List<Shard> shards,
|
private static void cleanupGarbageLeases(List<Shard> shards,
|
||||||
List<KinesisClientLease> trackedLeases,
|
List<KinesisClientLease> trackedLeases,
|
||||||
|
|
@ -550,7 +600,7 @@ class ShardSyncer {
|
||||||
for (Shard shard : shards) {
|
for (Shard shard : shards) {
|
||||||
kinesisShards.add(shard.getShardId());
|
kinesisShards.add(shard.getShardId());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if there are leases for non-existent shards
|
// Check if there are leases for non-existent shards
|
||||||
List<KinesisClientLease> garbageLeases = new ArrayList<>();
|
List<KinesisClientLease> garbageLeases = new ArrayList<>();
|
||||||
for (KinesisClientLease lease : trackedLeases) {
|
for (KinesisClientLease lease : trackedLeases) {
|
||||||
|
|
@ -558,10 +608,10 @@ class ShardSyncer {
|
||||||
garbageLeases.add(lease);
|
garbageLeases.add(lease);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!garbageLeases.isEmpty()) {
|
if (!garbageLeases.isEmpty()) {
|
||||||
LOG.info("Found " + garbageLeases.size()
|
LOG.info("Found " + garbageLeases.size()
|
||||||
+ " candidate leases for cleanup. Refreshing list of"
|
+ " candidate leases for cleanup. Refreshing list of"
|
||||||
+ " Kinesis shards to pick up recent/latest shards");
|
+ " Kinesis shards to pick up recent/latest shards");
|
||||||
List<Shard> currentShardList = getShardList(kinesisProxy);
|
List<Shard> currentShardList = getShardList(kinesisProxy);
|
||||||
Set<String> currentKinesisShardIds = new HashSet<>();
|
Set<String> currentKinesisShardIds = new HashSet<>();
|
||||||
|
|
@ -577,12 +627,12 @@ class ShardSyncer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Note: This method has package level access, solely for testing purposes.
|
* Note: This method has package level access, solely for testing purposes.
|
||||||
*
|
*
|
||||||
* @param lease Candidate shard we are considering for deletion.
|
* @param lease Candidate shard we are considering for deletion.
|
||||||
* @param currentKinesisShardIds
|
* @param currentKinesisShardIds
|
||||||
* @return true if neither the shard (corresponding to the lease), nor its parents are present in
|
* @return true if neither the shard (corresponding to the lease), nor its parents are present in
|
||||||
|
|
@ -593,16 +643,16 @@ class ShardSyncer {
|
||||||
static boolean isCandidateForCleanup(KinesisClientLease lease, Set<String> currentKinesisShardIds)
|
static boolean isCandidateForCleanup(KinesisClientLease lease, Set<String> currentKinesisShardIds)
|
||||||
throws KinesisClientLibIOException {
|
throws KinesisClientLibIOException {
|
||||||
boolean isCandidateForCleanup = true;
|
boolean isCandidateForCleanup = true;
|
||||||
|
|
||||||
if (currentKinesisShardIds.contains(lease.getLeaseKey())) {
|
if (currentKinesisShardIds.contains(lease.getLeaseKey())) {
|
||||||
isCandidateForCleanup = false;
|
isCandidateForCleanup = false;
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Found lease for non-existent shard: " + lease.getLeaseKey() + ". Checking its parent shards");
|
LOG.info("Found lease for non-existent shard: " + lease.getLeaseKey() + ". Checking its parent shards");
|
||||||
Set<String> parentShardIds = lease.getParentShardIds();
|
Set<String> parentShardIds = lease.getParentShardIds();
|
||||||
for (String parentShardId : parentShardIds) {
|
for (String parentShardId : parentShardIds) {
|
||||||
|
|
||||||
// Throw an exception if the parent shard exists (but the child does not).
|
// Throw an exception if the parent shard exists (but the child does not).
|
||||||
// This may be a (rare) race condition between fetching the shard list and Kinesis expiring shards.
|
// This may be a (rare) race condition between fetching the shard list and Kinesis expiring shards.
|
||||||
if (currentKinesisShardIds.contains(parentShardId)) {
|
if (currentKinesisShardIds.contains(parentShardId)) {
|
||||||
String message =
|
String message =
|
||||||
"Parent shard " + parentShardId + " exists but not the child shard "
|
"Parent shard " + parentShardId + " exists but not the child shard "
|
||||||
|
|
@ -615,14 +665,14 @@ class ShardSyncer {
|
||||||
|
|
||||||
return isCandidateForCleanup;
|
return isCandidateForCleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Private helper method.
|
* Private helper method.
|
||||||
* Clean up leases for shards that meet the following criteria:
|
* Clean up leases for shards that meet the following criteria:
|
||||||
* a/ the shard has been fully processed (checkpoint is set to SHARD_END)
|
* a/ the shard has been fully processed (checkpoint is set to SHARD_END)
|
||||||
* b/ we've begun processing all the child shards: we have leases for all child shards and their checkpoint is not
|
* b/ we've begun processing all the child shards: we have leases for all child shards and their checkpoint is not
|
||||||
* TRIM_HORIZON.
|
* TRIM_HORIZON.
|
||||||
*
|
*
|
||||||
* @param currentLeases List of leases we evaluate for clean up
|
* @param currentLeases List of leases we evaluate for clean up
|
||||||
* @param shardIdToShardMap Map of shardId->Shard (assumed to include all Kinesis shards)
|
* @param shardIdToShardMap Map of shardId->Shard (assumed to include all Kinesis shards)
|
||||||
* @param shardIdToChildShardIdsMap Map of shardId->childShardIds (assumed to include all Kinesis shards)
|
* @param shardIdToChildShardIdsMap Map of shardId->childShardIds (assumed to include all Kinesis shards)
|
||||||
|
|
@ -664,22 +714,22 @@ class ShardSyncer {
|
||||||
cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
|
cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete lease for the closed shard. Rules for deletion are:
|
* Delete lease for the closed shard. Rules for deletion are:
|
||||||
* a/ the checkpoint for the closed shard is SHARD_END,
|
* a/ the checkpoint for the closed shard is SHARD_END,
|
||||||
* b/ there are leases for all the childShardIds and their checkpoint is NOT TRIM_HORIZON
|
* b/ there are leases for all the childShardIds and their checkpoint is NOT TRIM_HORIZON
|
||||||
* Note: This method has package level access solely for testing purposes.
|
* Note: This method has package level access solely for testing purposes.
|
||||||
*
|
*
|
||||||
* @param closedShardId Identifies the closed shard
|
* @param closedShardId Identifies the closed shard
|
||||||
* @param childShardIds ShardIds of children of the closed shard
|
* @param childShardIds ShardIds of children of the closed shard
|
||||||
* @param trackedLeases shardId->KinesisClientLease map with all leases we are tracking (should not be null)
|
* @param trackedLeases shardId->KinesisClientLease map with all leases we are tracking (should not be null)
|
||||||
* @param leaseManager
|
* @param leaseManager
|
||||||
* @throws ProvisionedThroughputException
|
* @throws ProvisionedThroughputException
|
||||||
* @throws InvalidStateException
|
* @throws InvalidStateException
|
||||||
* @throws DependencyException
|
* @throws DependencyException
|
||||||
*/
|
*/
|
||||||
static synchronized void cleanupLeaseForClosedShard(String closedShardId,
|
static synchronized void cleanupLeaseForClosedShard(String closedShardId,
|
||||||
Set<String> childShardIds,
|
Set<String> childShardIds,
|
||||||
|
|
@ -688,14 +738,14 @@ class ShardSyncer {
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
KinesisClientLease leaseForClosedShard = trackedLeases.get(closedShardId);
|
KinesisClientLease leaseForClosedShard = trackedLeases.get(closedShardId);
|
||||||
List<KinesisClientLease> childShardLeases = new ArrayList<>();
|
List<KinesisClientLease> childShardLeases = new ArrayList<>();
|
||||||
|
|
||||||
for (String childShardId : childShardIds) {
|
for (String childShardId : childShardIds) {
|
||||||
KinesisClientLease childLease = trackedLeases.get(childShardId);
|
KinesisClientLease childLease = trackedLeases.get(childShardId);
|
||||||
if (childLease != null) {
|
if (childLease != null) {
|
||||||
childShardLeases.add(childLease);
|
childShardLeases.add(childLease);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((leaseForClosedShard != null)
|
if ((leaseForClosedShard != null)
|
||||||
&& (leaseForClosedShard.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END))
|
&& (leaseForClosedShard.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END))
|
||||||
&& (childShardLeases.size() == childShardIds.size())) {
|
&& (childShardLeases.size() == childShardIds.size())) {
|
||||||
|
|
@ -706,7 +756,7 @@ class ShardSyncer {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (okayToDelete) {
|
if (okayToDelete) {
|
||||||
LOG.info("Deleting lease for shard " + leaseForClosedShard.getLeaseKey()
|
LOG.info("Deleting lease for shard " + leaseForClosedShard.getLeaseKey()
|
||||||
+ " as it has been completely processed and processing of child shards has begun.");
|
+ " as it has been completely processed and processing of child shards has begun.");
|
||||||
|
|
@ -718,7 +768,7 @@ class ShardSyncer {
|
||||||
/**
|
/**
|
||||||
* Helper method to create a new KinesisClientLease POJO for a shard.
|
* Helper method to create a new KinesisClientLease POJO for a shard.
|
||||||
* Note: Package level access only for testing purposes
|
* Note: Package level access only for testing purposes
|
||||||
*
|
*
|
||||||
* @param shard
|
* @param shard
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
|
|
@ -740,7 +790,7 @@ class ShardSyncer {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper method to construct a shardId->Shard map for the specified list of shards.
|
* Helper method to construct a shardId->Shard map for the specified list of shards.
|
||||||
*
|
*
|
||||||
* @param shards List of shards
|
* @param shards List of shards
|
||||||
* @return ShardId->Shard map
|
* @return ShardId->Shard map
|
||||||
*/
|
*/
|
||||||
|
|
@ -755,7 +805,7 @@ class ShardSyncer {
|
||||||
/**
|
/**
|
||||||
* Helper method to return all the open shards for a stream.
|
* Helper method to return all the open shards for a stream.
|
||||||
* Note: Package level access only for testing purposes.
|
* Note: Package level access only for testing purposes.
|
||||||
*
|
*
|
||||||
* @param allShards All shards returved via DescribeStream. We assume this to represent a consistent shard list.
|
* @param allShards All shards returved via DescribeStream. We assume this to represent a consistent shard list.
|
||||||
* @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active.
|
* @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active.
|
||||||
*/
|
*/
|
||||||
|
|
@ -773,7 +823,7 @@ class ShardSyncer {
|
||||||
|
|
||||||
private static ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) {
|
private static ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) {
|
||||||
ExtendedSequenceNumber checkpoint = null;
|
ExtendedSequenceNumber checkpoint = null;
|
||||||
|
|
||||||
if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) {
|
if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) {
|
||||||
checkpoint = ExtendedSequenceNumber.TRIM_HORIZON;
|
checkpoint = ExtendedSequenceNumber.TRIM_HORIZON;
|
||||||
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.LATEST)) {
|
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.LATEST)) {
|
||||||
|
|
@ -781,10 +831,10 @@ class ShardSyncer {
|
||||||
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
|
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
|
||||||
checkpoint = ExtendedSequenceNumber.AT_TIMESTAMP;
|
checkpoint = ExtendedSequenceNumber.AT_TIMESTAMP;
|
||||||
}
|
}
|
||||||
|
|
||||||
return checkpoint;
|
return checkpoint;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Helper class to compare leases based on starting sequence number of the corresponding shards.
|
/** Helper class to compare leases based on starting sequence number of the corresponding shards.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
@ -794,7 +844,7 @@ class ShardSyncer {
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
private final Map<String, Shard> shardIdToShardMap;
|
private final Map<String, Shard> shardIdToShardMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param shardIdToShardMapOfAllKinesisShards
|
* @param shardIdToShardMapOfAllKinesisShards
|
||||||
*/
|
*/
|
||||||
|
|
@ -808,7 +858,7 @@ class ShardSyncer {
|
||||||
* We assume that lease1 and lease2 are:
|
* We assume that lease1 and lease2 are:
|
||||||
* a/ not null,
|
* a/ not null,
|
||||||
* b/ shards (if found) have non-null starting sequence numbers
|
* b/ shards (if found) have non-null starting sequence numbers
|
||||||
*
|
*
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -818,23 +868,23 @@ class ShardSyncer {
|
||||||
String shardId2 = lease2.getLeaseKey();
|
String shardId2 = lease2.getLeaseKey();
|
||||||
Shard shard1 = shardIdToShardMap.get(shardId1);
|
Shard shard1 = shardIdToShardMap.get(shardId1);
|
||||||
Shard shard2 = shardIdToShardMap.get(shardId2);
|
Shard shard2 = shardIdToShardMap.get(shardId2);
|
||||||
|
|
||||||
// If we found shards for the two leases, use comparison of the starting sequence numbers
|
// If we found shards for the two leases, use comparison of the starting sequence numbers
|
||||||
if ((shard1 != null) && (shard2 != null)) {
|
if ((shard1 != null) && (shard2 != null)) {
|
||||||
BigInteger sequenceNumber1 =
|
BigInteger sequenceNumber1 =
|
||||||
new BigInteger(shard1.getSequenceNumberRange().getStartingSequenceNumber());
|
new BigInteger(shard1.getSequenceNumberRange().getStartingSequenceNumber());
|
||||||
BigInteger sequenceNumber2 =
|
BigInteger sequenceNumber2 =
|
||||||
new BigInteger(shard2.getSequenceNumberRange().getStartingSequenceNumber());
|
new BigInteger(shard2.getSequenceNumberRange().getStartingSequenceNumber());
|
||||||
result = sequenceNumber1.compareTo(sequenceNumber2);
|
result = sequenceNumber1.compareTo(sequenceNumber2);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (result == 0) {
|
if (result == 0) {
|
||||||
result = shardId1.compareTo(shardId2);
|
result = shardId1.compareTo(shardId2);
|
||||||
}
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -44,6 +44,7 @@ class ShutdownTask implements ITask {
|
||||||
private final ILeaseManager<KinesisClientLease> leaseManager;
|
private final ILeaseManager<KinesisClientLease> leaseManager;
|
||||||
private final InitialPositionInStreamExtended initialPositionInStream;
|
private final InitialPositionInStreamExtended initialPositionInStream;
|
||||||
private final boolean cleanupLeasesOfCompletedShards;
|
private final boolean cleanupLeasesOfCompletedShards;
|
||||||
|
private final boolean ignoreUnexpectedChildShards;
|
||||||
private final TaskType taskType = TaskType.SHUTDOWN;
|
private final TaskType taskType = TaskType.SHUTDOWN;
|
||||||
private final long backoffTimeMillis;
|
private final long backoffTimeMillis;
|
||||||
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
|
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
|
||||||
|
|
@ -59,8 +60,9 @@ class ShutdownTask implements ITask {
|
||||||
IKinesisProxy kinesisProxy,
|
IKinesisProxy kinesisProxy,
|
||||||
InitialPositionInStreamExtended initialPositionInStream,
|
InitialPositionInStreamExtended initialPositionInStream,
|
||||||
boolean cleanupLeasesOfCompletedShards,
|
boolean cleanupLeasesOfCompletedShards,
|
||||||
|
boolean ignoreUnexpectedChildShards,
|
||||||
ILeaseManager<KinesisClientLease> leaseManager,
|
ILeaseManager<KinesisClientLease> leaseManager,
|
||||||
long backoffTimeMillis,
|
long backoffTimeMillis,
|
||||||
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
|
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
|
||||||
this.shardInfo = shardInfo;
|
this.shardInfo = shardInfo;
|
||||||
this.recordProcessor = recordProcessor;
|
this.recordProcessor = recordProcessor;
|
||||||
|
|
@ -69,6 +71,7 @@ class ShutdownTask implements ITask {
|
||||||
this.kinesisProxy = kinesisProxy;
|
this.kinesisProxy = kinesisProxy;
|
||||||
this.initialPositionInStream = initialPositionInStream;
|
this.initialPositionInStream = initialPositionInStream;
|
||||||
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
|
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
|
||||||
|
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
||||||
this.leaseManager = leaseManager;
|
this.leaseManager = leaseManager;
|
||||||
this.backoffTimeMillis = backoffTimeMillis;
|
this.backoffTimeMillis = backoffTimeMillis;
|
||||||
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
|
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
|
||||||
|
|
@ -77,7 +80,7 @@ class ShutdownTask implements ITask {
|
||||||
/*
|
/*
|
||||||
* Invokes RecordProcessor shutdown() API.
|
* Invokes RecordProcessor shutdown() API.
|
||||||
* (non-Javadoc)
|
* (non-Javadoc)
|
||||||
*
|
*
|
||||||
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call()
|
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call()
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -127,7 +130,8 @@ class ShutdownTask implements ITask {
|
||||||
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
|
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
|
||||||
leaseManager,
|
leaseManager,
|
||||||
initialPositionInStream,
|
initialPositionInStream,
|
||||||
cleanupLeasesOfCompletedShards);
|
cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards);
|
||||||
LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId());
|
LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -152,7 +156,7 @@ class ShutdownTask implements ITask {
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* (non-Javadoc)
|
* (non-Javadoc)
|
||||||
*
|
*
|
||||||
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#getTaskType()
|
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#getTaskType()
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -103,6 +103,7 @@ public class Worker implements Runnable {
|
||||||
// info, value is ShardConsumer.
|
// info, value is ShardConsumer.
|
||||||
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = new ConcurrentHashMap<ShardInfo, ShardConsumer>();
|
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = new ConcurrentHashMap<ShardInfo, ShardConsumer>();
|
||||||
private final boolean cleanupLeasesUponShardCompletion;
|
private final boolean cleanupLeasesUponShardCompletion;
|
||||||
|
private final boolean ignoreUnexpectedChildShards;
|
||||||
|
|
||||||
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
|
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||||
|
|
||||||
|
|
@ -253,7 +254,8 @@ public class Worker implements Runnable {
|
||||||
config.shouldValidateSequenceNumberBeforeCheckpointing(),
|
config.shouldValidateSequenceNumberBeforeCheckpointing(),
|
||||||
config.getInitialPositionInStreamExtended()),
|
config.getInitialPositionInStreamExtended()),
|
||||||
config.getInitialPositionInStreamExtended(), config.getParentShardPollIntervalMillis(),
|
config.getInitialPositionInStreamExtended(), config.getParentShardPollIntervalMillis(),
|
||||||
config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null,
|
config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(),
|
||||||
|
config.shouldIgnoreUnexpectedChildShards(), null,
|
||||||
new KinesisClientLibLeaseCoordinator(
|
new KinesisClientLibLeaseCoordinator(
|
||||||
new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient),
|
new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient),
|
||||||
config.getWorkerIdentifier(),
|
config.getWorkerIdentifier(),
|
||||||
|
|
@ -318,6 +320,8 @@ public class Worker implements Runnable {
|
||||||
* Time between tasks to sync leases and Kinesis shards
|
* Time between tasks to sync leases and Kinesis shards
|
||||||
* @param cleanupLeasesUponShardCompletion
|
* @param cleanupLeasesUponShardCompletion
|
||||||
* Clean up shards we've finished processing (don't wait till they expire in Kinesis)
|
* Clean up shards we've finished processing (don't wait till they expire in Kinesis)
|
||||||
|
* @param ignoreUnexpectedChildShards
|
||||||
|
* Ignore child shards with open parents
|
||||||
* @param checkpoint
|
* @param checkpoint
|
||||||
* Used to get/set checkpoints
|
* Used to get/set checkpoints
|
||||||
* @param leaseCoordinator
|
* @param leaseCoordinator
|
||||||
|
|
@ -335,14 +339,14 @@ public class Worker implements Runnable {
|
||||||
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
|
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
|
||||||
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig,
|
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig,
|
||||||
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
|
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
|
||||||
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
|
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards,
|
||||||
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
|
ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
|
||||||
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
|
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
|
||||||
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
|
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
|
||||||
this(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
|
this(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
|
||||||
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
|
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, checkpoint,
|
||||||
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
|
leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
|
||||||
shardPrioritization, Optional.empty(), Optional.empty());
|
skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, Optional.empty(), Optional.empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -363,6 +367,8 @@ public class Worker implements Runnable {
|
||||||
* Time between tasks to sync leases and Kinesis shards
|
* Time between tasks to sync leases and Kinesis shards
|
||||||
* @param cleanupLeasesUponShardCompletion
|
* @param cleanupLeasesUponShardCompletion
|
||||||
* Clean up shards we've finished processing (don't wait till they expire in Kinesis)
|
* Clean up shards we've finished processing (don't wait till they expire in Kinesis)
|
||||||
|
* @param ignoreUnexpectedChildShards
|
||||||
|
* Ignore child shards with open parents
|
||||||
* @param checkpoint
|
* @param checkpoint
|
||||||
* Used to get/set checkpoints
|
* Used to get/set checkpoints
|
||||||
* @param leaseCoordinator
|
* @param leaseCoordinator
|
||||||
|
|
@ -384,8 +390,8 @@ public class Worker implements Runnable {
|
||||||
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
|
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
|
||||||
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig,
|
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig,
|
||||||
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
|
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
|
||||||
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
|
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards,
|
||||||
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
|
ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
|
||||||
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
|
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
|
||||||
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
|
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
|
||||||
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) {
|
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) {
|
||||||
|
|
@ -395,14 +401,15 @@ public class Worker implements Runnable {
|
||||||
this.initialPosition = initialPositionInStream;
|
this.initialPosition = initialPositionInStream;
|
||||||
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
|
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
|
||||||
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
||||||
|
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
||||||
this.checkpointTracker = checkpoint != null ? checkpoint : leaseCoordinator;
|
this.checkpointTracker = checkpoint != null ? checkpoint : leaseCoordinator;
|
||||||
this.idleTimeInMilliseconds = streamConfig.getIdleTimeInMilliseconds();
|
this.idleTimeInMilliseconds = streamConfig.getIdleTimeInMilliseconds();
|
||||||
this.executorService = execService;
|
this.executorService = execService;
|
||||||
this.leaseCoordinator = leaseCoordinator;
|
this.leaseCoordinator = leaseCoordinator;
|
||||||
this.metricsFactory = metricsFactory;
|
this.metricsFactory = metricsFactory;
|
||||||
this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
|
this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
|
||||||
initialPositionInStream, cleanupLeasesUponShardCompletion, shardSyncIdleTimeMillis, metricsFactory,
|
initialPositionInStream, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis,
|
||||||
executorService);
|
metricsFactory, executorService);
|
||||||
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
|
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
|
||||||
this.failoverTimeMillis = failoverTimeMillis;
|
this.failoverTimeMillis = failoverTimeMillis;
|
||||||
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
|
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||||
|
|
@ -494,7 +501,8 @@ public class Worker implements Runnable {
|
||||||
|| leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) {
|
|| leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) {
|
||||||
LOG.info("Syncing Kinesis shard info");
|
LOG.info("Syncing Kinesis shard info");
|
||||||
ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(),
|
ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(),
|
||||||
leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, 0L);
|
leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion,
|
||||||
|
ignoreUnexpectedChildShards, 0L);
|
||||||
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
|
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
|
LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
|
||||||
|
|
@ -792,7 +800,7 @@ public class Worker implements Runnable {
|
||||||
/**
|
/**
|
||||||
* Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()}
|
* Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()}
|
||||||
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
|
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
|
||||||
*
|
*
|
||||||
* @return Whether worker should shutdown immediately.
|
* @return Whether worker should shutdown immediately.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|
@ -844,7 +852,7 @@ public class Worker implements Runnable {
|
||||||
|
|
||||||
return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor,
|
return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor,
|
||||||
leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion,
|
leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion,
|
||||||
executorService, metricsFactory, taskBackoffTimeMillis,
|
ignoreUnexpectedChildShards, executorService, metricsFactory, taskBackoffTimeMillis,
|
||||||
skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool);
|
skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -1164,10 +1172,10 @@ public class Worker implements Runnable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provides logic how to prioritize shard processing.
|
* Provides logic how to prioritize shard processing.
|
||||||
*
|
*
|
||||||
* @param shardPrioritization
|
* @param shardPrioritization
|
||||||
* shardPrioritization is responsible to order shards before processing
|
* shardPrioritization is responsible to order shards before processing
|
||||||
*
|
*
|
||||||
* @return A reference to this updated object so that method calls can be chained together.
|
* @return A reference to this updated object so that method calls can be chained together.
|
||||||
*/
|
*/
|
||||||
public Builder shardPrioritization(ShardPrioritization shardPrioritization) {
|
public Builder shardPrioritization(ShardPrioritization shardPrioritization) {
|
||||||
|
|
@ -1255,6 +1263,7 @@ public class Worker implements Runnable {
|
||||||
config.getParentShardPollIntervalMillis(),
|
config.getParentShardPollIntervalMillis(),
|
||||||
config.getShardSyncIntervalMillis(),
|
config.getShardSyncIntervalMillis(),
|
||||||
config.shouldCleanupLeasesUponShardCompletion(),
|
config.shouldCleanupLeasesUponShardCompletion(),
|
||||||
|
config.shouldIgnoreUnexpectedChildShards(),
|
||||||
null,
|
null,
|
||||||
new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getTableName(),
|
new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getTableName(),
|
||||||
dynamoDBClient),
|
dynamoDBClient),
|
||||||
|
|
|
||||||
|
|
@ -77,6 +77,7 @@ public class KinesisClientLibConfigurationTest {
|
||||||
TEST_VALUE_LONG,
|
TEST_VALUE_LONG,
|
||||||
TEST_VALUE_LONG,
|
TEST_VALUE_LONG,
|
||||||
true,
|
true,
|
||||||
|
false,
|
||||||
new ClientConfiguration(),
|
new ClientConfiguration(),
|
||||||
new ClientConfiguration(),
|
new ClientConfiguration(),
|
||||||
new ClientConfiguration(),
|
new ClientConfiguration(),
|
||||||
|
|
@ -116,6 +117,7 @@ public class KinesisClientLibConfigurationTest {
|
||||||
longValues[2],
|
longValues[2],
|
||||||
longValues[3],
|
longValues[3],
|
||||||
true,
|
true,
|
||||||
|
false,
|
||||||
new ClientConfiguration(),
|
new ClientConfiguration(),
|
||||||
new ClientConfiguration(),
|
new ClientConfiguration(),
|
||||||
new ClientConfiguration(),
|
new ClientConfiguration(),
|
||||||
|
|
@ -151,6 +153,7 @@ public class KinesisClientLibConfigurationTest {
|
||||||
TEST_VALUE_LONG,
|
TEST_VALUE_LONG,
|
||||||
TEST_VALUE_LONG,
|
TEST_VALUE_LONG,
|
||||||
true,
|
true,
|
||||||
|
false,
|
||||||
new ClientConfiguration(),
|
new ClientConfiguration(),
|
||||||
new ClientConfiguration(),
|
new ClientConfiguration(),
|
||||||
new ClientConfiguration(),
|
new ClientConfiguration(),
|
||||||
|
|
@ -315,6 +318,7 @@ public class KinesisClientLibConfigurationTest {
|
||||||
TEST_VALUE_LONG,
|
TEST_VALUE_LONG,
|
||||||
TEST_VALUE_LONG,
|
TEST_VALUE_LONG,
|
||||||
true,
|
true,
|
||||||
|
false,
|
||||||
new ClientConfiguration(),
|
new ClientConfiguration(),
|
||||||
new ClientConfiguration(),
|
new ClientConfiguration(),
|
||||||
new ClientConfiguration(),
|
new ClientConfiguration(),
|
||||||
|
|
|
||||||
|
|
@ -87,6 +87,7 @@ public class ShardConsumerTest {
|
||||||
private final long taskBackoffTimeMillis = 500L;
|
private final long taskBackoffTimeMillis = 500L;
|
||||||
private final long parentShardPollIntervalMillis = 50L;
|
private final long parentShardPollIntervalMillis = 50L;
|
||||||
private final boolean cleanupLeasesOfCompletedShards = true;
|
private final boolean cleanupLeasesOfCompletedShards = true;
|
||||||
|
private final boolean ignoreUnexpectedChildShards = false;
|
||||||
// We don't want any of these tests to run checkpoint validation
|
// We don't want any of these tests to run checkpoint validation
|
||||||
private final boolean skipCheckpointValidationValue = false;
|
private final boolean skipCheckpointValidationValue = false;
|
||||||
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
|
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
|
||||||
|
|
@ -134,6 +135,7 @@ public class ShardConsumerTest {
|
||||||
null,
|
null,
|
||||||
parentShardPollIntervalMillis,
|
parentShardPollIntervalMillis,
|
||||||
cleanupLeasesOfCompletedShards,
|
cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
executorService,
|
executorService,
|
||||||
metricsFactory,
|
metricsFactory,
|
||||||
taskBackoffTimeMillis,
|
taskBackoffTimeMillis,
|
||||||
|
|
@ -182,6 +184,7 @@ public class ShardConsumerTest {
|
||||||
null,
|
null,
|
||||||
parentShardPollIntervalMillis,
|
parentShardPollIntervalMillis,
|
||||||
cleanupLeasesOfCompletedShards,
|
cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
spyExecutorService,
|
spyExecutorService,
|
||||||
metricsFactory,
|
metricsFactory,
|
||||||
taskBackoffTimeMillis,
|
taskBackoffTimeMillis,
|
||||||
|
|
@ -223,6 +226,7 @@ public class ShardConsumerTest {
|
||||||
null,
|
null,
|
||||||
parentShardPollIntervalMillis,
|
parentShardPollIntervalMillis,
|
||||||
cleanupLeasesOfCompletedShards,
|
cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
executorService,
|
executorService,
|
||||||
metricsFactory,
|
metricsFactory,
|
||||||
taskBackoffTimeMillis,
|
taskBackoffTimeMillis,
|
||||||
|
|
@ -318,6 +322,7 @@ public class ShardConsumerTest {
|
||||||
leaseManager,
|
leaseManager,
|
||||||
parentShardPollIntervalMillis,
|
parentShardPollIntervalMillis,
|
||||||
cleanupLeasesOfCompletedShards,
|
cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
executorService,
|
executorService,
|
||||||
metricsFactory,
|
metricsFactory,
|
||||||
taskBackoffTimeMillis,
|
taskBackoffTimeMillis,
|
||||||
|
|
@ -421,6 +426,7 @@ public class ShardConsumerTest {
|
||||||
leaseManager,
|
leaseManager,
|
||||||
parentShardPollIntervalMillis,
|
parentShardPollIntervalMillis,
|
||||||
cleanupLeasesOfCompletedShards,
|
cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
executorService,
|
executorService,
|
||||||
metricsFactory,
|
metricsFactory,
|
||||||
taskBackoffTimeMillis,
|
taskBackoffTimeMillis,
|
||||||
|
|
@ -483,6 +489,7 @@ public class ShardConsumerTest {
|
||||||
null,
|
null,
|
||||||
parentShardPollIntervalMillis,
|
parentShardPollIntervalMillis,
|
||||||
cleanupLeasesOfCompletedShards,
|
cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
executorService,
|
executorService,
|
||||||
metricsFactory,
|
metricsFactory,
|
||||||
taskBackoffTimeMillis,
|
taskBackoffTimeMillis,
|
||||||
|
|
@ -530,6 +537,7 @@ public class ShardConsumerTest {
|
||||||
null,
|
null,
|
||||||
parentShardPollIntervalMillis,
|
parentShardPollIntervalMillis,
|
||||||
cleanupLeasesOfCompletedShards,
|
cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
executorService,
|
executorService,
|
||||||
metricsFactory,
|
metricsFactory,
|
||||||
taskBackoffTimeMillis,
|
taskBackoffTimeMillis,
|
||||||
|
|
@ -558,6 +566,7 @@ public class ShardConsumerTest {
|
||||||
null,
|
null,
|
||||||
parentShardPollIntervalMillis,
|
parentShardPollIntervalMillis,
|
||||||
cleanupLeasesOfCompletedShards,
|
cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
executorService,
|
executorService,
|
||||||
metricsFactory,
|
metricsFactory,
|
||||||
taskBackoffTimeMillis,
|
taskBackoffTimeMillis,
|
||||||
|
|
|
||||||
|
|
@ -105,7 +105,7 @@ public class ShardSyncTaskIntegrationTest {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test method for call().
|
* Test method for call().
|
||||||
*
|
*
|
||||||
* @throws CapacityExceededException
|
* @throws CapacityExceededException
|
||||||
* @throws DependencyException
|
* @throws DependencyException
|
||||||
* @throws InvalidStateException
|
* @throws InvalidStateException
|
||||||
|
|
@ -123,7 +123,7 @@ public class ShardSyncTaskIntegrationTest {
|
||||||
ShardSyncTask syncTask = new ShardSyncTask(kinesisProxy,
|
ShardSyncTask syncTask = new ShardSyncTask(kinesisProxy,
|
||||||
leaseManager,
|
leaseManager,
|
||||||
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST),
|
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST),
|
||||||
false,
|
false, false,
|
||||||
0L);
|
0L);
|
||||||
syncTask.call();
|
syncTask.call();
|
||||||
List<KinesisClientLease> leases = leaseManager.listLeases();
|
List<KinesisClientLease> leases = leaseManager.listLeases();
|
||||||
|
|
|
||||||
|
|
@ -100,6 +100,7 @@ public class ShutdownTaskTest {
|
||||||
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
||||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||||
boolean cleanupLeasesOfCompletedShards = false;
|
boolean cleanupLeasesOfCompletedShards = false;
|
||||||
|
boolean ignoreUnexpectedChildShards = false;
|
||||||
ShutdownTask task = new ShutdownTask(defaultShardInfo,
|
ShutdownTask task = new ShutdownTask(defaultShardInfo,
|
||||||
defaultRecordProcessor,
|
defaultRecordProcessor,
|
||||||
checkpointer,
|
checkpointer,
|
||||||
|
|
@ -107,6 +108,7 @@ public class ShutdownTaskTest {
|
||||||
kinesisProxy,
|
kinesisProxy,
|
||||||
INITIAL_POSITION_TRIM_HORIZON,
|
INITIAL_POSITION_TRIM_HORIZON,
|
||||||
cleanupLeasesOfCompletedShards,
|
cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
leaseManager,
|
leaseManager,
|
||||||
TASK_BACKOFF_TIME_MILLIS,
|
TASK_BACKOFF_TIME_MILLIS,
|
||||||
getRecordsRetrievalStrategy);
|
getRecordsRetrievalStrategy);
|
||||||
|
|
@ -126,6 +128,7 @@ public class ShutdownTaskTest {
|
||||||
when(kinesisProxy.getShardList()).thenReturn(null);
|
when(kinesisProxy.getShardList()).thenReturn(null);
|
||||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||||
boolean cleanupLeasesOfCompletedShards = false;
|
boolean cleanupLeasesOfCompletedShards = false;
|
||||||
|
boolean ignoreUnexpectedChildShards = false;
|
||||||
ShutdownTask task = new ShutdownTask(defaultShardInfo,
|
ShutdownTask task = new ShutdownTask(defaultShardInfo,
|
||||||
defaultRecordProcessor,
|
defaultRecordProcessor,
|
||||||
checkpointer,
|
checkpointer,
|
||||||
|
|
@ -133,6 +136,7 @@ public class ShutdownTaskTest {
|
||||||
kinesisProxy,
|
kinesisProxy,
|
||||||
INITIAL_POSITION_TRIM_HORIZON,
|
INITIAL_POSITION_TRIM_HORIZON,
|
||||||
cleanupLeasesOfCompletedShards,
|
cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
leaseManager,
|
leaseManager,
|
||||||
TASK_BACKOFF_TIME_MILLIS,
|
TASK_BACKOFF_TIME_MILLIS,
|
||||||
getRecordsRetrievalStrategy);
|
getRecordsRetrievalStrategy);
|
||||||
|
|
@ -147,7 +151,7 @@ public class ShutdownTaskTest {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public final void testGetTaskType() {
|
public final void testGetTaskType() {
|
||||||
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0, getRecordsRetrievalStrategy);
|
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, null, 0, getRecordsRetrievalStrategy);
|
||||||
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
|
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -119,6 +119,7 @@ public class WorkerTest {
|
||||||
private final long parentShardPollIntervalMillis = 5L;
|
private final long parentShardPollIntervalMillis = 5L;
|
||||||
private final long shardSyncIntervalMillis = 5L;
|
private final long shardSyncIntervalMillis = 5L;
|
||||||
private final boolean cleanupLeasesUponShardCompletion = true;
|
private final boolean cleanupLeasesUponShardCompletion = true;
|
||||||
|
private final boolean ignoreUnexpectedChildShards = false;
|
||||||
// We don't want any of these tests to run checkpoint validation
|
// We don't want any of these tests to run checkpoint validation
|
||||||
private final boolean skipCheckpointValidationValue = false;
|
private final boolean skipCheckpointValidationValue = false;
|
||||||
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
|
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
|
||||||
|
|
@ -156,7 +157,7 @@ public class WorkerTest {
|
||||||
private TaskResult taskResult;
|
private TaskResult taskResult;
|
||||||
|
|
||||||
// CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES
|
// CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES
|
||||||
private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY =
|
private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY =
|
||||||
new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory() {
|
new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -189,8 +190,8 @@ public class WorkerTest {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 =
|
private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 =
|
||||||
new V1ToV2RecordProcessorFactoryAdapter(SAMPLE_RECORD_PROCESSOR_FACTORY);
|
new V1ToV2RecordProcessorFactoryAdapter(SAMPLE_RECORD_PROCESSOR_FACTORY);
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -232,6 +233,7 @@ public class WorkerTest {
|
||||||
parentShardPollIntervalMillis,
|
parentShardPollIntervalMillis,
|
||||||
shardSyncIntervalMillis,
|
shardSyncIntervalMillis,
|
||||||
cleanupLeasesUponShardCompletion,
|
cleanupLeasesUponShardCompletion,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
checkpoint,
|
checkpoint,
|
||||||
leaseCoordinator,
|
leaseCoordinator,
|
||||||
execService,
|
execService,
|
||||||
|
|
@ -276,8 +278,9 @@ public class WorkerTest {
|
||||||
.thenReturn(secondCheckpoint);
|
.thenReturn(secondCheckpoint);
|
||||||
|
|
||||||
Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST,
|
Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST,
|
||||||
parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint,
|
parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion,
|
||||||
leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
|
ignoreUnexpectedChildShards, checkpoint, leaseCoordinator, execService, nullMetricsFactory,
|
||||||
|
taskBackoffTimeMillis, failoverTimeMillis,
|
||||||
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization);
|
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization);
|
||||||
|
|
||||||
Worker workerSpy = spy(worker);
|
Worker workerSpy = spy(worker);
|
||||||
|
|
@ -336,6 +339,7 @@ public class WorkerTest {
|
||||||
parentShardPollIntervalMillis,
|
parentShardPollIntervalMillis,
|
||||||
shardSyncIntervalMillis,
|
shardSyncIntervalMillis,
|
||||||
cleanupLeasesUponShardCompletion,
|
cleanupLeasesUponShardCompletion,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
checkpoint,
|
checkpoint,
|
||||||
leaseCoordinator,
|
leaseCoordinator,
|
||||||
execService,
|
execService,
|
||||||
|
|
@ -390,6 +394,7 @@ public class WorkerTest {
|
||||||
shardPollInterval,
|
shardPollInterval,
|
||||||
shardSyncIntervalMillis,
|
shardSyncIntervalMillis,
|
||||||
cleanupLeasesUponShardCompletion,
|
cleanupLeasesUponShardCompletion,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
leaseCoordinator,
|
leaseCoordinator,
|
||||||
leaseCoordinator,
|
leaseCoordinator,
|
||||||
execService,
|
execService,
|
||||||
|
|
@ -603,7 +608,7 @@ public class WorkerTest {
|
||||||
* This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of
|
* This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of
|
||||||
* {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads.
|
* {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads.
|
||||||
* This behavior makes the test a bit racy, since we need to ensure a specific order of events.
|
* This behavior makes the test a bit racy, since we need to ensure a specific order of events.
|
||||||
*
|
*
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -744,8 +749,9 @@ public class WorkerTest {
|
||||||
|
|
||||||
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
|
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
|
||||||
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
|
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
|
||||||
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
|
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator,
|
||||||
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
|
executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false,
|
||||||
|
shardPrioritization);
|
||||||
|
|
||||||
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
|
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
|
||||||
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
|
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
|
||||||
|
|
@ -818,8 +824,8 @@ public class WorkerTest {
|
||||||
|
|
||||||
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig,
|
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig,
|
||||||
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
|
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
|
||||||
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
|
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator,
|
||||||
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
|
executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
|
||||||
@Override
|
@Override
|
||||||
void postConstruct() {
|
void postConstruct() {
|
||||||
this.gracefuleShutdownStarted = true;
|
this.gracefuleShutdownStarted = true;
|
||||||
|
|
@ -890,8 +896,8 @@ public class WorkerTest {
|
||||||
|
|
||||||
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig,
|
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig,
|
||||||
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
|
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
|
||||||
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
|
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator,
|
||||||
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
|
executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
|
||||||
@Override
|
@Override
|
||||||
void postConstruct() {
|
void postConstruct() {
|
||||||
this.gracefulShutdownCoordinator = coordinator;
|
this.gracefulShutdownCoordinator = coordinator;
|
||||||
|
|
@ -952,8 +958,9 @@ public class WorkerTest {
|
||||||
|
|
||||||
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
|
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
|
||||||
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
|
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
|
||||||
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
|
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator,
|
||||||
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
|
executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false,
|
||||||
|
shardPrioritization);
|
||||||
|
|
||||||
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
|
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
|
||||||
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
|
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
|
||||||
|
|
@ -1022,8 +1029,9 @@ public class WorkerTest {
|
||||||
|
|
||||||
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
|
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
|
||||||
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
|
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
|
||||||
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
|
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator,
|
||||||
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
|
executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false,
|
||||||
|
shardPrioritization);
|
||||||
|
|
||||||
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
|
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
|
||||||
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
|
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
|
||||||
|
|
@ -1123,8 +1131,9 @@ public class WorkerTest {
|
||||||
|
|
||||||
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
|
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
|
||||||
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
|
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
|
||||||
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
|
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator,
|
||||||
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
|
executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false,
|
||||||
|
shardPrioritization);
|
||||||
|
|
||||||
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
|
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
|
||||||
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
|
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
|
||||||
|
|
@ -1228,8 +1237,9 @@ public class WorkerTest {
|
||||||
|
|
||||||
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
|
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
|
||||||
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
|
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
|
||||||
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
|
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator,
|
||||||
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
|
executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false,
|
||||||
|
shardPrioritization);
|
||||||
|
|
||||||
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
|
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
|
||||||
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
|
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
|
||||||
|
|
@ -1300,8 +1310,9 @@ public class WorkerTest {
|
||||||
|
|
||||||
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
|
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
|
||||||
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
|
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
|
||||||
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
|
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator,
|
||||||
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
|
executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false,
|
||||||
|
shardPrioritization);
|
||||||
|
|
||||||
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
|
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
|
||||||
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
|
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
|
||||||
|
|
@ -1338,14 +1349,15 @@ public class WorkerTest {
|
||||||
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
|
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
|
||||||
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream,
|
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream,
|
||||||
long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis,
|
long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis,
|
||||||
boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
|
boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards, ICheckpoint checkpoint,
|
||||||
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
|
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
|
||||||
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
|
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
|
||||||
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
|
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
|
||||||
super(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream,
|
super(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream,
|
||||||
parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion,
|
parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion,
|
||||||
checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis,
|
ignoreUnexpectedChildShards, checkpoint, leaseCoordinator, execService, metricsFactory,
|
||||||
failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization);
|
taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||||
|
shardPrioritization);
|
||||||
postConstruct();
|
postConstruct();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1657,6 +1669,7 @@ public class WorkerTest {
|
||||||
parentShardPollIntervalMillis,
|
parentShardPollIntervalMillis,
|
||||||
shardSyncIntervalMillis,
|
shardSyncIntervalMillis,
|
||||||
cleanupLeasesUponShardCompletion,
|
cleanupLeasesUponShardCompletion,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
leaseCoordinator,
|
leaseCoordinator,
|
||||||
leaseCoordinator,
|
leaseCoordinator,
|
||||||
executorService,
|
executorService,
|
||||||
|
|
@ -1665,7 +1678,7 @@ public class WorkerTest {
|
||||||
failoverTimeMillis,
|
failoverTimeMillis,
|
||||||
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
|
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
|
||||||
shardPrioritization);
|
shardPrioritization);
|
||||||
|
|
||||||
WorkerThread workerThread = new WorkerThread(worker);
|
WorkerThread workerThread = new WorkerThread(worker);
|
||||||
workerThread.start();
|
workerThread.start();
|
||||||
return workerThread;
|
return workerThread;
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue