diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index c0bdc060..030d3801 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -19,7 +19,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; * and state transitions is contained within the {@link ConsumerState} objects. * *
* +-------------------+
* | Waiting on Parent | +------------------+
@@ -94,14 +94,14 @@ class ConsumerStates {
/**
* Represents a the current state of the consumer. This handles the creation of tasks for the consumer, and what to
* do when a transition occurs.
- *
+ *
*/
interface ConsumerState {
/**
* Creates a new task for this state using the passed in consumer to build the task. If there is no task
* required for this state it may return a null value. {@link ConsumerState}'s are allowed to modify the
* consumer during the execution of this method.
- *
+ *
* @param consumer
* the consumer to use build the task, or execute state.
* @return a valid task for this state or null if there is no task required.
@@ -111,7 +111,7 @@ class ConsumerStates {
/**
* Provides the next state of the consumer upon success of the task return by
* {@link ConsumerState#createTask(ShardConsumer)}.
- *
+ *
* @return the next state that the consumer should transition to, this may be the same object as the current
* state.
*/
@@ -120,7 +120,7 @@ class ConsumerStates {
/**
* Provides the next state of the consumer when a shutdown has been requested. The returned state is dependent
* on the current state, and the shutdown reason.
- *
+ *
* @param shutdownReason
* the reason that a shutdown was requested
* @return the next state that the consumer should transition to, this may be the same object as the current
@@ -131,7 +131,7 @@ class ConsumerStates {
/**
* The type of task that {@link ConsumerState#createTask(ShardConsumer)} would return. This is always a valid state
* even if createTask would return a null value.
- *
+ *
* @return the type of task that this state represents.
*/
TaskType getTaskType();
@@ -139,7 +139,7 @@ class ConsumerStates {
/**
* An enumeration represent the type of this state. Different consumer states may return the same
* {@link ShardConsumerState}.
- *
+ *
* @return the type of consumer state this represents.
*/
ShardConsumerState getState();
@@ -530,7 +530,8 @@ class ConsumerStates {
consumer.isIgnoreUnexpectedChildShards(),
consumer.getLeaseManager(),
consumer.getTaskBackoffTimeMillis(),
- consumer.getGetRecordsCache());
+ consumer.getGetRecordsCache(),
+ consumer.getShardSyncer());
}
@Override
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java
index 448a2953..91fd2350 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java
@@ -22,6 +22,8 @@ import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
+import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -50,6 +52,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator DEFAULT_LEASE_SELECTOR = new GenericLeaseSelector();
private final ILeaseManager leaseManager;
@@ -61,12 +64,14 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator leaseManager,
String workerIdentifier,
long leaseDurationMillis,
- long epsilonMillis) {
- super(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis);
+ long epsilonMillis,
+ LeaseSelector leaseSelector) {
+ super(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis, epsilonMillis);
this.leaseManager = leaseManager;
}
@@ -75,19 +80,35 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator leaseManager,
+ String workerIdentifier,
+ long leaseDurationMillis,
+ long epsilonMillis) {
+ this(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, DEFAULT_LEASE_SELECTOR);
+ }
+
+ /**
+ * @param leaseManager Lease manager which provides CRUD lease operations.
+ * @param leaseSelector Lease selector which decides which leases to take
+ * @param workerIdentifier Used to identify this worker process
+ * @param leaseDurationMillis Duration of a lease in milliseconds
+ * @param epsilonMillis Delta for timing operations (e.g. checking lease expiry)
+ * @param metricsFactory Metrics factory used to emit metrics
+ */
+ public KinesisClientLibLeaseCoordinator(ILeaseManager leaseManager,
+ LeaseSelector leaseSelector,
String workerIdentifier,
long leaseDurationMillis,
long epsilonMillis,
IMetricsFactory metricsFactory) {
- super(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, metricsFactory);
+ super(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis, epsilonMillis, metricsFactory);
this.leaseManager = leaseManager;
}
/**
* @param leaseManager Lease manager which provides CRUD lease operations.
+ * @param leaseSelector Lease selector which decides which leases to take
* @param workerIdentifier Used to identify this worker process
* @param leaseDurationMillis Duration of a lease in milliseconds
* @param epsilonMillis Delta for timing operations (e.g. checking lease expiry)
@@ -96,6 +117,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator leaseManager,
+ LeaseSelector leaseSelector,
String workerIdentifier,
long leaseDurationMillis,
long epsilonMillis,
@@ -103,7 +125,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator getLeaseManager() {
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisLeaseCleanupValidator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisLeaseCleanupValidator.java
new file mode 100644
index 00000000..31d8d998
--- /dev/null
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisLeaseCleanupValidator.java
@@ -0,0 +1,50 @@
+package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
+import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Set;
+
+/**
+ * Represents the class that decides if a lease is eligible for cleanup.
+ */
+class KinesisLeaseCleanupValidator implements LeaseCleanupValidator {
+
+ private static final Log LOG = LogFactory.getLog(KinesisLeaseCleanupValidator.class);
+
+ /**
+ * @param lease Candidate shard we are considering for deletion.
+ * @param currentKinesisShardIds
+ * @return true if neither the shard (corresponding to the lease), nor its parents are present in
+ * currentKinesisShardIds
+ * @throws KinesisClientLibIOException Thrown if currentKinesisShardIds contains a parent shard but not the child
+ * shard (we are evaluating for deletion).
+ */
+ @Override
+ public boolean isCandidateForCleanup(KinesisClientLease lease, Set currentKinesisShardIds) throws KinesisClientLibIOException {
+ boolean isCandidateForCleanup = true;
+
+ if (currentKinesisShardIds.contains(lease.getLeaseKey())) {
+ isCandidateForCleanup = false;
+ } else {
+ LOG.info("Found lease for non-existent shard: " + lease.getLeaseKey() + ". Checking its parent shards");
+ Set parentShardIds = lease.getParentShardIds();
+ for (String parentShardId : parentShardIds) {
+
+ // Throw an exception if the parent shard exists (but the child does not).
+ // This may be a (rare) race condition between fetching the shard list and Kinesis expiring shards.
+ if (currentKinesisShardIds.contains(parentShardId)) {
+ String message =
+ "Parent shard " + parentShardId + " exists but not the child shard "
+ + lease.getLeaseKey();
+ LOG.info(message);
+ throw new KinesisClientLibIOException(message);
+ }
+ }
+ }
+
+ return isCandidateForCleanup;
+ }
+}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/LeaseCleanupValidator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/LeaseCleanupValidator.java
new file mode 100644
index 00000000..afb37112
--- /dev/null
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/LeaseCleanupValidator.java
@@ -0,0 +1,21 @@
+package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
+import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
+
+import java.util.Set;
+
+/**
+ * Represents the class that decides if a lease is eligible for cleanup.
+ */
+public interface LeaseCleanupValidator {
+
+ /**
+ * @param lease Candidate shard we are considering for deletion.
+ * @param currentKinesisShardIds
+ * @return boolean representing if the lease is eligible for cleanup.
+ * @throws KinesisClientLibIOException
+ */
+ boolean isCandidateForCleanup(KinesisClientLease lease, Set currentKinesisShardIds)
+ throws KinesisClientLibIOException;
+}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java
index 4a001b9b..0b0d914d 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java
@@ -58,17 +58,20 @@ class ShardConsumer {
private final long taskBackoffTimeMillis;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
+ @Getter
+ private final ShardSyncer shardSyncer;
+
private ITask currentTask;
private long currentTaskSubmitTime;
private Future future;
-
+
@Getter
private final GetRecordsCache getRecordsCache;
private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher,
- Optional retryGetRecordsInSeconds,
- Optional maxGetRecordsThreadPool,
- ShardInfo shardInfo) {
+ Optional retryGetRecordsInSeconds,
+ Optional maxGetRecordsThreadPool,
+ ShardInfo shardInfo) {
Optional getRecordsRetrievalStrategy = retryGetRecordsInSeconds.flatMap(retry ->
maxGetRecordsThreadPool.map(max ->
new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retry, max, shardInfo.getShardId())));
@@ -99,20 +102,22 @@ class ShardConsumer {
* @param executorService ExecutorService used to execute process tasks for this shard
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
* @param backoffTimeMillis backoff interval when we encounter exceptions
+ * @param shardSyncer shardSyncer instance used to check and create new leases
*/
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
ShardConsumer(ShardInfo shardInfo,
- StreamConfig streamConfig,
- ICheckpoint checkpoint,
- IRecordProcessor recordProcessor,
- ILeaseManager leaseManager,
- long parentShardPollIntervalMillis,
- boolean cleanupLeasesOfCompletedShards,
- ExecutorService executorService,
- IMetricsFactory metricsFactory,
- long backoffTimeMillis,
- boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
- KinesisClientLibConfiguration config) {
+ StreamConfig streamConfig,
+ ICheckpoint checkpoint,
+ IRecordProcessor recordProcessor,
+ ILeaseManager leaseManager,
+ long parentShardPollIntervalMillis,
+ boolean cleanupLeasesOfCompletedShards,
+ ExecutorService executorService,
+ IMetricsFactory metricsFactory,
+ long backoffTimeMillis,
+ boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
+ KinesisClientLibConfiguration config,
+ ShardSyncer shardSyncer) {
this(shardInfo,
streamConfig,
checkpoint,
@@ -126,7 +131,8 @@ class ShardConsumer {
skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional.empty(),
Optional.empty(),
- config);
+ config,
+ shardSyncer);
}
/**
@@ -142,23 +148,25 @@ class ShardConsumer {
* @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record.
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool.
* @param config Kinesis library configuration
+ * @param shardSyncer shardSyncer instance used to check and create new leases
*/
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
ShardConsumer(ShardInfo shardInfo,
- StreamConfig streamConfig,
- ICheckpoint checkpoint,
- IRecordProcessor recordProcessor,
- ILeaseManager leaseManager,
- long parentShardPollIntervalMillis,
- boolean cleanupLeasesOfCompletedShards,
- ExecutorService executorService,
- IMetricsFactory metricsFactory,
- long backoffTimeMillis,
- boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
- Optional retryGetRecordsInSeconds,
- Optional maxGetRecordsThreadPool,
- KinesisClientLibConfiguration config) {
-
+ StreamConfig streamConfig,
+ ICheckpoint checkpoint,
+ IRecordProcessor recordProcessor,
+ ILeaseManager leaseManager,
+ long parentShardPollIntervalMillis,
+ boolean cleanupLeasesOfCompletedShards,
+ ExecutorService executorService,
+ IMetricsFactory metricsFactory,
+ long backoffTimeMillis,
+ boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
+ Optional retryGetRecordsInSeconds,
+ Optional maxGetRecordsThreadPool,
+ KinesisClientLibConfiguration config,
+ ShardSyncer shardSyncer) {
+
this(
shardInfo,
streamConfig,
@@ -182,7 +190,8 @@ class ShardConsumer {
new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo),
retryGetRecordsInSeconds,
maxGetRecordsThreadPool,
- config
+ config,
+ shardSyncer
);
}
@@ -203,23 +212,25 @@ class ShardConsumer {
* @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool
* @param config Kinesis library configuration
+ * @param shardSyncer shardSyncer instance used to check and create new leases
*/
ShardConsumer(ShardInfo shardInfo,
- StreamConfig streamConfig,
- ICheckpoint checkpoint,
- IRecordProcessor recordProcessor,
- RecordProcessorCheckpointer recordProcessorCheckpointer,
- ILeaseManager leaseManager,
- long parentShardPollIntervalMillis,
- boolean cleanupLeasesOfCompletedShards,
- ExecutorService executorService,
- IMetricsFactory metricsFactory,
- long backoffTimeMillis,
- boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
- KinesisDataFetcher kinesisDataFetcher,
- Optional retryGetRecordsInSeconds,
- Optional maxGetRecordsThreadPool,
- KinesisClientLibConfiguration config) {
+ StreamConfig streamConfig,
+ ICheckpoint checkpoint,
+ IRecordProcessor recordProcessor,
+ RecordProcessorCheckpointer recordProcessorCheckpointer,
+ ILeaseManager leaseManager,
+ long parentShardPollIntervalMillis,
+ boolean cleanupLeasesOfCompletedShards,
+ ExecutorService executorService,
+ IMetricsFactory metricsFactory,
+ long backoffTimeMillis,
+ boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
+ KinesisDataFetcher kinesisDataFetcher,
+ Optional retryGetRecordsInSeconds,
+ Optional maxGetRecordsThreadPool,
+ KinesisClientLibConfiguration config,
+ ShardSyncer shardSyncer) {
this.shardInfo = shardInfo;
this.streamConfig = streamConfig;
this.checkpoint = checkpoint;
@@ -237,12 +248,13 @@ class ShardConsumer {
this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher(
makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo),
this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords());
+ this.shardSyncer = shardSyncer;
}
/**
* No-op if current task is pending, otherwise submits next task for this shard.
* This method should NOT be called if the ShardConsumer is already in SHUTDOWN_COMPLETED state.
- *
+ *
* @return true if a new process task was submitted, false otherwise
*/
synchronized boolean consumeShard() {
@@ -343,7 +355,7 @@ class ShardConsumer {
/**
* Requests the shutdown of the this ShardConsumer. This should give the record processor a chance to checkpoint
* before being shutdown.
- *
+ *
* @param shutdownNotification used to signal that the record processor has been given the chance to shutdown.
*/
void notifyShutdownRequested(ShutdownNotification shutdownNotification) {
@@ -354,7 +366,7 @@ class ShardConsumer {
/**
* Shutdown this ShardConsumer (including invoking the RecordProcessor shutdown API).
* This is called by Worker when it loses responsibility for a shard.
- *
+ *
* @return true if shutdown is complete (false if shutdown is still in progress)
*/
synchronized boolean beginShutdown() {
@@ -374,7 +386,7 @@ class ShardConsumer {
/**
* Used (by Worker) to check if this ShardConsumer instance has been shutdown
* RecordProcessor shutdown() has been invoked, as appropriate.
- *
+ *
* @return true if shutdown is complete
*/
boolean isShutdown() {
@@ -390,7 +402,7 @@ class ShardConsumer {
/**
* Figure out next task to run based on current state, task, and shutdown context.
- *
+ *
* @return Return next task to run
*/
private ITask getNextTask() {
@@ -406,7 +418,7 @@ class ShardConsumer {
/**
* Note: This is a private/internal method with package level access solely for testing purposes.
* Update state based on information about: task success, current state, and shutdown info.
- *
+ *
* @param taskOutcome The outcome of the last task
*/
void updateState(TaskOutcome taskOutcome) {
@@ -438,7 +450,7 @@ class ShardConsumer {
/**
* Private/Internal method - has package level access solely for testing purposes.
- *
+ *
* @return the currentState
*/
ConsumerStates.ShardConsumerState getCurrentState() {
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java
index 5a0c3d5a..0bd220e6 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java
@@ -38,6 +38,7 @@ class ShardSyncTask implements ITask {
private final boolean ignoreUnexpectedChildShards;
private final long shardSyncTaskIdleTimeMillis;
private final TaskType taskType = TaskType.SHARDSYNC;
+ private final ShardSyncer shardSyncer;
/**
* @param kinesisProxy Used to fetch information about the stream (e.g. shard list)
@@ -45,19 +46,25 @@ class ShardSyncTask implements ITask {
* @param initialPositionInStream One of LATEST, TRIM_HORIZON or AT_TIMESTAMP. Amazon Kinesis Client Library will
* start processing records from this point in the stream (when an application starts up for the first time)
* except for shards that already have a checkpoint (and their descendant shards).
+ * @param cleanupLeasesUponShardCompletion Clean up shards we've finished processing (don't wait for expiration
+ * in Kinesis)
+ * @param shardSyncTaskIdleTimeMillis shardSync task idle time in millis
+ * @param shardSyncer shardSyncer instance used to check and create new leases
*/
ShardSyncTask(IKinesisProxy kinesisProxy,
ILeaseManager leaseManager,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesUponShardCompletion,
boolean ignoreUnexpectedChildShards,
- long shardSyncTaskIdleTimeMillis) {
+ long shardSyncTaskIdleTimeMillis,
+ ShardSyncer shardSyncer) {
this.kinesisProxy = kinesisProxy;
this.leaseManager = leaseManager;
this.initialPosition = initialPositionInStream;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncTaskIdleTimeMillis = shardSyncTaskIdleTimeMillis;
+ this.shardSyncer = shardSyncer;
}
/* (non-Javadoc)
@@ -68,7 +75,7 @@ class ShardSyncTask implements ITask {
Exception exception = null;
try {
- ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
+ shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
leaseManager,
initialPosition,
cleanupLeasesUponShardCompletion,
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java
index be62c66b..7ef6e601 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java
@@ -46,11 +46,12 @@ class ShardSyncTaskManager {
private boolean cleanupLeasesUponShardCompletion;
private boolean ignoreUnexpectedChildShards;
private final long shardSyncIdleTimeMillis;
+ private final ShardSyncer shardSyncer;
/**
* Constructor.
- *
+ *
* @param kinesisProxy Proxy used to fetch streamInfo (shards)
* @param leaseManager Lease manager (used to list and create leases for shards)
* @param initialPositionInStream Initial position in stream
@@ -60,6 +61,7 @@ class ShardSyncTaskManager {
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
* @param metricsFactory Metrics factory
* @param executorService ExecutorService to execute the shard sync tasks
+ * @param shardSyncer shardSyncer instance used to check and create new leases
*/
ShardSyncTaskManager(final IKinesisProxy kinesisProxy,
final ILeaseManager leaseManager,
@@ -68,7 +70,8 @@ class ShardSyncTaskManager {
final boolean ignoreUnexpectedChildShards,
final long shardSyncIdleTimeMillis,
final IMetricsFactory metricsFactory,
- ExecutorService executorService) {
+ ExecutorService executorService,
+ ShardSyncer shardSyncer) {
this.kinesisProxy = kinesisProxy;
this.leaseManager = leaseManager;
this.metricsFactory = metricsFactory;
@@ -77,6 +80,7 @@ class ShardSyncTaskManager {
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
this.executorService = executorService;
this.initialPositionInStream = initialPositionInStream;
+ this.shardSyncer = shardSyncer;
}
synchronized boolean syncShardAndLeaseInfo(Set closedShardIds) {
@@ -104,7 +108,8 @@ class ShardSyncTaskManager {
initialPositionInStream,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
- shardSyncIdleTimeMillis), metricsFactory);
+ shardSyncIdleTimeMillis,
+ shardSyncer), metricsFactory);
future = executorService.submit(currentTask);
submittedNewTask = true;
if (LOG.isDebugEnabled()) {
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java
index 3194cd41..e3b448b5 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java
@@ -51,26 +51,25 @@ import com.amazonaws.services.kinesis.model.Shard;
class ShardSyncer {
private static final Log LOG = LogFactory.getLog(ShardSyncer.class);
+ private final LeaseCleanupValidator leaseCleanupValidator;
- /**
- * Note constructor is private: We use static synchronized methods - this is a utility class.
- */
- private ShardSyncer() {
+ public ShardSyncer(final LeaseCleanupValidator leaseCleanupValidator) {
+ this.leaseCleanupValidator = leaseCleanupValidator;
}
- static synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy,
+ synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager leaseManager,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards)
- throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards,
- ignoreUnexpectedChildShards);
+ ignoreUnexpectedChildShards);
}
/**
* Check and create leases for any new shards (e.g. following a reshard operation).
- *
+ *
* @param kinesisProxy
* @param leaseManager
* @param initialPositionInStream
@@ -81,26 +80,18 @@ class ShardSyncer {
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
- static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
+ synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
ILeaseManager leaseManager,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards)
- throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards);
}
- static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
- ILeaseManager leaseManager,
- InitialPositionInStreamExtended initialPositionInStream,
- boolean cleanupLeasesOfCompletedShards)
- throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
- checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, false);
- }
-
/**
* Sync leases with Kinesis shards (e.g. at startup, or when we reach end of a shard).
- *
+ *
* @param kinesisProxy
* @param leaseManager
* @param initialPosition
@@ -112,12 +103,12 @@ class ShardSyncer {
* @throws KinesisClientLibIOException
*/
// CHECKSTYLE:OFF CyclomaticComplexity
- private static synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
+ private synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager leaseManager,
InitialPositionInStreamExtended initialPosition,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards)
- throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
List shards = getShardList(kinesisProxy);
LOG.debug("Num shards: " + shards.size());
@@ -131,7 +122,7 @@ class ShardSyncer {
List currentLeases = leaseManager.listLeases();
List newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition,
- inconsistentShardIds);
+ inconsistentShardIds);
LOG.debug("Num new leases to create: " + newLeasesToCreate.size());
for (KinesisClientLease lease : newLeasesToCreate) {
long startTimeMillis = System.currentTimeMillis();
@@ -143,10 +134,10 @@ class ShardSyncer {
MetricsHelper.addSuccessAndLatency("CreateLease", startTimeMillis, success, MetricsLevel.DETAILED);
}
}
-
+
List trackedLeases = new ArrayList<>();
if (currentLeases != null) {
- trackedLeases.addAll(currentLeases);
+ trackedLeases.addAll(currentLeases);
}
trackedLeases.addAll(newLeasesToCreate);
cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager);
@@ -165,13 +156,13 @@ class ShardSyncer {
* @param inconsistentShardIds
* @throws KinesisClientLibIOException
*/
- private static void assertAllParentShardsAreClosed(Set inconsistentShardIds)
- throws KinesisClientLibIOException {
+ private void assertAllParentShardsAreClosed(Set inconsistentShardIds)
+ throws KinesisClientLibIOException {
if (!inconsistentShardIds.isEmpty()) {
String ids = StringUtils.join(inconsistentShardIds, ' ');
throw new KinesisClientLibIOException(String.format("%d open child shards (%s) are inconsistent. "
- + "This can happen due to a race condition between describeStream and a reshard operation.",
- inconsistentShardIds.size(), ids));
+ + "This can happen due to a race condition between describeStream and a reshard operation.",
+ inconsistentShardIds.size(), ids));
}
}
@@ -182,7 +173,7 @@ class ShardSyncer {
* @param shardIdToShardMap
* @return Set of inconsistent open shard ids for shards having open parents.
*/
- private static Set findInconsistentShardIds(Map> shardIdToChildShardIdsMap,
+ private Set findInconsistentShardIds(Map> shardIdToChildShardIdsMap,
Map shardIdToShardMap) {
Set result = new HashSet();
for (String parentShardId : shardIdToChildShardIdsMap.keySet()) {
@@ -201,7 +192,7 @@ class ShardSyncer {
* @param trackedLeaseList
* @return
*/
- static Map constructShardIdToKCLLeaseMap(List trackedLeaseList) {
+ Map constructShardIdToKCLLeaseMap(List trackedLeaseList) {
Map trackedLeasesMap = new HashMap<>();
for (KinesisClientLease lease : trackedLeaseList) {
trackedLeasesMap.put(lease.getLeaseKey(), lease);
@@ -210,28 +201,24 @@ class ShardSyncer {
}
/**
- * Note: this has package level access for testing purposes.
+ * Note: this has package level access for testing purposes.
* Useful for asserting that we don't have an incomplete shard list following a reshard operation.
* We verify that if the shard is present in the shard list, it is closed and its hash key range
- * is covered by its child shards.
- * @param shards List of all Kinesis shards
- * @param shardIdsOfClosedShards Id of the shard which is expected to be closed
- * @return ShardIds of child shards (children of the expectedClosedShard)
- * @throws KinesisClientLibIOException
+ * is covered by its child shards.
*/
- static synchronized void assertClosedShardsAreCoveredOrAbsent(Map shardIdToShardMap,
+ synchronized void assertClosedShardsAreCoveredOrAbsent(Map shardIdToShardMap,
Map> shardIdToChildShardIdsMap,
- Set shardIdsOfClosedShards) throws KinesisClientLibIOException {
+ Set shardIdsOfClosedShards) throws KinesisClientLibIOException {
String exceptionMessageSuffix = "This can happen if we constructed the list of shards "
- + " while a reshard operation was in progress.";
-
+ + " while a reshard operation was in progress.";
+
for (String shardId : shardIdsOfClosedShards) {
Shard shard = shardIdToShardMap.get(shardId);
if (shard == null) {
LOG.info("Shard " + shardId + " is not present in Kinesis anymore.");
continue;
}
-
+
String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
if (endingSequenceNumber == null) {
throw new KinesisClientLibIOException("Shard " + shardIdsOfClosedShards
@@ -248,10 +235,10 @@ class ShardSyncer {
}
}
- private static synchronized void assertHashRangeOfClosedShardIsCovered(Shard closedShard,
+ private synchronized void assertHashRangeOfClosedShardIsCovered(Shard closedShard,
Map shardIdToShardMap,
Set childShardIds) throws KinesisClientLibIOException {
-
+
BigInteger startingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getStartingHashKey());
BigInteger endingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getEndingHashKey());
BigInteger minStartingHashKeyOfChildren = null;
@@ -270,23 +257,23 @@ class ShardSyncer {
maxEndingHashKeyOfChildren = endingHashKey;
}
}
-
+
if ((minStartingHashKeyOfChildren == null) || (maxEndingHashKeyOfChildren == null)
|| (minStartingHashKeyOfChildren.compareTo(startingHashKeyOfClosedShard) > 0)
|| (maxEndingHashKeyOfChildren.compareTo(endingHashKeyOfClosedShard) < 0)) {
throw new KinesisClientLibIOException("Incomplete shard list: hash key range of shard "
+ closedShard.getShardId() + " is not covered by its child shards.");
}
-
+
}
-
+
/**
* Helper method to construct shardId->setOfChildShardIds map.
* Note: This has package access for testing purposes only.
* @param shardIdToShardMap
* @return
*/
- static Map> constructShardIdToChildShardIdsMap(
+ Map> constructShardIdToChildShardIdsMap(
Map shardIdToShardMap) {
Map> shardIdToChildShardIdsMap = new HashMap<>();
for (Map.Entry entry : shardIdToShardMap.entrySet()) {
@@ -301,7 +288,7 @@ class ShardSyncer {
}
childShardIds.add(shardId);
}
-
+
String adjacentParentShardId = shard.getAdjacentParentShardId();
if ((adjacentParentShardId != null) && (shardIdToShardMap.containsKey(adjacentParentShardId))) {
Set childShardIds = shardIdToChildShardIdsMap.get(adjacentParentShardId);
@@ -315,7 +302,7 @@ class ShardSyncer {
return shardIdToChildShardIdsMap;
}
- private static List getShardList(IKinesisProxy kinesisProxy) throws KinesisClientLibIOException {
+ private List getShardList(IKinesisProxy kinesisProxy) throws KinesisClientLibIOException {
List shards = kinesisProxy.getShardList();
if (shards == null) {
throw new KinesisClientLibIOException(
@@ -337,13 +324,13 @@ class ShardSyncer {
* we begin processing data from any of its descendants.
* * A shard does not start processing data until data from all its parents has been processed.
* Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create
- * leases corresponding to both the parents - the parent shard which is not a descendant will have
+ * leases corresponding to both the parents - the parent shard which is not a descendant will have
* its checkpoint set to Latest.
- *
+ *
* We assume that if there is an existing lease for a shard, then either:
* * we have previously created a lease for its parent (if it was needed), or
* * the parent shard has expired.
- *
+ *
* For example:
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
@@ -353,7 +340,7 @@ class ShardSyncer {
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Current leases: (3, 4, 5)
* New leases to create: (2, 6, 7, 8, 9, 10)
- *
+ *
* The leases returned are sorted by the starting sequence number - following the same order
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
* before creating all the leases.
@@ -363,7 +350,7 @@ class ShardSyncer {
* high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only
* currently happen here if ignoreUnexpectedChildShards was true in syncShardleases.
*
- *
+ *
* @param shards List of all shards in Kinesis (we'll create new leases based on this set)
* @param currentLeases List of current leases
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
@@ -371,7 +358,7 @@ class ShardSyncer {
* @param inconsistentShardIds Set of child shard ids having open parents.
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
- static List determineNewLeasesToCreate(List shards,
+ List determineNewLeasesToCreate(List shards,
List currentLeases,
InitialPositionInStreamExtended initialPosition,
Set inconsistentShardIds) {
@@ -452,7 +439,7 @@ class ShardSyncer {
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
*/
- static List determineNewLeasesToCreate(List shards,
+ List determineNewLeasesToCreate(List shards,
List currentLeases,
InitialPositionInStreamExtended initialPosition) {
Set inconsistentShardIds = new HashSet();
@@ -464,7 +451,7 @@ class ShardSyncer {
* Check if this shard is a descendant of a shard that is (or will be) processed.
* Create leases for the ancestors of this shard as required.
* See javadoc of determineNewLeasesToCreate() for rules and example.
- *
+ *
* @param shardId The shardId to check.
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
@@ -475,13 +462,13 @@ class ShardSyncer {
* @return true if the shard is a descendant of any current shard (lease already exists)
*/
// CHECKSTYLE:OFF CyclomaticComplexity
- static boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId,
+ boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId,
InitialPositionInStreamExtended initialPosition,
Set shardIdsOfCurrentLeases,
Map shardIdToShardMapOfAllKinesisShards,
Map shardIdToLeaseMapOfNewShards,
Map memoizationContext) {
-
+
Boolean previousValue = memoizationContext.get(shardId);
if (previousValue != null) {
return previousValue;
@@ -530,7 +517,7 @@ class ShardSyncer {
if (descendantParentShardIds.contains(parentShardId)
&& !initialPosition.getInitialPositionInStream()
- .equals(InitialPositionInStream.AT_TIMESTAMP)) {
+ .equals(InitialPositionInStream.AT_TIMESTAMP)) {
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
lease.setCheckpoint(convertToCheckpoint(initialPosition));
@@ -544,7 +531,7 @@ class ShardSyncer {
// after the specified initial position timestamp.
if (initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)
|| initialPosition.getInitialPositionInStream()
- .equals(InitialPositionInStream.AT_TIMESTAMP)) {
+ .equals(InitialPositionInStream.AT_TIMESTAMP)) {
isDescendant = true;
}
}
@@ -561,12 +548,12 @@ class ShardSyncer {
* Helper method to get parent shardIds of the current shard - includes the parent shardIds if:
* a/ they are not null
* b/ if they exist in the current shard map (i.e. haven't expired)
- *
+ *
* @param shard Will return parents of this shard
* @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream.
* @return Set of parentShardIds
*/
- static Set getParentShardIds(Shard shard, Map shardIdToShardMapOfAllKinesisShards) {
+ Set getParentShardIds(Shard shard, Map shardIdToShardMapOfAllKinesisShards) {
Set parentShardIds = new HashSet(2);
String parentShardId = shard.getParentShardId();
if ((parentShardId != null) && shardIdToShardMapOfAllKinesisShards.containsKey(parentShardId)) {
@@ -580,40 +567,40 @@ class ShardSyncer {
}
/**
- * Delete leases corresponding to shards that no longer exist in the stream.
+ * Delete leases corresponding to shards that no longer exist in the stream.
* Current scheme: Delete a lease if:
* * the corresponding shard is not present in the list of Kinesis shards, AND
* * the parentShardIds listed in the lease are also not present in the list of Kinesis shards.
* @param shards List of all Kinesis shards (assumed to be a consistent snapshot - when stream is in Active state).
- * @param trackedLeases List of
+ * @param trackedLeases List of
* @param kinesisProxy Kinesis proxy (used to get shard list)
- * @param leaseManager
+ * @param leaseManager
* @throws KinesisClientLibIOException Thrown if we couldn't get a fresh shard list from Kinesis.
- * @throws ProvisionedThroughputException
- * @throws InvalidStateException
- * @throws DependencyException
+ * @throws ProvisionedThroughputException
+ * @throws InvalidStateException
+ * @throws DependencyException
*/
- private static void cleanupGarbageLeases(List shards,
+ private void cleanupGarbageLeases(List shards,
List trackedLeases,
IKinesisProxy kinesisProxy,
ILeaseManager leaseManager)
- throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException {
+ throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException {
Set kinesisShards = new HashSet<>();
for (Shard shard : shards) {
kinesisShards.add(shard.getShardId());
}
-
+
// Check if there are leases for non-existent shards
List garbageLeases = new ArrayList<>();
for (KinesisClientLease lease : trackedLeases) {
- if (isCandidateForCleanup(lease, kinesisShards)) {
+ if (leaseCleanupValidator.isCandidateForCleanup(lease, kinesisShards)) {
garbageLeases.add(lease);
}
}
-
+
if (!garbageLeases.isEmpty()) {
LOG.info("Found " + garbageLeases.size()
- + " candidate leases for cleanup. Refreshing list of"
+ + " candidate leases for cleanup. Refreshing list of"
+ " Kinesis shards to pick up recent/latest shards");
List currentShardList = getShardList(kinesisProxy);
Set currentKinesisShardIds = new HashSet<>();
@@ -622,59 +609,23 @@ class ShardSyncer {
}
for (KinesisClientLease lease : garbageLeases) {
- if (isCandidateForCleanup(lease, currentKinesisShardIds)) {
+ if (leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds)) {
LOG.info("Deleting lease for shard " + lease.getLeaseKey()
+ " as it is not present in Kinesis stream.");
leaseManager.deleteLease(lease);
}
}
}
-
+
}
- /**
- * Note: This method has package level access, solely for testing purposes.
- *
- * @param lease Candidate shard we are considering for deletion.
- * @param currentKinesisShardIds
- * @return true if neither the shard (corresponding to the lease), nor its parents are present in
- * currentKinesisShardIds
- * @throws KinesisClientLibIOException Thrown if currentKinesisShardIds contains a parent shard but not the child
- * shard (we are evaluating for deletion).
- */
- static boolean isCandidateForCleanup(KinesisClientLease lease, Set currentKinesisShardIds)
- throws KinesisClientLibIOException {
- boolean isCandidateForCleanup = true;
-
- if (currentKinesisShardIds.contains(lease.getLeaseKey())) {
- isCandidateForCleanup = false;
- } else {
- LOG.info("Found lease for non-existent shard: " + lease.getLeaseKey() + ". Checking its parent shards");
- Set parentShardIds = lease.getParentShardIds();
- for (String parentShardId : parentShardIds) {
-
- // Throw an exception if the parent shard exists (but the child does not).
- // This may be a (rare) race condition between fetching the shard list and Kinesis expiring shards.
- if (currentKinesisShardIds.contains(parentShardId)) {
- String message =
- "Parent shard " + parentShardId + " exists but not the child shard "
- + lease.getLeaseKey();
- LOG.info(message);
- throw new KinesisClientLibIOException(message);
- }
- }
- }
-
- return isCandidateForCleanup;
- }
-
/**
* Private helper method.
* Clean up leases for shards that meet the following criteria:
* a/ the shard has been fully processed (checkpoint is set to SHARD_END)
* b/ we've begun processing all the child shards: we have leases for all child shards and their checkpoint is not
* TRIM_HORIZON.
- *
+ *
* @param currentLeases List of leases we evaluate for clean up
* @param shardIdToShardMap Map of shardId->Shard (assumed to include all Kinesis shards)
* @param shardIdToChildShardIdsMap Map of shardId->childShardIds (assumed to include all Kinesis shards)
@@ -685,12 +636,12 @@ class ShardSyncer {
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
- private static synchronized void cleanupLeasesOfFinishedShards(Collection currentLeases,
+ private synchronized void cleanupLeasesOfFinishedShards(Collection currentLeases,
Map shardIdToShardMap,
Map> shardIdToChildShardIdsMap,
List trackedLeases,
ILeaseManager leaseManager)
- throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
Set shardIdsOfClosedShards = new HashSet<>();
List leasesOfClosedShards = new ArrayList<>();
for (KinesisClientLease lease : currentLeases) {
@@ -716,38 +667,38 @@ class ShardSyncer {
cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
}
}
- }
+ }
}
- /**
+ /**
* Delete lease for the closed shard. Rules for deletion are:
* a/ the checkpoint for the closed shard is SHARD_END,
* b/ there are leases for all the childShardIds and their checkpoint is NOT TRIM_HORIZON
* Note: This method has package level access solely for testing purposes.
- *
+ *
* @param closedShardId Identifies the closed shard
* @param childShardIds ShardIds of children of the closed shard
* @param trackedLeases shardId->KinesisClientLease map with all leases we are tracking (should not be null)
- * @param leaseManager
- * @throws ProvisionedThroughputException
- * @throws InvalidStateException
- * @throws DependencyException
+ * @param leaseManager
+ * @throws ProvisionedThroughputException
+ * @throws InvalidStateException
+ * @throws DependencyException
*/
- static synchronized void cleanupLeaseForClosedShard(String closedShardId,
+ synchronized void cleanupLeaseForClosedShard(String closedShardId,
Set childShardIds,
Map trackedLeases,
ILeaseManager leaseManager)
- throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException {
KinesisClientLease leaseForClosedShard = trackedLeases.get(closedShardId);
List childShardLeases = new ArrayList<>();
-
+
for (String childShardId : childShardIds) {
KinesisClientLease childLease = trackedLeases.get(childShardId);
if (childLease != null) {
childShardLeases.add(childLease);
}
}
-
+
if ((leaseForClosedShard != null)
&& (leaseForClosedShard.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END))
&& (childShardLeases.size() == childShardIds.size())) {
@@ -758,7 +709,7 @@ class ShardSyncer {
break;
}
}
-
+
if (okayToDelete) {
LOG.info("Deleting lease for shard " + leaseForClosedShard.getLeaseKey()
+ " as it has been completely processed and processing of child shards has begun.");
@@ -770,11 +721,11 @@ class ShardSyncer {
/**
* Helper method to create a new KinesisClientLease POJO for a shard.
* Note: Package level access only for testing purposes
- *
+ *
* @param shard
* @return
*/
- static KinesisClientLease newKCLLease(Shard shard) {
+ KinesisClientLease newKCLLease(Shard shard) {
KinesisClientLease newLease = new KinesisClientLease();
newLease.setLeaseKey(shard.getShardId());
List parentShardIds = new ArrayList(2);
@@ -792,11 +743,11 @@ class ShardSyncer {
/**
* Helper method to construct a shardId->Shard map for the specified list of shards.
- *
+ *
* @param shards List of shards
* @return ShardId->Shard map
*/
- static Map constructShardIdToShardMap(List shards) {
+ Map constructShardIdToShardMap(List shards) {
Map shardIdToShardMap = new HashMap();
for (Shard shard : shards) {
shardIdToShardMap.put(shard.getShardId(), shard);
@@ -807,11 +758,11 @@ class ShardSyncer {
/**
* Helper method to return all the open shards for a stream.
* Note: Package level access only for testing purposes.
- *
+ *
* @param allShards All shards returved via DescribeStream. We assume this to represent a consistent shard list.
* @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active.
*/
- static List getOpenShards(List allShards) {
+ List getOpenShards(List allShards) {
List openShards = new ArrayList();
for (Shard shard : allShards) {
String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
@@ -823,9 +774,9 @@ class ShardSyncer {
return openShards;
}
- private static ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) {
+ private ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) {
ExtendedSequenceNumber checkpoint = null;
-
+
if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) {
checkpoint = ExtendedSequenceNumber.TRIM_HORIZON;
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.LATEST)) {
@@ -833,10 +784,10 @@ class ShardSyncer {
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
checkpoint = ExtendedSequenceNumber.AT_TIMESTAMP;
}
-
+
return checkpoint;
}
-
+
/** Helper class to compare leases based on starting sequence number of the corresponding shards.
*
*/
@@ -846,7 +797,7 @@ class ShardSyncer {
private static final long serialVersionUID = 1L;
private final Map shardIdToShardMap;
-
+
/**
* @param shardIdToShardMapOfAllKinesisShards
*/
@@ -860,7 +811,7 @@ class ShardSyncer {
* We assume that lease1 and lease2 are:
* a/ not null,
* b/ shards (if found) have non-null starting sequence numbers
- *
+ *
* {@inheritDoc}
*/
@Override
@@ -870,23 +821,23 @@ class ShardSyncer {
String shardId2 = lease2.getLeaseKey();
Shard shard1 = shardIdToShardMap.get(shardId1);
Shard shard2 = shardIdToShardMap.get(shardId2);
-
+
// If we found shards for the two leases, use comparison of the starting sequence numbers
if ((shard1 != null) && (shard2 != null)) {
BigInteger sequenceNumber1 =
new BigInteger(shard1.getSequenceNumberRange().getStartingSequenceNumber());
BigInteger sequenceNumber2 =
new BigInteger(shard2.getSequenceNumberRange().getStartingSequenceNumber());
- result = sequenceNumber1.compareTo(sequenceNumber2);
+ result = sequenceNumber1.compareTo(sequenceNumber2);
}
-
+
if (result == 0) {
result = shardId1.compareTo(shardId2);
}
-
+
return result;
}
-
+
}
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java
index a407f009..c6754fa6 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java
@@ -48,22 +48,24 @@ class ShutdownTask implements ITask {
private final TaskType taskType = TaskType.SHUTDOWN;
private final long backoffTimeMillis;
private final GetRecordsCache getRecordsCache;
+ private final ShardSyncer shardSyncer;
/**
* Constructor.
*/
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
ShutdownTask(ShardInfo shardInfo,
- IRecordProcessor recordProcessor,
- RecordProcessorCheckpointer recordProcessorCheckpointer,
- ShutdownReason reason,
- IKinesisProxy kinesisProxy,
- InitialPositionInStreamExtended initialPositionInStream,
- boolean cleanupLeasesOfCompletedShards,
- boolean ignoreUnexpectedChildShards,
- ILeaseManager leaseManager,
- long backoffTimeMillis,
- GetRecordsCache getRecordsCache) {
+ IRecordProcessor recordProcessor,
+ RecordProcessorCheckpointer recordProcessorCheckpointer,
+ ShutdownReason reason,
+ IKinesisProxy kinesisProxy,
+ InitialPositionInStreamExtended initialPositionInStream,
+ boolean cleanupLeasesOfCompletedShards,
+ boolean ignoreUnexpectedChildShards,
+ ILeaseManager leaseManager,
+ long backoffTimeMillis,
+ GetRecordsCache getRecordsCache,
+ ShardSyncer shardSyncer) {
this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
@@ -75,12 +77,13 @@ class ShutdownTask implements ITask {
this.leaseManager = leaseManager;
this.backoffTimeMillis = backoffTimeMillis;
this.getRecordsCache = getRecordsCache;
+ this.shardSyncer = shardSyncer;
}
/*
* Invokes RecordProcessor shutdown() API.
* (non-Javadoc)
- *
+ *
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call()
*/
@Override
@@ -127,7 +130,7 @@ class ShutdownTask implements ITask {
if (reason == ShutdownReason.TERMINATE) {
LOG.debug("Looking for child shards of shard " + shardInfo.getShardId());
// create leases for the child shards
- ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
+ shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
leaseManager,
initialPositionInStream,
cleanupLeasesOfCompletedShards,
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
index 15076005..4436cbe9 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
@@ -33,11 +33,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
+import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
+import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import com.amazonaws.AmazonWebServiceClient;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
@@ -69,7 +70,6 @@ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import lombok.NonNull;
import lombok.Setter;
import lombok.experimental.Accessors;
@@ -84,6 +84,8 @@ public class Worker implements Runnable {
private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
+ private static final LeaseCleanupValidator DEFAULT_LEASE_CLEANUP_VALIDATOR = new KinesisLeaseCleanupValidator();
+ private static final LeaseSelector DEFAULT_LEASE_SELECTOR = new GenericLeaseSelector();
private WorkerLog wlog = new WorkerLog();
@@ -114,6 +116,7 @@ public class Worker implements Runnable {
private volatile boolean shutdown;
private volatile long shutdownStartTimeMillis;
private volatile boolean shutdownComplete = false;
+ private final ShardSyncer shardSyncer;
// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
@@ -388,6 +391,7 @@ public class Worker implements Runnable {
config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null,
new KinesisClientLibLeaseCoordinator(
new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient),
+ DEFAULT_LEASE_SELECTOR,
config.getWorkerIdentifier(),
config.getFailoverTimeMillis(),
config.getEpsilonMillis(),
@@ -395,8 +399,8 @@ public class Worker implements Runnable {
config.getMaxLeasesToStealAtOneTime(),
config.getMaxLeaseRenewalThreads(),
metricsFactory)
- .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
- .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
+ .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
+ .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
execService,
metricsFactory,
config.getTaskBackoffTimeMillis(),
@@ -405,7 +409,8 @@ public class Worker implements Runnable {
config.getShardPrioritizationStrategy(),
config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool(),
- DEFAULT_WORKER_STATE_CHANGE_LISTENER);
+ DEFAULT_WORKER_STATE_CHANGE_LISTENER,
+ DEFAULT_LEASE_CLEANUP_VALIDATOR );
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) {
@@ -457,7 +462,7 @@ public class Worker implements Runnable {
// NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
- StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
+ StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
@@ -465,7 +470,7 @@ public class Worker implements Runnable {
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
- shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER);
+ shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER, DEFAULT_LEASE_CLEANUP_VALIDATOR );
}
/**
@@ -503,16 +508,19 @@ public class Worker implements Runnable {
* Time in seconds to wait before the worker retries to get a record.
* @param maxGetRecordsThreadPool
* Max number of threads in the getRecords thread pool.
+ * @param leaseCleanupValidator
+ * leaseCleanupValidator instance used to validate leases
*/
// NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig,
- InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
- long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
- KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
- IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
- boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
- Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener) {
+ InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
+ long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
+ KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
+ IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
+ boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
+ Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener,
+ LeaseCleanupValidator leaseCleanupValidator) {
this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory;
this.config = config;
@@ -525,9 +533,10 @@ public class Worker implements Runnable {
this.executorService = execService;
this.leaseCoordinator = leaseCoordinator;
this.metricsFactory = metricsFactory;
+ this.shardSyncer = new ShardSyncer(leaseCleanupValidator);
this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
initialPositionInStream, cleanupLeasesUponShardCompletion, config.shouldIgnoreUnexpectedChildShards(),
- shardSyncIdleTimeMillis, metricsFactory, executorService);
+ shardSyncIdleTimeMillis, metricsFactory, executorService, shardSyncer);
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
this.failoverTimeMillis = failoverTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
@@ -629,7 +638,7 @@ public class Worker implements Runnable {
LOG.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(),
leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion,
- config.shouldIgnoreUnexpectedChildShards(), 0L);
+ config.shouldIgnoreUnexpectedChildShards(), 0L, shardSyncer);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else {
LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
@@ -996,7 +1005,8 @@ public class Worker implements Runnable {
skipShardSyncAtWorkerInitializationIfLeasesExist,
retryGetRecordsInSeconds,
maxGetRecordsThreadPool,
- config);
+ config,
+ shardSyncer);
}
@@ -1158,6 +1168,10 @@ public class Worker implements Runnable {
private IKinesisProxy kinesisProxy;
@Setter @Accessors(fluent = true)
private WorkerStateChangeListener workerStateChangeListener;
+ @Setter @Accessors(fluent = true)
+ private LeaseCleanupValidator leaseCleanupValidator;
+ @Setter @Accessors(fluent = true)
+ private LeaseSelector leaseSelector;
@VisibleForTesting
AmazonKinesis getKinesisClient() {
@@ -1272,6 +1286,14 @@ public class Worker implements Runnable {
workerStateChangeListener = DEFAULT_WORKER_STATE_CHANGE_LISTENER;
}
+ if(leaseCleanupValidator == null) {
+ leaseCleanupValidator = DEFAULT_LEASE_CLEANUP_VALIDATOR;
+ }
+
+ if(leaseSelector == null) {
+ leaseSelector = DEFAULT_LEASE_SELECTOR;
+ }
+
return new Worker(config.getApplicationName(),
recordProcessorFactory,
config,
@@ -1287,6 +1309,7 @@ public class Worker implements Runnable {
config.shouldCleanupLeasesUponShardCompletion(),
null,
new KinesisClientLibLeaseCoordinator(leaseManager,
+ leaseSelector,
config.getWorkerIdentifier(),
config.getFailoverTimeMillis(),
config.getEpsilonMillis(),
@@ -1294,8 +1317,8 @@ public class Worker implements Runnable {
config.getMaxLeasesToStealAtOneTime(),
config.getMaxLeaseRenewalThreads(),
metricsFactory)
- .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
- .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
+ .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
+ .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
execService,
metricsFactory,
config.getTaskBackoffTimeMillis(),
@@ -1304,14 +1327,15 @@ public class Worker implements Runnable {
shardPrioritization,
config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool(),
- workerStateChangeListener);
+ workerStateChangeListener,
+ leaseCleanupValidator);
}
> R createClient(final T builder,
- final AWSCredentialsProvider credentialsProvider,
- final ClientConfiguration clientConfiguration,
- final String endpointUrl,
- final String region) {
+ final AWSCredentialsProvider credentialsProvider,
+ final ClientConfiguration clientConfiguration,
+ final String endpointUrl,
+ final String region) {
if (credentialsProvider != null) {
builder.withCredentials(credentialsProvider);
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/GenericLeaseSelector.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/GenericLeaseSelector.java
new file mode 100644
index 00000000..9f23991f
--- /dev/null
+++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/GenericLeaseSelector.java
@@ -0,0 +1,43 @@
+package com.amazonaws.services.kinesis.leases.impl;
+
+import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * GenericLeaseSelector abstracts away the lease selection logic from the application code that's using leasing.
+ * It owns filtering of the leases to be taken.
+ */
+public class GenericLeaseSelector implements LeaseSelector {
+
+ /**
+ * Provides the list of leases to be taken.
+ * @param expiredLeases list of leases that are currently expired
+ * @param numLeasesToReachTarget the number of leases to be taken
+ * @return
+ */
+ @Override
+ public Set getLeasesToTakeFromExpiredLeases(List expiredLeases, int numLeasesToReachTarget) {
+ Set leasesToTake = new HashSet();
+
+ // If we have expired leases, get up to leases from expiredLeases
+ for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) {
+ leasesToTake.add(expiredLeases.remove(0));
+ }
+
+ return leasesToTake;
+ }
+
+ /**
+ * Provides the number of leases that should be taken by the worker.
+ * @param allLeases list of all existing leases
+ * @return
+ */
+ @Override
+ public int getLeaseCountThatCanBeTaken(Collection allLeases) {
+ return allLeases.size();
+ }
+}
diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java
index f1a87aaf..8ad1c7b8 100644
--- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java
+++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -82,9 +83,14 @@ public class LeaseCoordinator {
private ScheduledExecutorService leaseCoordinatorThreadPool;
private final ExecutorService leaseRenewalThreadpool;
+
private volatile boolean running = false;
private ScheduledFuture> takerFuture;
+ private static LeaseSelector getDefaultLeaseSelector() {
+ return new GenericLeaseSelector<>();
+ }
+
/**
* Constructor.
*
@@ -100,6 +106,23 @@ public class LeaseCoordinator {
this(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, new LogMetricsFactory());
}
+ /**
+ * Constructor.
+ *
+ * @param leaseManager LeaseManager instance to use
+ * @param leaseSelector LeaseSelector instance to use
+ * @param workerIdentifier Identifies the worker (e.g. useful to track lease ownership)
+ * @param leaseDurationMillis Duration of a lease
+ * @param epsilonMillis Allow for some variance when calculating lease expirations
+ */
+ public LeaseCoordinator(ILeaseManager leaseManager,
+ LeaseSelector leaseSelector,
+ String workerIdentifier,
+ long leaseDurationMillis,
+ long epsilonMillis) {
+ this(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis, epsilonMillis, new LogMetricsFactory());
+ }
+
/**
* Constructor.
*
@@ -119,6 +142,27 @@ public class LeaseCoordinator {
KinesisClientLibConfiguration.DEFAULT_MAX_LEASE_RENEWAL_THREADS, metricsFactory);
}
+ /**
+ * Constructor.
+ *
+ * @param leaseManager LeaseManager instance to use
+ * @param leaseSelector LeaseSelector instance to use
+ * @param workerIdentifier Identifies the worker (e.g. useful to track lease ownership)
+ * @param leaseDurationMillis Duration of a lease
+ * @param epsilonMillis Allow for some variance when calculating lease expirations
+ * @param metricsFactory Used to publish metrics about lease operations
+ */
+ public LeaseCoordinator(ILeaseManager leaseManager,
+ LeaseSelector leaseSelector,
+ String workerIdentifier,
+ long leaseDurationMillis,
+ long epsilonMillis,
+ IMetricsFactory metricsFactory) {
+ this(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis, epsilonMillis,
+ DEFAULT_MAX_LEASES_FOR_WORKER, DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME,
+ KinesisClientLibConfiguration.DEFAULT_MAX_LEASE_RENEWAL_THREADS, metricsFactory);
+ }
+
/**
* Constructor.
*
@@ -138,8 +182,33 @@ public class LeaseCoordinator {
int maxLeasesToStealAtOneTime,
int maxLeaseRenewerThreadCount,
IMetricsFactory metricsFactory) {
+ this(leaseManager, getDefaultLeaseSelector(), workerIdentifier, leaseDurationMillis, epsilonMillis,
+ maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, metricsFactory);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param leaseManager LeaseManager instance to use
+ * @param leaseSelector LeaseSelector instance to use
+ * @param workerIdentifier Identifies the worker (e.g. useful to track lease ownership)
+ * @param leaseDurationMillis Duration of a lease
+ * @param epsilonMillis Allow for some variance when calculating lease expirations
+ * @param maxLeasesForWorker Max leases this Worker can handle at a time
+ * @param maxLeasesToStealAtOneTime Steal up to these many leases at a time (for load balancing)
+ * @param metricsFactory Used to publish metrics about lease operations
+ */
+ public LeaseCoordinator(ILeaseManager leaseManager,
+ LeaseSelector leaseSelector,
+ String workerIdentifier,
+ long leaseDurationMillis,
+ long epsilonMillis,
+ int maxLeasesForWorker,
+ int maxLeasesToStealAtOneTime,
+ int maxLeaseRenewerThreadCount,
+ IMetricsFactory metricsFactory) {
this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount);
- this.leaseTaker = new LeaseTaker(leaseManager, workerIdentifier, leaseDurationMillis)
+ this.leaseTaker = new LeaseTaker(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis)
.withMaxLeasesForWorker(maxLeasesForWorker)
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime);
this.leaseRenewer = new LeaseRenewer(
@@ -301,8 +370,8 @@ public class LeaseCoordinator {
} else {
leaseCoordinatorThreadPool.shutdownNow();
LOG.info(String.format("Worker %s stopped lease-tracking threads %dms after stop",
- leaseTaker.getWorkerIdentifier(),
- STOP_WAIT_TIME_MILLIS));
+ leaseTaker.getWorkerIdentifier(),
+ STOP_WAIT_TIME_MILLIS));
}
} catch (InterruptedException e) {
LOG.debug("Encountered InterruptedException when awaiting threadpool termination");
@@ -328,7 +397,7 @@ public class LeaseCoordinator {
/**
* Requests that renewals for the given lease are stopped.
- *
+ *
* @param lease the lease to stop renewing.
*/
public void dropLease(T lease) {
@@ -359,7 +428,7 @@ public class LeaseCoordinator {
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/
public boolean updateLease(T lease, UUID concurrencyToken)
- throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException {
return leaseRenewer.updateLease(lease, concurrencyToken);
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseTaker.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseTaker.java
index e75fd9c9..80c4ba9a 100644
--- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseTaker.java
+++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseTaker.java
@@ -26,6 +26,7 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -59,6 +60,7 @@ public class LeaseTaker implements ILeaseTaker {
};
private final ILeaseManager leaseManager;
+ private final LeaseSelector leaseSelector;
private final String workerIdentifier;
private final Map allLeases = new HashMap();
private final long leaseDurationNanos;
@@ -67,8 +69,18 @@ public class LeaseTaker implements ILeaseTaker {
private long lastScanTimeNanos = 0L;
+ private static LeaseSelector getDefaultLeaseSelector() {
+ return new GenericLeaseSelector<>();
+ }
+
public LeaseTaker(ILeaseManager leaseManager, String workerIdentifier, long leaseDurationMillis) {
+ this(leaseManager, getDefaultLeaseSelector(), workerIdentifier, leaseDurationMillis);
+ }
+
+ public LeaseTaker(ILeaseManager leaseManager, LeaseSelector leaseSelector,
+ String workerIdentifier, long leaseDurationMillis) {
this.leaseManager = leaseManager;
+ this.leaseSelector = leaseSelector;
this.workerIdentifier = workerIdentifier;
this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis);
}
@@ -122,16 +134,16 @@ public class LeaseTaker implements ILeaseTaker {
* Internal implementation of takeLeases. Takes a callable that can provide the time to enable test cases without
* Thread.sleep. Takes a callable instead of a raw time value because the time needs to be computed as-of
* immediately after the scan.
- *
+ *
* @param timeProvider Callable that will supply the time
- *
+ *
* @return map of lease key to taken lease
- *
+ *
* @throws DependencyException
* @throws InvalidStateException
*/
synchronized Map takeLeases(Callable timeProvider)
- throws DependencyException, InvalidStateException {
+ throws DependencyException, InvalidStateException {
// Key is leaseKey
Map takenLeases = new HashMap();
@@ -159,7 +171,7 @@ public class LeaseTaker implements ILeaseTaker {
if (lastException != null) {
LOG.error("Worker " + workerIdentifier
- + " could not scan leases table, aborting takeLeases. Exception caught by last retry:",
+ + " could not scan leases table, aborting takeLeases. Exception caught by last retry:",
lastException);
return takenLeases;
}
@@ -235,23 +247,23 @@ public class LeaseTaker implements ILeaseTaker {
builder.append(string);
needDelimiter = true;
}
-
+
return builder.toString();
}
/**
* Scan all leases and update lastRenewalTime. Add new leases and delete old leases.
- *
+ *
* @param timeProvider callable that supplies the current time
- *
+ *
* @return list of expired leases, possibly empty, never null.
- *
+ *
* @throws ProvisionedThroughputException if listLeases fails due to lack of provisioned throughput
* @throws InvalidStateException if the lease table does not exist
* @throws DependencyException if listLeases fails in an unexpected way
*/
private void updateAllLeases(Callable timeProvider)
- throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException {
List freshList = leaseManager.listLeases();
try {
lastScanTimeNanos = timeProvider.call();
@@ -322,7 +334,7 @@ public class LeaseTaker implements ILeaseTaker {
/**
* Compute the number of leases I should try to take based on the state of the system.
- *
+ *
* @param allLeases map of shardId to lease containing all leases
* @param expiredLeases list of leases we determined to be expired
* @return set of leases to take.
@@ -332,7 +344,7 @@ public class LeaseTaker implements ILeaseTaker {
Set leasesToTake = new HashSet();
IMetricsScope metrics = MetricsHelper.getMetricsScope();
- int numLeases = allLeases.size();
+ int numLeases = leaseSelector.getLeaseCountThatCanBeTaken(allLeases.values());
int numWorkers = leaseCounts.size();
if (numLeases == 0) {
@@ -357,8 +369,8 @@ public class LeaseTaker implements ILeaseTaker {
int leaseSpillover = Math.max(0, target - maxLeasesForWorker);
if (target > maxLeasesForWorker) {
LOG.warn(String.format("Worker %s target is %d leases and maxLeasesForWorker is %d."
- + " Resetting target to %d, lease spillover is %d. "
- + " Note that some shards may not be processed if no other workers are able to pick them up.",
+ + " Resetting target to %d, lease spillover is %d. "
+ + " Note that some shards may not be processed if no other workers are able to pick them up.",
workerIdentifier,
target,
maxLeasesForWorker,
@@ -382,10 +394,7 @@ public class LeaseTaker implements ILeaseTaker {
int originalExpiredLeasesSize = expiredLeases.size();
if (expiredLeases.size() > 0) {
- // If we have expired leases, get up to leases from expiredLeases
- for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) {
- leasesToTake.add(expiredLeases.remove(0));
- }
+ leasesToTake = leaseSelector.getLeasesToTakeFromExpiredLeases(expiredLeases, numLeasesToReachTarget);
} else {
// If there are no expired leases and we need a lease, consider stealing.
List leasesToSteal = chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target);
@@ -401,7 +410,7 @@ public class LeaseTaker implements ILeaseTaker {
if (!leasesToTake.isEmpty()) {
LOG.info(String.format("Worker %s saw %d total leases, %d available leases, %d "
- + "workers. Target is %d leases, I have %d leases, I will take %d leases",
+ + "workers. Target is %d leases, I have %d leases, I will take %d leases",
workerIdentifier,
numLeases,
originalExpiredLeasesSize,
@@ -423,11 +432,11 @@ public class LeaseTaker implements ILeaseTaker {
/**
* Choose leases to steal by randomly selecting one or more (up to max) from the most loaded worker.
* Stealing rules:
- *
+ *
* Steal up to maxLeasesToStealAtOneTime leases from the most loaded worker if
* a) he has > target leases and I need >= 1 leases : steal min(leases needed, maxLeasesToStealAtOneTime)
* b) he has == target leases and I need > 1 leases : steal 1
- *
+ *
* @param leaseCounts map of workerIdentifier to lease count
* @param needed # of leases needed to reach the target leases for the worker
* @param target target # of leases per worker
@@ -458,7 +467,7 @@ public class LeaseTaker implements ILeaseTaker {
if (numLeasesToSteal <= 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Worker %s not stealing from most loaded worker %s. He has %d,"
- + " target is %d, and I need %d",
+ + " target is %d, and I need %d",
workerIdentifier,
mostLoadedWorker.getKey(),
mostLoadedWorker.getValue(),
@@ -469,7 +478,7 @@ public class LeaseTaker implements ILeaseTaker {
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Worker %s will attempt to steal %d leases from most loaded worker %s. "
- + " He has %d leases, target is %d, I need %d, maxLeasesToSteatAtOneTime is %d.",
+ + " He has %d leases, target is %d, I need %d, maxLeasesToSteatAtOneTime is %d.",
workerIdentifier,
numLeasesToSteal,
mostLoadedWorker.getKey(),
@@ -500,7 +509,7 @@ public class LeaseTaker implements ILeaseTaker {
/**
* Count leases by host. Always includes myself, but otherwise only includes hosts that are currently holding
* leases.
- *
+ *
* @param expiredLeases list of leases that are currently expired
* @return map of workerIdentifier to lease count
*/
diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/LeaseSelector.java b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/LeaseSelector.java
new file mode 100644
index 00000000..93f14743
--- /dev/null
+++ b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/LeaseSelector.java
@@ -0,0 +1,30 @@
+package com.amazonaws.services.kinesis.leases.interfaces;
+
+import com.amazonaws.services.kinesis.leases.impl.Lease;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * LeaseSelector abstracts away the lease selection logic from the application code that's using leasing.
+ * It owns filtering of the leases to be taken.
+ */
+public interface LeaseSelector {
+
+ /**
+ * Provides the list of leases to be taken.
+ * @param expiredLeases list of leases that are currently expired
+ * @param numLeasesToReachTarget the number of leases to be taken
+ * @return
+ */
+ Set getLeasesToTakeFromExpiredLeases(List expiredLeases, int numLeasesToReachTarget);
+
+ /**
+ * Provides the number of leases that should be taken by the worker.
+ * @param allLeases list of all existing leases
+ * @return
+ */
+ int getLeaseCountThatCanBeTaken(Collection allLeases);
+}
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorIntegrationTest.java
index 00c1310d..15112b14 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorIntegrationTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorIntegrationTest.java
@@ -23,7 +23,8 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
-import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
+import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
+import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import junit.framework.Assert;
import org.junit.Before;
@@ -56,6 +57,7 @@ public class KinesisClientLibLeaseCoordinatorIntegrationTest {
@Before
public void setUp() throws ProvisionedThroughputException, DependencyException, InvalidStateException {
final boolean useConsistentReads = true;
+ LeaseSelector leaseSelector = new GenericLeaseSelector<>();
if (leaseManager == null) {
AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain());
leaseManager =
@@ -63,7 +65,7 @@ public class KinesisClientLibLeaseCoordinatorIntegrationTest {
}
leaseManager.createLeaseTableIfNotExists(10L, 10L);
leaseManager.deleteAll();
- coordinator = new KinesisClientLibLeaseCoordinator(leaseManager, WORKER_ID, 5000L, 50L);
+ coordinator = new KinesisClientLibLeaseCoordinator(leaseManager, WORKER_ID, 5000L, 50L, leaseSelector);
coordinator.start();
}
@@ -210,7 +212,7 @@ public class KinesisClientLibLeaseCoordinatorIntegrationTest {
}
public void addLeasesToRenew(ILeaseRenewer renewer, String... shardIds)
- throws DependencyException, InvalidStateException {
+ throws DependencyException, InvalidStateException {
List leasesToRenew = new ArrayList();
for (String shardId : shardIds) {
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorTest.java
index 11962d8f..35161499 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorTest.java
@@ -19,6 +19,9 @@ import static org.mockito.Mockito.doReturn;
import java.util.UUID;
+import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
+import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
+import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import junit.framework.Assert;
import org.junit.Before;
@@ -54,12 +57,13 @@ public class KinesisClientLibLeaseCoordinatorTest {
MockitoAnnotations.initMocks(this);
// Set up lease coordinator
doReturn(true).when(mockLeaseManager).createLeaseTableIfNotExists(anyLong(), anyLong());
- leaseCoordinator = new KinesisClientLibLeaseCoordinator(mockLeaseManager, WORK_ID, TEST_LONG, TEST_LONG);
+ LeaseSelector leaseSelector = new GenericLeaseSelector<>();
+ leaseCoordinator = new KinesisClientLibLeaseCoordinator(mockLeaseManager, WORK_ID, TEST_LONG, TEST_LONG, leaseSelector);
}
@Test(expected = ShutdownException.class)
public void testSetCheckpointWithUnownedShardId()
- throws KinesisClientLibException, DependencyException, InvalidStateException, ProvisionedThroughputException {
+ throws KinesisClientLibException, DependencyException, InvalidStateException, ProvisionedThroughputException {
final boolean succeess = leaseCoordinator.setCheckpoint(SHARD_ID, TEST_CHKPT, TEST_UUID);
Assert.assertFalse("Set Checkpoint should return failure", succeess);
leaseCoordinator.setCheckpoint(SHARD_ID, TEST_CHKPT, TEST_UUID.toString());
@@ -67,7 +71,7 @@ public class KinesisClientLibLeaseCoordinatorTest {
@Test(expected = DependencyException.class)
public void testWaitLeaseTableTimeout()
- throws DependencyException, ProvisionedThroughputException, IllegalStateException {
+ throws DependencyException, ProvisionedThroughputException, IllegalStateException {
// Set mock lease manager to return false in waiting
doReturn(false).when(mockLeaseManager).waitUntilLeaseTableExists(anyLong(), anyLong());
leaseCoordinator.initialize();
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java
index 216d59cd..4b48bf4d 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java
@@ -97,16 +97,17 @@ public class ShardConsumerTest {
private final boolean skipCheckpointValidationValue = false;
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
+ private static final ShardSyncer shardSyncer = new ShardSyncer(new KinesisLeaseCleanupValidator());
// Use Executors.newFixedThreadPool since it returns ThreadPoolExecutor, which is
// ... a non-final public class, and so can be mocked and spied.
private final ExecutorService executorService = Executors.newFixedThreadPool(1);
private RecordsFetcherFactory recordsFetcherFactory;
-
+
private GetRecordsCache getRecordsCache;
-
+
private KinesisDataFetcher dataFetcher;
-
+
@Mock
private IRecordProcessor processor;
@Mock
@@ -124,12 +125,12 @@ public class ShardConsumerTest {
public void setup() {
getRecordsCache = null;
dataFetcher = null;
-
+
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory());
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
when(config.getLogWarningForTaskAfterMillis()).thenReturn(Optional.empty());
}
-
+
/**
* Test method to verify consumer stays in INITIALIZING state when InitializationTask fails.
*/
@@ -161,8 +162,9 @@ public class ShardConsumerTest {
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
- config);
-
+ config,
+ shardSyncer);
+
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
@@ -209,7 +211,8 @@ public class ShardConsumerTest {
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
- config);
+ config,
+ shardSyncer);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize
@@ -251,7 +254,8 @@ public class ShardConsumerTest {
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
- config);
+ config,
+ shardSyncer);
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null;
@@ -353,7 +357,7 @@ public class ShardConsumerTest {
when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(),
any(IMetricsFactory.class), anyInt()))
.thenReturn(getRecordsCache);
-
+
ShardConsumer consumer =
new ShardConsumer(shardInfo,
streamConfig,
@@ -370,7 +374,8 @@ public class ShardConsumerTest {
dataFetcher,
Optional.empty(),
Optional.empty(),
- config);
+ config,
+ shardSyncer);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
@@ -392,7 +397,7 @@ public class ShardConsumerTest {
}
Thread.sleep(50L);
}
-
+
verify(getRecordsCache, times(5)).getNextResult();
assertThat(processor.getShutdownReason(), nullValue());
@@ -417,7 +422,7 @@ public class ShardConsumerTest {
verify(shutdownNotification, atLeastOnce()).shutdownComplete();
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)));
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE)));
-
+
verify(getRecordsCache).shutdown();
executorService.shutdown();
@@ -497,7 +502,7 @@ public class ShardConsumerTest {
),
metricsFactory
);
-
+
ShardConsumer consumer =
new ShardConsumer(shardInfo,
streamConfig,
@@ -514,7 +519,8 @@ public class ShardConsumerTest {
dataFetcher,
Optional.empty(),
Optional.empty(),
- config);
+ config,
+ shardSyncer);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
@@ -615,7 +621,7 @@ public class ShardConsumerTest {
atTimestamp);
ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
-
+
RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
shardInfo,
checkpoint,
@@ -628,7 +634,7 @@ public class ShardConsumerTest {
);
dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
-
+
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(),
@@ -651,7 +657,8 @@ public class ShardConsumerTest {
dataFetcher,
Optional.empty(),
Optional.empty(),
- config);
+ config,
+ shardSyncer);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
@@ -660,7 +667,7 @@ public class ShardConsumerTest {
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
-
+
verify(getRecordsCache).start();
// We expect to process all records in numRecs calls
@@ -674,7 +681,7 @@ public class ShardConsumerTest {
}
Thread.sleep(50L);
}
-
+
verify(getRecordsCache, times(4)).getNextResult();
assertThat(processor.getShutdownReason(), nullValue());
@@ -692,7 +699,7 @@ public class ShardConsumerTest {
String iterator = fileBasedProxy.getIterator(streamShardId, timestamp);
List expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords());
-
+
verifyConsumedRecords(expectedRecords, processor.getProcessedRecords());
assertEquals(4, processor.getProcessedRecords().size());
file.delete();
@@ -721,7 +728,8 @@ public class ShardConsumerTest {
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
- config);
+ config,
+ shardSyncer);
GetRecordsCache getRecordsCache = spy(consumer.getGetRecordsCache());
@@ -749,7 +757,7 @@ public class ShardConsumerTest {
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
}
-
+
@Test
public void testCreateSynchronousGetRecordsRetrieval() {
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
@@ -759,7 +767,7 @@ public class ShardConsumerTest {
10,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
-
+
ShardConsumer shardConsumer =
new ShardConsumer(shardInfo,
streamConfig,
@@ -774,8 +782,9 @@ public class ShardConsumerTest {
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
Optional.empty(),
Optional.empty(),
- config);
-
+ config,
+ shardSyncer);
+
assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(),
SynchronousGetRecordsRetrievalStrategy.class);
}
@@ -804,24 +813,25 @@ public class ShardConsumerTest {
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
Optional.of(1),
Optional.of(2),
- config);
+ config,
+ shardSyncer);
assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(),
AsynchronousGetRecordsRetrievalStrategy.class);
}
-
+
@SuppressWarnings("unchecked")
@Test
public void testLongRunningTasks() throws InterruptedException {
final long sleepTime = 1000L;
ExecutorService mockExecutorService = mock(ExecutorService.class);
Future mockFuture = mock(Future.class);
-
+
when(mockExecutorService.submit(any(ITask.class))).thenReturn(mockFuture);
when(mockFuture.isDone()).thenReturn(false);
when(mockFuture.isCancelled()).thenReturn(false);
when(config.getLogWarningForTaskAfterMillis()).thenReturn(Optional.of(sleepTime));
-
+
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.LATEST);
StreamConfig streamConfig = new StreamConfig(
streamProxy,
@@ -830,7 +840,7 @@ public class ShardConsumerTest {
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue,
INITIAL_POSITION_LATEST);
-
+
ShardConsumer shardConsumer = new ShardConsumer(
shardInfo,
streamConfig,
@@ -843,14 +853,15 @@ public class ShardConsumerTest {
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
- config);
-
+ config,
+ shardSyncer);
+
shardConsumer.consumeShard();
Thread.sleep(sleepTime);
-
+
shardConsumer.consumeShard();
-
+
verify(config).getLogWarningForTaskAfterMillis();
verify(mockFuture).isDone();
verify(mockFuture).isCancelled();
@@ -880,7 +891,7 @@ public class ShardConsumerTest {
}
Matcher initializationInputMatcher(final ExtendedSequenceNumber checkpoint,
- final ExtendedSequenceNumber pendingCheckpoint) {
+ final ExtendedSequenceNumber pendingCheckpoint) {
return new TypeSafeMatcher() {
@Override
protected boolean matchesSafely(InitializationInput item) {
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java
index 619f3eaf..67cb035a 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java
@@ -52,6 +52,7 @@ public class ShardSyncTaskIntegrationTest {
private static AWSCredentialsProvider credentialsProvider;
private IKinesisClientLeaseManager leaseManager;
private IKinesisProxy kinesisProxy;
+ private final ShardSyncer shardSyncer = new ShardSyncer(new KinesisLeaseCleanupValidator());
/**
* @throws java.lang.Exception
@@ -106,7 +107,7 @@ public class ShardSyncTaskIntegrationTest {
/**
* Test method for call().
- *
+ *
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
@@ -125,7 +126,8 @@ public class ShardSyncTaskIntegrationTest {
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST),
false,
false,
- 0L);
+ 0L,
+ shardSyncer);
syncTask.call();
List leases = leaseManager.listLeases();
Set leaseKeys = new HashSet();
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java
index 7ff12542..d783ebfc 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java
@@ -70,6 +70,8 @@ public class ShardSyncerTest {
AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB();
LeaseManager leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient);
private static final int EXPONENT = 128;
+ protected static final KinesisLeaseCleanupValidator leaseCleanupValidator = new KinesisLeaseCleanupValidator();
+ private static final ShardSyncer shardSyncer = new ShardSyncer(leaseCleanupValidator);
/**
* Old/Obsolete max value of a sequence number (2^128 -1).
*/
@@ -117,7 +119,7 @@ public class ShardSyncerTest {
List shards = new ArrayList();
List leases = new ArrayList();
- Assert.assertTrue(ShardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty());
+ Assert.assertTrue(shardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty());
}
/**
@@ -136,7 +138,7 @@ public class ShardSyncerTest {
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
List newLeases =
- ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
+ shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
Assert.assertEquals(2, newLeases.size());
Set expectedLeaseShardIds = new HashSet();
expectedLeaseShardIds.add(shardId0);
@@ -169,7 +171,7 @@ public class ShardSyncerTest {
inconsistentShardIds.add(shardId2);
List newLeases =
- ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds);
+ shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds);
Assert.assertEquals(2, newLeases.size());
Set expectedLeaseShardIds = new HashSet();
expectedLeaseShardIds.add(shardId0);
@@ -181,7 +183,7 @@ public class ShardSyncerTest {
/**
* Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream)
- *
+ *
* @throws ProvisionedThroughputException
* @throws InvalidStateException
* @throws DependencyException
@@ -190,14 +192,14 @@ public class ShardSyncerTest {
*/
@Test
public final void testBootstrapShardLeasesAtTrimHorizon()
- throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
- KinesisClientLibIOException {
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
+ KinesisClientLibIOException {
testBootstrapShardLeasesAtStartingPosition(INITIAL_POSITION_TRIM_HORIZON);
}
/**
* Test bootstrapShardLeases() starting at LATEST (tip of stream)
- *
+ *
* @throws ProvisionedThroughputException
* @throws InvalidStateException
* @throws DependencyException
@@ -206,8 +208,8 @@ public class ShardSyncerTest {
*/
@Test
public final void testBootstrapShardLeasesAtLatest()
- throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
- KinesisClientLibIOException {
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
+ KinesisClientLibIOException {
testBootstrapShardLeasesAtStartingPosition(INITIAL_POSITION_LATEST);
}
@@ -220,15 +222,15 @@ public class ShardSyncerTest {
*/
@Test
public final void testCheckAndCreateLeasesForNewShardsAtLatest()
- throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
- IOException {
+ throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
+ IOException {
List shards = constructShardListForGraphA();
File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1");
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
- ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST,
- cleanupLeasesOfCompletedShards);
+ shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST,
+ cleanupLeasesOfCompletedShards, false);
List newLeases = leaseManager.listLeases();
Set expectedLeaseShardIds = new HashSet();
expectedLeaseShardIds.add("shardId-4");
@@ -252,15 +254,15 @@ public class ShardSyncerTest {
*/
@Test
public final void testCheckAndCreateLeasesForNewShardsAtTrimHorizon()
- throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
- IOException {
+ throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
+ IOException {
List shards = constructShardListForGraphA();
File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1");
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
- ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON,
- cleanupLeasesOfCompletedShards);
+ shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON,
+ cleanupLeasesOfCompletedShards, false);
List newLeases = leaseManager.listLeases();
Set expectedLeaseShardIds = new HashSet();
for (int i = 0; i < 11; i++) {
@@ -290,8 +292,8 @@ public class ShardSyncerTest {
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
- ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_AT_TIMESTAMP,
- cleanupLeasesOfCompletedShards);
+ shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_AT_TIMESTAMP,
+ cleanupLeasesOfCompletedShards, false);
List newLeases = leaseManager.listLeases();
Set