Separated out some lease taker logic as interface implementations to … (#490)

Added interfaces to allow external users to control the lease selection, and cleanup.
This commit is contained in:
achitojha 2019-02-20 11:48:40 -08:00 committed by Justin Pfifer
parent 54e6a48a48
commit fbdd449759
23 changed files with 811 additions and 528 deletions

View file

@ -530,7 +530,8 @@ class ConsumerStates {
consumer.isIgnoreUnexpectedChildShards(),
consumer.getLeaseManager(),
consumer.getTaskBackoffTimeMillis(),
consumer.getGetRecordsCache());
consumer.getGetRecordsCache(),
consumer.getShardSyncer());
}
@Override

View file

@ -22,6 +22,8 @@ import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -50,6 +52,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
private static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
private static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L;
private static final LeaseSelector<KinesisClientLease> DEFAULT_LEASE_SELECTOR = new GenericLeaseSelector<KinesisClientLease>();
private final ILeaseManager<KinesisClientLease> leaseManager;
@ -61,12 +64,14 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
* @param workerIdentifier Used to identify this worker process
* @param leaseDurationMillis Duration of a lease in milliseconds
* @param epsilonMillis Delta for timing operations (e.g. checking lease expiry)
* @param leaseSelector Lease selector which decides which leases to take
*/
public KinesisClientLibLeaseCoordinator(ILeaseManager<KinesisClientLease> leaseManager,
String workerIdentifier,
long leaseDurationMillis,
long epsilonMillis) {
super(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis);
long epsilonMillis,
LeaseSelector<KinesisClientLease> leaseSelector) {
super(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis, epsilonMillis);
this.leaseManager = leaseManager;
}
@ -75,19 +80,35 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
* @param workerIdentifier Used to identify this worker process
* @param leaseDurationMillis Duration of a lease in milliseconds
* @param epsilonMillis Delta for timing operations (e.g. checking lease expiry)
* @param metricsFactory Metrics factory used to emit metrics
*/
public KinesisClientLibLeaseCoordinator(ILeaseManager<KinesisClientLease> leaseManager,
String workerIdentifier,
long leaseDurationMillis,
long epsilonMillis) {
this(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, DEFAULT_LEASE_SELECTOR);
}
/**
* @param leaseManager Lease manager which provides CRUD lease operations.
* @param leaseSelector Lease selector which decides which leases to take
* @param workerIdentifier Used to identify this worker process
* @param leaseDurationMillis Duration of a lease in milliseconds
* @param epsilonMillis Delta for timing operations (e.g. checking lease expiry)
* @param metricsFactory Metrics factory used to emit metrics
*/
public KinesisClientLibLeaseCoordinator(ILeaseManager<KinesisClientLease> leaseManager,
LeaseSelector<KinesisClientLease> leaseSelector,
String workerIdentifier,
long leaseDurationMillis,
long epsilonMillis,
IMetricsFactory metricsFactory) {
super(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, metricsFactory);
super(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis, epsilonMillis, metricsFactory);
this.leaseManager = leaseManager;
}
/**
* @param leaseManager Lease manager which provides CRUD lease operations.
* @param leaseSelector Lease selector which decides which leases to take
* @param workerIdentifier Used to identify this worker process
* @param leaseDurationMillis Duration of a lease in milliseconds
* @param epsilonMillis Delta for timing operations (e.g. checking lease expiry)
@ -96,6 +117,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
* @param metricsFactory Metrics factory used to emit metrics
*/
public KinesisClientLibLeaseCoordinator(ILeaseManager<KinesisClientLease> leaseManager,
LeaseSelector<KinesisClientLease> leaseSelector,
String workerIdentifier,
long leaseDurationMillis,
long epsilonMillis,
@ -103,7 +125,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
int maxLeasesToStealAtOneTime,
int maxLeaseRenewerThreadCount,
IMetricsFactory metricsFactory) {
super(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker,
super(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, metricsFactory);
this.leaseManager = leaseManager;
}
@ -148,7 +170,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/
boolean setCheckpoint(String shardId, ExtendedSequenceNumber checkpoint, UUID concurrencyToken)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
KinesisClientLease lease = getCurrentlyHeldLease(shardId);
if (lease == null) {
LOG.info(String.format(
@ -170,7 +192,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
*/
@Override
public void setCheckpoint(String shardId, ExtendedSequenceNumber checkpointValue, String concurrencyToken)
throws KinesisClientLibException {
throws KinesisClientLibException {
try {
boolean wasSuccessful = setCheckpoint(shardId, checkpointValue, UUID.fromString(concurrencyToken));
if (!wasSuccessful) {
@ -235,8 +257,8 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
*/
@Override
public void prepareCheckpoint(String shardId,
ExtendedSequenceNumber pendingCheckpointValue,
String concurrencyToken) throws KinesisClientLibException {
ExtendedSequenceNumber pendingCheckpointValue,
String concurrencyToken) throws KinesisClientLibException {
try {
boolean wasSuccessful =
prepareCheckpoint(shardId, pendingCheckpointValue, UUID.fromString(concurrencyToken));
@ -307,8 +329,8 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
leaseManager.createLeaseTableIfNotExists(initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity);
if (newTableCreated) {
LOG.info(String.format(
"Created new lease table for coordinator with initial read capacity of %d and write capacity of %d.",
initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity));
"Created new lease table for coordinator with initial read capacity of %d and write capacity of %d.",
initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity));
}
// Need to wait for table in active state.
final long secondsBetweenPolls = 10L;

View file

@ -0,0 +1,50 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Set;
/**
* Represents the class that decides if a lease is eligible for cleanup.
*/
class KinesisLeaseCleanupValidator implements LeaseCleanupValidator {
private static final Log LOG = LogFactory.getLog(KinesisLeaseCleanupValidator.class);
/**
* @param lease Candidate shard we are considering for deletion.
* @param currentKinesisShardIds
* @return true if neither the shard (corresponding to the lease), nor its parents are present in
* currentKinesisShardIds
* @throws KinesisClientLibIOException Thrown if currentKinesisShardIds contains a parent shard but not the child
* shard (we are evaluating for deletion).
*/
@Override
public boolean isCandidateForCleanup(KinesisClientLease lease, Set<String> currentKinesisShardIds) throws KinesisClientLibIOException {
boolean isCandidateForCleanup = true;
if (currentKinesisShardIds.contains(lease.getLeaseKey())) {
isCandidateForCleanup = false;
} else {
LOG.info("Found lease for non-existent shard: " + lease.getLeaseKey() + ". Checking its parent shards");
Set<String> parentShardIds = lease.getParentShardIds();
for (String parentShardId : parentShardIds) {
// Throw an exception if the parent shard exists (but the child does not).
// This may be a (rare) race condition between fetching the shard list and Kinesis expiring shards.
if (currentKinesisShardIds.contains(parentShardId)) {
String message =
"Parent shard " + parentShardId + " exists but not the child shard "
+ lease.getLeaseKey();
LOG.info(message);
throw new KinesisClientLibIOException(message);
}
}
}
return isCandidateForCleanup;
}
}

View file

@ -0,0 +1,21 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import java.util.Set;
/**
* Represents the class that decides if a lease is eligible for cleanup.
*/
public interface LeaseCleanupValidator {
/**
* @param lease Candidate shard we are considering for deletion.
* @param currentKinesisShardIds
* @return boolean representing if the lease is eligible for cleanup.
* @throws KinesisClientLibIOException
*/
boolean isCandidateForCleanup(KinesisClientLease lease, Set<String> currentKinesisShardIds)
throws KinesisClientLibIOException;
}

View file

@ -58,6 +58,9 @@ class ShardConsumer {
private final long taskBackoffTimeMillis;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
@Getter
private final ShardSyncer shardSyncer;
private ITask currentTask;
private long currentTaskSubmitTime;
private Future<TaskResult> future;
@ -66,9 +69,9 @@ class ShardConsumer {
private final GetRecordsCache getRecordsCache;
private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
ShardInfo shardInfo) {
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
ShardInfo shardInfo) {
Optional<GetRecordsRetrievalStrategy> getRecordsRetrievalStrategy = retryGetRecordsInSeconds.flatMap(retry ->
maxGetRecordsThreadPool.map(max ->
new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retry, max, shardInfo.getShardId())));
@ -99,20 +102,22 @@ class ShardConsumer {
* @param executorService ExecutorService used to execute process tasks for this shard
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
* @param backoffTimeMillis backoff interval when we encounter exceptions
* @param shardSyncer shardSyncer instance used to check and create new leases
*/
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
ShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
KinesisClientLibConfiguration config) {
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
KinesisClientLibConfiguration config,
ShardSyncer shardSyncer) {
this(shardInfo,
streamConfig,
checkpoint,
@ -126,7 +131,8 @@ class ShardConsumer {
skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional.empty(),
Optional.empty(),
config);
config,
shardSyncer);
}
/**
@ -142,22 +148,24 @@ class ShardConsumer {
* @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record.
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool.
* @param config Kinesis library configuration
* @param shardSyncer shardSyncer instance used to check and create new leases
*/
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
ShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config) {
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config,
ShardSyncer shardSyncer) {
this(
shardInfo,
@ -182,7 +190,8 @@ class ShardConsumer {
new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo),
retryGetRecordsInSeconds,
maxGetRecordsThreadPool,
config
config,
shardSyncer
);
}
@ -203,23 +212,25 @@ class ShardConsumer {
* @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool
* @param config Kinesis library configuration
* @param shardSyncer shardSyncer instance used to check and create new leases
*/
ShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
KinesisDataFetcher kinesisDataFetcher,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config) {
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
KinesisDataFetcher kinesisDataFetcher,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config,
ShardSyncer shardSyncer) {
this.shardInfo = shardInfo;
this.streamConfig = streamConfig;
this.checkpoint = checkpoint;
@ -237,6 +248,7 @@ class ShardConsumer {
this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher(
makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo),
this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords());
this.shardSyncer = shardSyncer;
}
/**

View file

@ -38,6 +38,7 @@ class ShardSyncTask implements ITask {
private final boolean ignoreUnexpectedChildShards;
private final long shardSyncTaskIdleTimeMillis;
private final TaskType taskType = TaskType.SHARDSYNC;
private final ShardSyncer shardSyncer;
/**
* @param kinesisProxy Used to fetch information about the stream (e.g. shard list)
@ -45,19 +46,25 @@ class ShardSyncTask implements ITask {
* @param initialPositionInStream One of LATEST, TRIM_HORIZON or AT_TIMESTAMP. Amazon Kinesis Client Library will
* start processing records from this point in the stream (when an application starts up for the first time)
* except for shards that already have a checkpoint (and their descendant shards).
* @param cleanupLeasesUponShardCompletion Clean up shards we've finished processing (don't wait for expiration
* in Kinesis)
* @param shardSyncTaskIdleTimeMillis shardSync task idle time in millis
* @param shardSyncer shardSyncer instance used to check and create new leases
*/
ShardSyncTask(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesUponShardCompletion,
boolean ignoreUnexpectedChildShards,
long shardSyncTaskIdleTimeMillis) {
long shardSyncTaskIdleTimeMillis,
ShardSyncer shardSyncer) {
this.kinesisProxy = kinesisProxy;
this.leaseManager = leaseManager;
this.initialPosition = initialPositionInStream;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncTaskIdleTimeMillis = shardSyncTaskIdleTimeMillis;
this.shardSyncer = shardSyncer;
}
/* (non-Javadoc)
@ -68,7 +75,7 @@ class ShardSyncTask implements ITask {
Exception exception = null;
try {
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
leaseManager,
initialPosition,
cleanupLeasesUponShardCompletion,

View file

@ -46,6 +46,7 @@ class ShardSyncTaskManager {
private boolean cleanupLeasesUponShardCompletion;
private boolean ignoreUnexpectedChildShards;
private final long shardSyncIdleTimeMillis;
private final ShardSyncer shardSyncer;
/**
@ -60,6 +61,7 @@ class ShardSyncTaskManager {
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
* @param metricsFactory Metrics factory
* @param executorService ExecutorService to execute the shard sync tasks
* @param shardSyncer shardSyncer instance used to check and create new leases
*/
ShardSyncTaskManager(final IKinesisProxy kinesisProxy,
final ILeaseManager<KinesisClientLease> leaseManager,
@ -68,7 +70,8 @@ class ShardSyncTaskManager {
final boolean ignoreUnexpectedChildShards,
final long shardSyncIdleTimeMillis,
final IMetricsFactory metricsFactory,
ExecutorService executorService) {
ExecutorService executorService,
ShardSyncer shardSyncer) {
this.kinesisProxy = kinesisProxy;
this.leaseManager = leaseManager;
this.metricsFactory = metricsFactory;
@ -77,6 +80,7 @@ class ShardSyncTaskManager {
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
this.executorService = executorService;
this.initialPositionInStream = initialPositionInStream;
this.shardSyncer = shardSyncer;
}
synchronized boolean syncShardAndLeaseInfo(Set<String> closedShardIds) {
@ -104,7 +108,8 @@ class ShardSyncTaskManager {
initialPositionInStream,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis), metricsFactory);
shardSyncIdleTimeMillis,
shardSyncer), metricsFactory);
future = executorService.submit(currentTask);
submittedNewTask = true;
if (LOG.isDebugEnabled()) {

View file

@ -51,21 +51,20 @@ import com.amazonaws.services.kinesis.model.Shard;
class ShardSyncer {
private static final Log LOG = LogFactory.getLog(ShardSyncer.class);
private final LeaseCleanupValidator leaseCleanupValidator;
/**
* Note constructor is private: We use static synchronized methods - this is a utility class.
*/
private ShardSyncer() {
public ShardSyncer(final LeaseCleanupValidator leaseCleanupValidator) {
this.leaseCleanupValidator = leaseCleanupValidator;
}
static synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy,
synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards);
ignoreUnexpectedChildShards);
}
/**
@ -81,23 +80,15 @@ class ShardSyncer {
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards);
}
static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, false);
}
/**
* Sync leases with Kinesis shards (e.g. at startup, or when we reach end of a shard).
*
@ -112,12 +103,12 @@ class ShardSyncer {
* @throws KinesisClientLibIOException
*/
// CHECKSTYLE:OFF CyclomaticComplexity
private static synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
private synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPosition,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
List<Shard> shards = getShardList(kinesisProxy);
LOG.debug("Num shards: " + shards.size());
@ -131,7 +122,7 @@ class ShardSyncer {
List<KinesisClientLease> currentLeases = leaseManager.listLeases();
List<KinesisClientLease> newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition,
inconsistentShardIds);
inconsistentShardIds);
LOG.debug("Num new leases to create: " + newLeasesToCreate.size());
for (KinesisClientLease lease : newLeasesToCreate) {
long startTimeMillis = System.currentTimeMillis();
@ -165,13 +156,13 @@ class ShardSyncer {
* @param inconsistentShardIds
* @throws KinesisClientLibIOException
*/
private static void assertAllParentShardsAreClosed(Set<String> inconsistentShardIds)
throws KinesisClientLibIOException {
private void assertAllParentShardsAreClosed(Set<String> inconsistentShardIds)
throws KinesisClientLibIOException {
if (!inconsistentShardIds.isEmpty()) {
String ids = StringUtils.join(inconsistentShardIds, ' ');
throw new KinesisClientLibIOException(String.format("%d open child shards (%s) are inconsistent. "
+ "This can happen due to a race condition between describeStream and a reshard operation.",
inconsistentShardIds.size(), ids));
+ "This can happen due to a race condition between describeStream and a reshard operation.",
inconsistentShardIds.size(), ids));
}
}
@ -182,7 +173,7 @@ class ShardSyncer {
* @param shardIdToShardMap
* @return Set of inconsistent open shard ids for shards having open parents.
*/
private static Set<String> findInconsistentShardIds(Map<String, Set<String>> shardIdToChildShardIdsMap,
private Set<String> findInconsistentShardIds(Map<String, Set<String>> shardIdToChildShardIdsMap,
Map<String, Shard> shardIdToShardMap) {
Set<String> result = new HashSet<String>();
for (String parentShardId : shardIdToChildShardIdsMap.keySet()) {
@ -201,7 +192,7 @@ class ShardSyncer {
* @param trackedLeaseList
* @return
*/
static Map<String, KinesisClientLease> constructShardIdToKCLLeaseMap(List<KinesisClientLease> trackedLeaseList) {
Map<String, KinesisClientLease> constructShardIdToKCLLeaseMap(List<KinesisClientLease> trackedLeaseList) {
Map<String, KinesisClientLease> trackedLeasesMap = new HashMap<>();
for (KinesisClientLease lease : trackedLeaseList) {
trackedLeasesMap.put(lease.getLeaseKey(), lease);
@ -213,17 +204,13 @@ class ShardSyncer {
* Note: this has package level access for testing purposes.
* Useful for asserting that we don't have an incomplete shard list following a reshard operation.
* We verify that if the shard is present in the shard list, it is closed and its hash key range
* is covered by its child shards.
* @param shards List of all Kinesis shards
* @param shardIdsOfClosedShards Id of the shard which is expected to be closed
* @return ShardIds of child shards (children of the expectedClosedShard)
* @throws KinesisClientLibIOException
* is covered by its child shards.
*/
static synchronized void assertClosedShardsAreCoveredOrAbsent(Map<String, Shard> shardIdToShardMap,
synchronized void assertClosedShardsAreCoveredOrAbsent(Map<String, Shard> shardIdToShardMap,
Map<String, Set<String>> shardIdToChildShardIdsMap,
Set<String> shardIdsOfClosedShards) throws KinesisClientLibIOException {
String exceptionMessageSuffix = "This can happen if we constructed the list of shards "
+ " while a reshard operation was in progress.";
+ " while a reshard operation was in progress.";
for (String shardId : shardIdsOfClosedShards) {
Shard shard = shardIdToShardMap.get(shardId);
@ -248,7 +235,7 @@ class ShardSyncer {
}
}
private static synchronized void assertHashRangeOfClosedShardIsCovered(Shard closedShard,
private synchronized void assertHashRangeOfClosedShardIsCovered(Shard closedShard,
Map<String, Shard> shardIdToShardMap,
Set<String> childShardIds) throws KinesisClientLibIOException {
@ -286,7 +273,7 @@ class ShardSyncer {
* @param shardIdToShardMap
* @return
*/
static Map<String, Set<String>> constructShardIdToChildShardIdsMap(
Map<String, Set<String>> constructShardIdToChildShardIdsMap(
Map<String, Shard> shardIdToShardMap) {
Map<String, Set<String>> shardIdToChildShardIdsMap = new HashMap<>();
for (Map.Entry<String, Shard> entry : shardIdToShardMap.entrySet()) {
@ -315,7 +302,7 @@ class ShardSyncer {
return shardIdToChildShardIdsMap;
}
private static List<Shard> getShardList(IKinesisProxy kinesisProxy) throws KinesisClientLibIOException {
private List<Shard> getShardList(IKinesisProxy kinesisProxy) throws KinesisClientLibIOException {
List<Shard> shards = kinesisProxy.getShardList();
if (shards == null) {
throw new KinesisClientLibIOException(
@ -371,7 +358,7 @@ class ShardSyncer {
* @param inconsistentShardIds Set of child shard ids having open parents.
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
static List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
List<KinesisClientLease> currentLeases,
InitialPositionInStreamExtended initialPosition,
Set<String> inconsistentShardIds) {
@ -452,7 +439,7 @@ class ShardSyncer {
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
*/
static List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
List<KinesisClientLease> currentLeases,
InitialPositionInStreamExtended initialPosition) {
Set<String> inconsistentShardIds = new HashSet<String>();
@ -475,7 +462,7 @@ class ShardSyncer {
* @return true if the shard is a descendant of any current shard (lease already exists)
*/
// CHECKSTYLE:OFF CyclomaticComplexity
static boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId,
boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId,
InitialPositionInStreamExtended initialPosition,
Set<String> shardIdsOfCurrentLeases,
Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
@ -530,7 +517,7 @@ class ShardSyncer {
if (descendantParentShardIds.contains(parentShardId)
&& !initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
lease.setCheckpoint(convertToCheckpoint(initialPosition));
@ -544,7 +531,7 @@ class ShardSyncer {
// after the specified initial position timestamp.
if (initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)
|| initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
isDescendant = true;
}
}
@ -566,7 +553,7 @@ class ShardSyncer {
* @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream.
* @return Set of parentShardIds
*/
static Set<String> getParentShardIds(Shard shard, Map<String, Shard> shardIdToShardMapOfAllKinesisShards) {
Set<String> getParentShardIds(Shard shard, Map<String, Shard> shardIdToShardMapOfAllKinesisShards) {
Set<String> parentShardIds = new HashSet<String>(2);
String parentShardId = shard.getParentShardId();
if ((parentShardId != null) && shardIdToShardMapOfAllKinesisShards.containsKey(parentShardId)) {
@ -593,11 +580,11 @@ class ShardSyncer {
* @throws InvalidStateException
* @throws DependencyException
*/
private static void cleanupGarbageLeases(List<Shard> shards,
private void cleanupGarbageLeases(List<Shard> shards,
List<KinesisClientLease> trackedLeases,
IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager)
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException {
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException {
Set<String> kinesisShards = new HashSet<>();
for (Shard shard : shards) {
kinesisShards.add(shard.getShardId());
@ -606,7 +593,7 @@ class ShardSyncer {
// Check if there are leases for non-existent shards
List<KinesisClientLease> garbageLeases = new ArrayList<>();
for (KinesisClientLease lease : trackedLeases) {
if (isCandidateForCleanup(lease, kinesisShards)) {
if (leaseCleanupValidator.isCandidateForCleanup(lease, kinesisShards)) {
garbageLeases.add(lease);
}
}
@ -622,7 +609,7 @@ class ShardSyncer {
}
for (KinesisClientLease lease : garbageLeases) {
if (isCandidateForCleanup(lease, currentKinesisShardIds)) {
if (leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds)) {
LOG.info("Deleting lease for shard " + lease.getLeaseKey()
+ " as it is not present in Kinesis stream.");
leaseManager.deleteLease(lease);
@ -632,42 +619,6 @@ class ShardSyncer {
}
/**
* Note: This method has package level access, solely for testing purposes.
*
* @param lease Candidate shard we are considering for deletion.
* @param currentKinesisShardIds
* @return true if neither the shard (corresponding to the lease), nor its parents are present in
* currentKinesisShardIds
* @throws KinesisClientLibIOException Thrown if currentKinesisShardIds contains a parent shard but not the child
* shard (we are evaluating for deletion).
*/
static boolean isCandidateForCleanup(KinesisClientLease lease, Set<String> currentKinesisShardIds)
throws KinesisClientLibIOException {
boolean isCandidateForCleanup = true;
if (currentKinesisShardIds.contains(lease.getLeaseKey())) {
isCandidateForCleanup = false;
} else {
LOG.info("Found lease for non-existent shard: " + lease.getLeaseKey() + ". Checking its parent shards");
Set<String> parentShardIds = lease.getParentShardIds();
for (String parentShardId : parentShardIds) {
// Throw an exception if the parent shard exists (but the child does not).
// This may be a (rare) race condition between fetching the shard list and Kinesis expiring shards.
if (currentKinesisShardIds.contains(parentShardId)) {
String message =
"Parent shard " + parentShardId + " exists but not the child shard "
+ lease.getLeaseKey();
LOG.info(message);
throw new KinesisClientLibIOException(message);
}
}
}
return isCandidateForCleanup;
}
/**
* Private helper method.
* Clean up leases for shards that meet the following criteria:
@ -685,12 +636,12 @@ class ShardSyncer {
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
private static synchronized void cleanupLeasesOfFinishedShards(Collection<KinesisClientLease> currentLeases,
private synchronized void cleanupLeasesOfFinishedShards(Collection<KinesisClientLease> currentLeases,
Map<String, Shard> shardIdToShardMap,
Map<String, Set<String>> shardIdToChildShardIdsMap,
List<KinesisClientLease> trackedLeases,
ILeaseManager<KinesisClientLease> leaseManager)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
Set<String> shardIdsOfClosedShards = new HashSet<>();
List<KinesisClientLease> leasesOfClosedShards = new ArrayList<>();
for (KinesisClientLease lease : currentLeases) {
@ -733,11 +684,11 @@ class ShardSyncer {
* @throws InvalidStateException
* @throws DependencyException
*/
static synchronized void cleanupLeaseForClosedShard(String closedShardId,
synchronized void cleanupLeaseForClosedShard(String closedShardId,
Set<String> childShardIds,
Map<String, KinesisClientLease> trackedLeases,
ILeaseManager<KinesisClientLease> leaseManager)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
KinesisClientLease leaseForClosedShard = trackedLeases.get(closedShardId);
List<KinesisClientLease> childShardLeases = new ArrayList<>();
@ -774,7 +725,7 @@ class ShardSyncer {
* @param shard
* @return
*/
static KinesisClientLease newKCLLease(Shard shard) {
KinesisClientLease newKCLLease(Shard shard) {
KinesisClientLease newLease = new KinesisClientLease();
newLease.setLeaseKey(shard.getShardId());
List<String> parentShardIds = new ArrayList<String>(2);
@ -796,7 +747,7 @@ class ShardSyncer {
* @param shards List of shards
* @return ShardId->Shard map
*/
static Map<String, Shard> constructShardIdToShardMap(List<Shard> shards) {
Map<String, Shard> constructShardIdToShardMap(List<Shard> shards) {
Map<String, Shard> shardIdToShardMap = new HashMap<String, Shard>();
for (Shard shard : shards) {
shardIdToShardMap.put(shard.getShardId(), shard);
@ -811,7 +762,7 @@ class ShardSyncer {
* @param allShards All shards returved via DescribeStream. We assume this to represent a consistent shard list.
* @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active.
*/
static List<Shard> getOpenShards(List<Shard> allShards) {
List<Shard> getOpenShards(List<Shard> allShards) {
List<Shard> openShards = new ArrayList<Shard>();
for (Shard shard : allShards) {
String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
@ -823,7 +774,7 @@ class ShardSyncer {
return openShards;
}
private static ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) {
private ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) {
ExtendedSequenceNumber checkpoint = null;
if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) {

View file

@ -48,22 +48,24 @@ class ShutdownTask implements ITask {
private final TaskType taskType = TaskType.SHUTDOWN;
private final long backoffTimeMillis;
private final GetRecordsCache getRecordsCache;
private final ShardSyncer shardSyncer;
/**
* Constructor.
*/
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
ShutdownTask(ShardInfo shardInfo,
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
ShutdownReason reason,
IKinesisProxy kinesisProxy,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards,
ILeaseManager<KinesisClientLease> leaseManager,
long backoffTimeMillis,
GetRecordsCache getRecordsCache) {
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
ShutdownReason reason,
IKinesisProxy kinesisProxy,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards,
ILeaseManager<KinesisClientLease> leaseManager,
long backoffTimeMillis,
GetRecordsCache getRecordsCache,
ShardSyncer shardSyncer) {
this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
@ -75,6 +77,7 @@ class ShutdownTask implements ITask {
this.leaseManager = leaseManager;
this.backoffTimeMillis = backoffTimeMillis;
this.getRecordsCache = getRecordsCache;
this.shardSyncer = shardSyncer;
}
/*
@ -127,7 +130,7 @@ class ShutdownTask implements ITask {
if (reason == ShutdownReason.TERMINATE) {
LOG.debug("Looking for child shards of shard " + shardInfo.getShardId());
// create leases for the child shards
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
leaseManager,
initialPositionInStream,
cleanupLeasesOfCompletedShards,

View file

@ -33,11 +33,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.AmazonWebServiceClient;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
@ -69,7 +70,6 @@ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.NonNull;
import lombok.Setter;
import lombok.experimental.Accessors;
@ -84,6 +84,8 @@ public class Worker implements Runnable {
private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
private static final LeaseCleanupValidator DEFAULT_LEASE_CLEANUP_VALIDATOR = new KinesisLeaseCleanupValidator();
private static final LeaseSelector<KinesisClientLease> DEFAULT_LEASE_SELECTOR = new GenericLeaseSelector<KinesisClientLease>();
private WorkerLog wlog = new WorkerLog();
@ -114,6 +116,7 @@ public class Worker implements Runnable {
private volatile boolean shutdown;
private volatile long shutdownStartTimeMillis;
private volatile boolean shutdownComplete = false;
private final ShardSyncer shardSyncer;
// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
@ -388,6 +391,7 @@ public class Worker implements Runnable {
config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null,
new KinesisClientLibLeaseCoordinator(
new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient),
DEFAULT_LEASE_SELECTOR,
config.getWorkerIdentifier(),
config.getFailoverTimeMillis(),
config.getEpsilonMillis(),
@ -395,8 +399,8 @@ public class Worker implements Runnable {
config.getMaxLeasesToStealAtOneTime(),
config.getMaxLeaseRenewalThreads(),
metricsFactory)
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
execService,
metricsFactory,
config.getTaskBackoffTimeMillis(),
@ -405,7 +409,8 @@ public class Worker implements Runnable {
config.getShardPrioritizationStrategy(),
config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool(),
DEFAULT_WORKER_STATE_CHANGE_LISTENER);
DEFAULT_WORKER_STATE_CHANGE_LISTENER,
DEFAULT_LEASE_CLEANUP_VALIDATOR );
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) {
@ -457,7 +462,7 @@ public class Worker implements Runnable {
// NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
@ -465,7 +470,7 @@ public class Worker implements Runnable {
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER);
shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER, DEFAULT_LEASE_CLEANUP_VALIDATOR );
}
/**
@ -503,16 +508,19 @@ public class Worker implements Runnable {
* Time in seconds to wait before the worker retries to get a record.
* @param maxGetRecordsThreadPool
* Max number of threads in the getRecords thread pool.
* @param leaseCleanupValidator
* leaseCleanupValidator instance used to validate leases
*/
// NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig,
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener) {
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener,
LeaseCleanupValidator leaseCleanupValidator) {
this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory;
this.config = config;
@ -525,9 +533,10 @@ public class Worker implements Runnable {
this.executorService = execService;
this.leaseCoordinator = leaseCoordinator;
this.metricsFactory = metricsFactory;
this.shardSyncer = new ShardSyncer(leaseCleanupValidator);
this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
initialPositionInStream, cleanupLeasesUponShardCompletion, config.shouldIgnoreUnexpectedChildShards(),
shardSyncIdleTimeMillis, metricsFactory, executorService);
shardSyncIdleTimeMillis, metricsFactory, executorService, shardSyncer);
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
this.failoverTimeMillis = failoverTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
@ -629,7 +638,7 @@ public class Worker implements Runnable {
LOG.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(),
leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion,
config.shouldIgnoreUnexpectedChildShards(), 0L);
config.shouldIgnoreUnexpectedChildShards(), 0L, shardSyncer);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else {
LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
@ -996,7 +1005,8 @@ public class Worker implements Runnable {
skipShardSyncAtWorkerInitializationIfLeasesExist,
retryGetRecordsInSeconds,
maxGetRecordsThreadPool,
config);
config,
shardSyncer);
}
@ -1158,6 +1168,10 @@ public class Worker implements Runnable {
private IKinesisProxy kinesisProxy;
@Setter @Accessors(fluent = true)
private WorkerStateChangeListener workerStateChangeListener;
@Setter @Accessors(fluent = true)
private LeaseCleanupValidator leaseCleanupValidator;
@Setter @Accessors(fluent = true)
private LeaseSelector<KinesisClientLease> leaseSelector;
@VisibleForTesting
AmazonKinesis getKinesisClient() {
@ -1272,6 +1286,14 @@ public class Worker implements Runnable {
workerStateChangeListener = DEFAULT_WORKER_STATE_CHANGE_LISTENER;
}
if(leaseCleanupValidator == null) {
leaseCleanupValidator = DEFAULT_LEASE_CLEANUP_VALIDATOR;
}
if(leaseSelector == null) {
leaseSelector = DEFAULT_LEASE_SELECTOR;
}
return new Worker(config.getApplicationName(),
recordProcessorFactory,
config,
@ -1287,6 +1309,7 @@ public class Worker implements Runnable {
config.shouldCleanupLeasesUponShardCompletion(),
null,
new KinesisClientLibLeaseCoordinator(leaseManager,
leaseSelector,
config.getWorkerIdentifier(),
config.getFailoverTimeMillis(),
config.getEpsilonMillis(),
@ -1294,8 +1317,8 @@ public class Worker implements Runnable {
config.getMaxLeasesToStealAtOneTime(),
config.getMaxLeaseRenewalThreads(),
metricsFactory)
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
execService,
metricsFactory,
config.getTaskBackoffTimeMillis(),
@ -1304,14 +1327,15 @@ public class Worker implements Runnable {
shardPrioritization,
config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool(),
workerStateChangeListener);
workerStateChangeListener,
leaseCleanupValidator);
}
<R, T extends AwsClientBuilder<T, R>> R createClient(final T builder,
final AWSCredentialsProvider credentialsProvider,
final ClientConfiguration clientConfiguration,
final String endpointUrl,
final String region) {
final AWSCredentialsProvider credentialsProvider,
final ClientConfiguration clientConfiguration,
final String endpointUrl,
final String region) {
if (credentialsProvider != null) {
builder.withCredentials(credentialsProvider);
}

View file

@ -0,0 +1,43 @@
package com.amazonaws.services.kinesis.leases.impl;
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* GenericLeaseSelector abstracts away the lease selection logic from the application code that's using leasing.
* It owns filtering of the leases to be taken.
*/
public class GenericLeaseSelector<T extends Lease> implements LeaseSelector<T> {
/**
* Provides the list of leases to be taken.
* @param expiredLeases list of leases that are currently expired
* @param numLeasesToReachTarget the number of leases to be taken
* @return
*/
@Override
public Set<T> getLeasesToTakeFromExpiredLeases(List<T> expiredLeases, int numLeasesToReachTarget) {
Set<T> leasesToTake = new HashSet<T>();
// If we have expired leases, get up to <needed> leases from expiredLeases
for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) {
leasesToTake.add(expiredLeases.remove(0));
}
return leasesToTake;
}
/**
* Provides the number of leases that should be taken by the worker.
* @param allLeases list of all existing leases
* @return
*/
@Override
public int getLeaseCountThatCanBeTaken(Collection<T> allLeases) {
return allLeases.size();
}
}

View file

@ -26,6 +26,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -82,9 +83,14 @@ public class LeaseCoordinator<T extends Lease> {
private ScheduledExecutorService leaseCoordinatorThreadPool;
private final ExecutorService leaseRenewalThreadpool;
private volatile boolean running = false;
private ScheduledFuture<?> takerFuture;
private static <T extends Lease> LeaseSelector<T> getDefaultLeaseSelector() {
return new GenericLeaseSelector<>();
}
/**
* Constructor.
*
@ -100,6 +106,23 @@ public class LeaseCoordinator<T extends Lease> {
this(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, new LogMetricsFactory());
}
/**
* Constructor.
*
* @param leaseManager LeaseManager instance to use
* @param leaseSelector LeaseSelector instance to use
* @param workerIdentifier Identifies the worker (e.g. useful to track lease ownership)
* @param leaseDurationMillis Duration of a lease
* @param epsilonMillis Allow for some variance when calculating lease expirations
*/
public LeaseCoordinator(ILeaseManager<T> leaseManager,
LeaseSelector<T> leaseSelector,
String workerIdentifier,
long leaseDurationMillis,
long epsilonMillis) {
this(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis, epsilonMillis, new LogMetricsFactory());
}
/**
* Constructor.
*
@ -119,6 +142,27 @@ public class LeaseCoordinator<T extends Lease> {
KinesisClientLibConfiguration.DEFAULT_MAX_LEASE_RENEWAL_THREADS, metricsFactory);
}
/**
* Constructor.
*
* @param leaseManager LeaseManager instance to use
* @param leaseSelector LeaseSelector instance to use
* @param workerIdentifier Identifies the worker (e.g. useful to track lease ownership)
* @param leaseDurationMillis Duration of a lease
* @param epsilonMillis Allow for some variance when calculating lease expirations
* @param metricsFactory Used to publish metrics about lease operations
*/
public LeaseCoordinator(ILeaseManager<T> leaseManager,
LeaseSelector<T> leaseSelector,
String workerIdentifier,
long leaseDurationMillis,
long epsilonMillis,
IMetricsFactory metricsFactory) {
this(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis, epsilonMillis,
DEFAULT_MAX_LEASES_FOR_WORKER, DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME,
KinesisClientLibConfiguration.DEFAULT_MAX_LEASE_RENEWAL_THREADS, metricsFactory);
}
/**
* Constructor.
*
@ -138,8 +182,33 @@ public class LeaseCoordinator<T extends Lease> {
int maxLeasesToStealAtOneTime,
int maxLeaseRenewerThreadCount,
IMetricsFactory metricsFactory) {
this(leaseManager, getDefaultLeaseSelector(), workerIdentifier, leaseDurationMillis, epsilonMillis,
maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, metricsFactory);
}
/**
* Constructor.
*
* @param leaseManager LeaseManager instance to use
* @param leaseSelector LeaseSelector instance to use
* @param workerIdentifier Identifies the worker (e.g. useful to track lease ownership)
* @param leaseDurationMillis Duration of a lease
* @param epsilonMillis Allow for some variance when calculating lease expirations
* @param maxLeasesForWorker Max leases this Worker can handle at a time
* @param maxLeasesToStealAtOneTime Steal up to these many leases at a time (for load balancing)
* @param metricsFactory Used to publish metrics about lease operations
*/
public LeaseCoordinator(ILeaseManager<T> leaseManager,
LeaseSelector<T> leaseSelector,
String workerIdentifier,
long leaseDurationMillis,
long epsilonMillis,
int maxLeasesForWorker,
int maxLeasesToStealAtOneTime,
int maxLeaseRenewerThreadCount,
IMetricsFactory metricsFactory) {
this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount);
this.leaseTaker = new LeaseTaker<T>(leaseManager, workerIdentifier, leaseDurationMillis)
this.leaseTaker = new LeaseTaker<T>(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis)
.withMaxLeasesForWorker(maxLeasesForWorker)
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime);
this.leaseRenewer = new LeaseRenewer<T>(
@ -301,8 +370,8 @@ public class LeaseCoordinator<T extends Lease> {
} else {
leaseCoordinatorThreadPool.shutdownNow();
LOG.info(String.format("Worker %s stopped lease-tracking threads %dms after stop",
leaseTaker.getWorkerIdentifier(),
STOP_WAIT_TIME_MILLIS));
leaseTaker.getWorkerIdentifier(),
STOP_WAIT_TIME_MILLIS));
}
} catch (InterruptedException e) {
LOG.debug("Encountered InterruptedException when awaiting threadpool termination");
@ -359,7 +428,7 @@ public class LeaseCoordinator<T extends Lease> {
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/
public boolean updateLease(T lease, UUID concurrencyToken)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
return leaseRenewer.updateLease(lease, concurrencyToken);
}

View file

@ -26,6 +26,7 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -59,6 +60,7 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
};
private final ILeaseManager<T> leaseManager;
private final LeaseSelector<T> leaseSelector;
private final String workerIdentifier;
private final Map<String, T> allLeases = new HashMap<String, T>();
private final long leaseDurationNanos;
@ -67,8 +69,18 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
private long lastScanTimeNanos = 0L;
private static <T extends Lease> LeaseSelector<T> getDefaultLeaseSelector() {
return new GenericLeaseSelector<>();
}
public LeaseTaker(ILeaseManager<T> leaseManager, String workerIdentifier, long leaseDurationMillis) {
this(leaseManager, getDefaultLeaseSelector(), workerIdentifier, leaseDurationMillis);
}
public LeaseTaker(ILeaseManager<T> leaseManager, LeaseSelector<T> leaseSelector,
String workerIdentifier, long leaseDurationMillis) {
this.leaseManager = leaseManager;
this.leaseSelector = leaseSelector;
this.workerIdentifier = workerIdentifier;
this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis);
}
@ -131,7 +143,7 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
* @throws InvalidStateException
*/
synchronized Map<String, T> takeLeases(Callable<Long> timeProvider)
throws DependencyException, InvalidStateException {
throws DependencyException, InvalidStateException {
// Key is leaseKey
Map<String, T> takenLeases = new HashMap<String, T>();
@ -159,7 +171,7 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
if (lastException != null) {
LOG.error("Worker " + workerIdentifier
+ " could not scan leases table, aborting takeLeases. Exception caught by last retry:",
+ " could not scan leases table, aborting takeLeases. Exception caught by last retry:",
lastException);
return takenLeases;
}
@ -251,7 +263,7 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
* @throws DependencyException if listLeases fails in an unexpected way
*/
private void updateAllLeases(Callable<Long> timeProvider)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
List<T> freshList = leaseManager.listLeases();
try {
lastScanTimeNanos = timeProvider.call();
@ -332,7 +344,7 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
Set<T> leasesToTake = new HashSet<T>();
IMetricsScope metrics = MetricsHelper.getMetricsScope();
int numLeases = allLeases.size();
int numLeases = leaseSelector.getLeaseCountThatCanBeTaken(allLeases.values());
int numWorkers = leaseCounts.size();
if (numLeases == 0) {
@ -357,8 +369,8 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
int leaseSpillover = Math.max(0, target - maxLeasesForWorker);
if (target > maxLeasesForWorker) {
LOG.warn(String.format("Worker %s target is %d leases and maxLeasesForWorker is %d."
+ " Resetting target to %d, lease spillover is %d. "
+ " Note that some shards may not be processed if no other workers are able to pick them up.",
+ " Resetting target to %d, lease spillover is %d. "
+ " Note that some shards may not be processed if no other workers are able to pick them up.",
workerIdentifier,
target,
maxLeasesForWorker,
@ -382,10 +394,7 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
int originalExpiredLeasesSize = expiredLeases.size();
if (expiredLeases.size() > 0) {
// If we have expired leases, get up to <needed> leases from expiredLeases
for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) {
leasesToTake.add(expiredLeases.remove(0));
}
leasesToTake = leaseSelector.getLeasesToTakeFromExpiredLeases(expiredLeases, numLeasesToReachTarget);
} else {
// If there are no expired leases and we need a lease, consider stealing.
List<T> leasesToSteal = chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target);
@ -401,7 +410,7 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
if (!leasesToTake.isEmpty()) {
LOG.info(String.format("Worker %s saw %d total leases, %d available leases, %d "
+ "workers. Target is %d leases, I have %d leases, I will take %d leases",
+ "workers. Target is %d leases, I have %d leases, I will take %d leases",
workerIdentifier,
numLeases,
originalExpiredLeasesSize,
@ -458,7 +467,7 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
if (numLeasesToSteal <= 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Worker %s not stealing from most loaded worker %s. He has %d,"
+ " target is %d, and I need %d",
+ " target is %d, and I need %d",
workerIdentifier,
mostLoadedWorker.getKey(),
mostLoadedWorker.getValue(),
@ -469,7 +478,7 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Worker %s will attempt to steal %d leases from most loaded worker %s. "
+ " He has %d leases, target is %d, I need %d, maxLeasesToSteatAtOneTime is %d.",
+ " He has %d leases, target is %d, I need %d, maxLeasesToSteatAtOneTime is %d.",
workerIdentifier,
numLeasesToSteal,
mostLoadedWorker.getKey(),

View file

@ -0,0 +1,30 @@
package com.amazonaws.services.kinesis.leases.interfaces;
import com.amazonaws.services.kinesis.leases.impl.Lease;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* LeaseSelector abstracts away the lease selection logic from the application code that's using leasing.
* It owns filtering of the leases to be taken.
*/
public interface LeaseSelector<T extends Lease> {
/**
* Provides the list of leases to be taken.
* @param expiredLeases list of leases that are currently expired
* @param numLeasesToReachTarget the number of leases to be taken
* @return
*/
Set<T> getLeasesToTakeFromExpiredLeases(List<T> expiredLeases, int numLeasesToReachTarget);
/**
* Provides the number of leases that should be taken by the worker.
* @param allLeases list of all existing leases
* @return
*/
int getLeaseCountThatCanBeTaken(Collection<T> allLeases);
}

View file

@ -23,7 +23,8 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import junit.framework.Assert;
import org.junit.Before;
@ -56,6 +57,7 @@ public class KinesisClientLibLeaseCoordinatorIntegrationTest {
@Before
public void setUp() throws ProvisionedThroughputException, DependencyException, InvalidStateException {
final boolean useConsistentReads = true;
LeaseSelector<KinesisClientLease> leaseSelector = new GenericLeaseSelector<>();
if (leaseManager == null) {
AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain());
leaseManager =
@ -63,7 +65,7 @@ public class KinesisClientLibLeaseCoordinatorIntegrationTest {
}
leaseManager.createLeaseTableIfNotExists(10L, 10L);
leaseManager.deleteAll();
coordinator = new KinesisClientLibLeaseCoordinator(leaseManager, WORKER_ID, 5000L, 50L);
coordinator = new KinesisClientLibLeaseCoordinator(leaseManager, WORKER_ID, 5000L, 50L, leaseSelector);
coordinator.start();
}
@ -210,7 +212,7 @@ public class KinesisClientLibLeaseCoordinatorIntegrationTest {
}
public void addLeasesToRenew(ILeaseRenewer<KinesisClientLease> renewer, String... shardIds)
throws DependencyException, InvalidStateException {
throws DependencyException, InvalidStateException {
List<KinesisClientLease> leasesToRenew = new ArrayList<KinesisClientLease>();
for (String shardId : shardIds) {

View file

@ -19,6 +19,9 @@ import static org.mockito.Mockito.doReturn;
import java.util.UUID;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import junit.framework.Assert;
import org.junit.Before;
@ -54,12 +57,13 @@ public class KinesisClientLibLeaseCoordinatorTest {
MockitoAnnotations.initMocks(this);
// Set up lease coordinator
doReturn(true).when(mockLeaseManager).createLeaseTableIfNotExists(anyLong(), anyLong());
leaseCoordinator = new KinesisClientLibLeaseCoordinator(mockLeaseManager, WORK_ID, TEST_LONG, TEST_LONG);
LeaseSelector<KinesisClientLease> leaseSelector = new GenericLeaseSelector<>();
leaseCoordinator = new KinesisClientLibLeaseCoordinator(mockLeaseManager, WORK_ID, TEST_LONG, TEST_LONG, leaseSelector);
}
@Test(expected = ShutdownException.class)
public void testSetCheckpointWithUnownedShardId()
throws KinesisClientLibException, DependencyException, InvalidStateException, ProvisionedThroughputException {
throws KinesisClientLibException, DependencyException, InvalidStateException, ProvisionedThroughputException {
final boolean succeess = leaseCoordinator.setCheckpoint(SHARD_ID, TEST_CHKPT, TEST_UUID);
Assert.assertFalse("Set Checkpoint should return failure", succeess);
leaseCoordinator.setCheckpoint(SHARD_ID, TEST_CHKPT, TEST_UUID.toString());
@ -67,7 +71,7 @@ public class KinesisClientLibLeaseCoordinatorTest {
@Test(expected = DependencyException.class)
public void testWaitLeaseTableTimeout()
throws DependencyException, ProvisionedThroughputException, IllegalStateException {
throws DependencyException, ProvisionedThroughputException, IllegalStateException {
// Set mock lease manager to return false in waiting
doReturn(false).when(mockLeaseManager).waitUntilLeaseTableExists(anyLong(), anyLong());
leaseCoordinator.initialize();

View file

@ -97,6 +97,7 @@ public class ShardConsumerTest {
private final boolean skipCheckpointValidationValue = false;
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
private static final ShardSyncer shardSyncer = new ShardSyncer(new KinesisLeaseCleanupValidator());
// Use Executors.newFixedThreadPool since it returns ThreadPoolExecutor, which is
// ... a non-final public class, and so can be mocked and spied.
@ -161,7 +162,8 @@ public class ShardConsumerTest {
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config);
config,
shardSyncer);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize
@ -209,7 +211,8 @@ public class ShardConsumerTest {
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config);
config,
shardSyncer);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize
@ -251,7 +254,8 @@ public class ShardConsumerTest {
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config);
config,
shardSyncer);
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null;
@ -370,7 +374,8 @@ public class ShardConsumerTest {
dataFetcher,
Optional.empty(),
Optional.empty(),
config);
config,
shardSyncer);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
@ -514,7 +519,8 @@ public class ShardConsumerTest {
dataFetcher,
Optional.empty(),
Optional.empty(),
config);
config,
shardSyncer);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
@ -651,7 +657,8 @@ public class ShardConsumerTest {
dataFetcher,
Optional.empty(),
Optional.empty(),
config);
config,
shardSyncer);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
@ -721,7 +728,8 @@ public class ShardConsumerTest {
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config);
config,
shardSyncer);
GetRecordsCache getRecordsCache = spy(consumer.getGetRecordsCache());
@ -774,7 +782,8 @@ public class ShardConsumerTest {
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
Optional.empty(),
Optional.empty(),
config);
config,
shardSyncer);
assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(),
SynchronousGetRecordsRetrievalStrategy.class);
@ -804,7 +813,8 @@ public class ShardConsumerTest {
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
Optional.of(1),
Optional.of(2),
config);
config,
shardSyncer);
assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(),
AsynchronousGetRecordsRetrievalStrategy.class);
@ -843,7 +853,8 @@ public class ShardConsumerTest {
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config);
config,
shardSyncer);
shardConsumer.consumeShard();
@ -880,7 +891,7 @@ public class ShardConsumerTest {
}
Matcher<InitializationInput> initializationInputMatcher(final ExtendedSequenceNumber checkpoint,
final ExtendedSequenceNumber pendingCheckpoint) {
final ExtendedSequenceNumber pendingCheckpoint) {
return new TypeSafeMatcher<InitializationInput>() {
@Override
protected boolean matchesSafely(InitializationInput item) {

View file

@ -52,6 +52,7 @@ public class ShardSyncTaskIntegrationTest {
private static AWSCredentialsProvider credentialsProvider;
private IKinesisClientLeaseManager leaseManager;
private IKinesisProxy kinesisProxy;
private final ShardSyncer shardSyncer = new ShardSyncer(new KinesisLeaseCleanupValidator());
/**
* @throws java.lang.Exception
@ -125,7 +126,8 @@ public class ShardSyncTaskIntegrationTest {
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST),
false,
false,
0L);
0L,
shardSyncer);
syncTask.call();
List<KinesisClientLease> leases = leaseManager.listLeases();
Set<String> leaseKeys = new HashSet<String>();

View file

@ -70,6 +70,8 @@ public class ShardSyncerTest {
AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB();
LeaseManager<KinesisClientLease> leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient);
private static final int EXPONENT = 128;
protected static final KinesisLeaseCleanupValidator leaseCleanupValidator = new KinesisLeaseCleanupValidator();
private static final ShardSyncer shardSyncer = new ShardSyncer(leaseCleanupValidator);
/**
* Old/Obsolete max value of a sequence number (2^128 -1).
*/
@ -117,7 +119,7 @@ public class ShardSyncerTest {
List<Shard> shards = new ArrayList<Shard>();
List<KinesisClientLease> leases = new ArrayList<KinesisClientLease>();
Assert.assertTrue(ShardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty());
Assert.assertTrue(shardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty());
}
/**
@ -136,7 +138,7 @@ public class ShardSyncerTest {
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
Assert.assertEquals(2, newLeases.size());
Set<String> expectedLeaseShardIds = new HashSet<String>();
expectedLeaseShardIds.add(shardId0);
@ -169,7 +171,7 @@ public class ShardSyncerTest {
inconsistentShardIds.add(shardId2);
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds);
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds);
Assert.assertEquals(2, newLeases.size());
Set<String> expectedLeaseShardIds = new HashSet<String>();
expectedLeaseShardIds.add(shardId0);
@ -190,8 +192,8 @@ public class ShardSyncerTest {
*/
@Test
public final void testBootstrapShardLeasesAtTrimHorizon()
throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
KinesisClientLibIOException {
throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
KinesisClientLibIOException {
testBootstrapShardLeasesAtStartingPosition(INITIAL_POSITION_TRIM_HORIZON);
}
@ -206,8 +208,8 @@ public class ShardSyncerTest {
*/
@Test
public final void testBootstrapShardLeasesAtLatest()
throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
KinesisClientLibIOException {
throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
KinesisClientLibIOException {
testBootstrapShardLeasesAtStartingPosition(INITIAL_POSITION_LATEST);
}
@ -220,15 +222,15 @@ public class ShardSyncerTest {
*/
@Test
public final void testCheckAndCreateLeasesForNewShardsAtLatest()
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
List<Shard> shards = constructShardListForGraphA();
File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1");
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards);
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false);
List<KinesisClientLease> newLeases = leaseManager.listLeases();
Set<String> expectedLeaseShardIds = new HashSet<String>();
expectedLeaseShardIds.add("shardId-4");
@ -252,15 +254,15 @@ public class ShardSyncerTest {
*/
@Test
public final void testCheckAndCreateLeasesForNewShardsAtTrimHorizon()
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
List<Shard> shards = constructShardListForGraphA();
File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1");
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards);
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards, false);
List<KinesisClientLease> newLeases = leaseManager.listLeases();
Set<String> expectedLeaseShardIds = new HashSet<String>();
for (int i = 0; i < 11; i++) {
@ -290,8 +292,8 @@ public class ShardSyncerTest {
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_AT_TIMESTAMP,
cleanupLeasesOfCompletedShards);
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_AT_TIMESTAMP,
cleanupLeasesOfCompletedShards, false);
List<KinesisClientLease> newLeases = leaseManager.listLeases();
Set<String> expectedLeaseShardIds = new HashSet<String>();
for (int i = 0; i < 11; i++) {
@ -314,8 +316,8 @@ public class ShardSyncerTest {
*/
@Test(expected = KinesisClientLibIOException.class)
public final void testCheckAndCreateLeasesForNewShardsWhenParentIsOpen()
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
List<Shard> shards = constructShardListForGraphA();
SequenceNumberRange range = shards.get(0).getSequenceNumberRange();
range.setEndingSequenceNumber(null);
@ -324,8 +326,8 @@ public class ShardSyncerTest {
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards);
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards, false);
dataFile.delete();
}
@ -334,8 +336,8 @@ public class ShardSyncerTest {
*/
@Test
public final void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringInconsistentChildren()
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
List<Shard> shards = constructShardListForGraphA();
Shard shard = shards.get(5);
Assert.assertEquals("shardId-5", shard.getShardId());
@ -349,8 +351,8 @@ public class ShardSyncerTest {
File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1");
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, true);
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, true);
List<KinesisClientLease> newLeases = leaseManager.listLeases();
Set<String> expectedLeaseShardIds = new HashSet<String>();
expectedLeaseShardIds.add("shardId-4");
@ -388,8 +390,8 @@ public class ShardSyncerTest {
*/
@Test
public final void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardWithDeleteLeaseExceptions()
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
// Define the max calling count for lease manager methods.
// From the Shard Graph, the max count of calling could be 10
int maxCallingCount = 10;
@ -410,8 +412,8 @@ public class ShardSyncerTest {
*/
@Test
public final void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardWithListLeasesExceptions()
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
// Define the max calling count for lease manager methods.
// From the Shard Graph, the max count of calling could be 10
int maxCallingCount = 10;
@ -432,8 +434,8 @@ public class ShardSyncerTest {
*/
@Test
public final void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardWithCreateLeaseExceptions()
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
// Define the max calling count for lease manager methods.
// From the Shard Graph, the max count of calling could be 10
int maxCallingCount = 5;
@ -452,7 +454,7 @@ public class ShardSyncerTest {
private void retryCheckAndCreateLeaseForNewShards(IKinesisProxy kinesisProxy,
ExceptionThrowingLeaseManagerMethods exceptionMethod,
int exceptionTime, InitialPositionInStreamExtended position)
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException {
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException {
if (exceptionMethod != null) {
ExceptionThrowingLeaseManager exceptionThrowingLeaseManager =
new ExceptionThrowingLeaseManager(leaseManager);
@ -461,10 +463,11 @@ public class ShardSyncerTest {
// Only need to try two times.
for (int i = 1; i <= 2; i++) {
try {
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
exceptionThrowingLeaseManager,
position,
cleanupLeasesOfCompletedShards);
cleanupLeasesOfCompletedShards,
false);
return;
} catch (LeasingException e) {
LOG.debug("Catch leasing exception", e);
@ -473,10 +476,11 @@ public class ShardSyncerTest {
exceptionThrowingLeaseManager.clearLeaseManagerThrowingExceptionScenario();
}
} else {
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
leaseManager,
position,
cleanupLeasesOfCompletedShards);
cleanupLeasesOfCompletedShards,
false);
}
}
@ -569,8 +573,8 @@ public class ShardSyncerTest {
ExceptionThrowingLeaseManagerMethods exceptionMethod,
int exceptionTime,
InitialPositionInStreamExtended position)
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
ExtendedSequenceNumber extendedSequenceNumber =
new ExtendedSequenceNumber(position.getInitialPositionInStream().toString());
List<Shard> shards = constructShardListForGraphA();
@ -626,10 +630,10 @@ public class ShardSyncerTest {
*/
@Test
public final void testBootstrapShardLeasesCleanupGarbage()
throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
KinesisClientLibIOException {
throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
KinesisClientLibIOException {
String garbageShardId = "shardId-garbage-001";
KinesisClientLease garbageLease = ShardSyncer.newKCLLease(ShardObjectHelper.newShard(garbageShardId,
KinesisClientLease garbageLease = shardSyncer.newKCLLease(ShardObjectHelper.newShard(garbageShardId,
null,
null,
ShardObjectHelper.newSequenceNumberRange("101", null)));
@ -641,8 +645,8 @@ public class ShardSyncerTest {
}
private void testBootstrapShardLeasesAtStartingPosition(InitialPositionInStreamExtended initialPosition)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
KinesisClientLibIOException {
throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
KinesisClientLibIOException {
List<Shard> shards = new ArrayList<Shard>();
SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
@ -654,8 +658,8 @@ public class ShardSyncerTest {
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
ShardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards,
false);
shardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards,
false);
List<KinesisClientLease> newLeases = leaseManager.listLeases();
Assert.assertEquals(2, newLeases.size());
Set<String> expectedLeaseShardIds = new HashSet<String>();
@ -690,7 +694,7 @@ public class ShardSyncerTest {
for (InitialPositionInStreamExtended initialPosition : initialPositions) {
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, initialPosition);
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, initialPosition);
Assert.assertEquals(2, newLeases.size());
Set<String> expectedLeaseShardIds = new HashSet<String>();
expectedLeaseShardIds.add(shardId0);
@ -722,7 +726,7 @@ public class ShardSyncerTest {
ShardObjectHelper.newSequenceNumberRange("405", null)));
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
Assert.assertEquals(1, newLeases.size());
Assert.assertEquals(lastShardId, newLeases.get(0).getLeaseKey());
}
@ -747,7 +751,7 @@ public class ShardSyncerTest {
currentLeases.add(newLease("shardId-5"));
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap =
new HashMap<String, ExtendedSequenceNumber>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@ -785,7 +789,7 @@ public class ShardSyncerTest {
currentLeases.add(newLease("shardId-7"));
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap =
new HashMap<String, ExtendedSequenceNumber>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@ -821,7 +825,7 @@ public class ShardSyncerTest {
currentLeases.add(newLease("shardId-5"));
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap =
new HashMap<String, ExtendedSequenceNumber>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@ -861,7 +865,7 @@ public class ShardSyncerTest {
currentLeases.add(newLease("shardId-7"));
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap =
new HashMap<String, ExtendedSequenceNumber>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@ -890,7 +894,7 @@ public class ShardSyncerTest {
List<Shard> shards = constructShardListForGraphB();
List<KinesisClientLease> currentLeases = new ArrayList<KinesisClientLease>();
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap =
new HashMap<String, ExtendedSequenceNumber>();
for (int i = 0; i < 11; i++) {
@ -927,7 +931,7 @@ public class ShardSyncerTest {
currentLeases.add(newLease("shardId-5"));
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<String, ExtendedSequenceNumber>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.AT_TIMESTAMP);
expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.AT_TIMESTAMP);
@ -966,7 +970,7 @@ public class ShardSyncerTest {
currentLeases.add(newLease("shardId-7"));
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<String, ExtendedSequenceNumber>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.AT_TIMESTAMP);
expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.AT_TIMESTAMP);
@ -993,7 +997,7 @@ public class ShardSyncerTest {
List<Shard> shards = constructShardListForGraphB();
List<KinesisClientLease> currentLeases = new ArrayList<KinesisClientLease>();
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap =
new HashMap<String, ExtendedSequenceNumber>();
for (int i = 0; i < shards.size(); i++) {
@ -1102,7 +1106,7 @@ public class ShardSyncerTest {
@Test
public final void testCheckIfDescendantAndAddNewLeasesForAncestorsNullShardId() {
Map<String, Boolean> memoizationContext = new HashMap<>();
Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(null, INITIAL_POSITION_LATEST,
Assert.assertFalse(shardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(null, INITIAL_POSITION_LATEST,
null,
null,
null,
@ -1117,7 +1121,7 @@ public class ShardSyncerTest {
String shardId = "shardId-trimmed";
Map<String, Shard> kinesisShards = new HashMap<String, Shard>();
Map<String, Boolean> memoizationContext = new HashMap<>();
Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
Assert.assertFalse(shardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
null,
kinesisShards,
null,
@ -1136,7 +1140,7 @@ public class ShardSyncerTest {
shardIdsOfCurrentLeases.add(shardId);
Map<String, KinesisClientLease> newLeaseMap = new HashMap<String, KinesisClientLease>();
Map<String, Boolean> memoizationContext = new HashMap<>();
Assert.assertTrue(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
Assert.assertTrue(shardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
shardIdsOfCurrentLeases,
kinesisShards,
newLeaseMap,
@ -1163,7 +1167,7 @@ public class ShardSyncerTest {
kinesisShards.put(shardId, ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null));
Map<String, Boolean> memoizationContext = new HashMap<>();
Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
Assert.assertFalse(shardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
shardIdsOfCurrentLeases,
kinesisShards,
newLeaseMap,
@ -1192,7 +1196,7 @@ public class ShardSyncerTest {
kinesisShards.put(shardId, shard);
Map<String, Boolean> memoizationContext = new HashMap<>();
Assert.assertTrue(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
Assert.assertTrue(shardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
shardIdsOfCurrentLeases,
kinesisShards,
newLeaseMap,
@ -1209,7 +1213,7 @@ public class ShardSyncerTest {
@Test
public final void testGetParentShardIdsNoParents() {
Shard shard = new Shard();
Assert.assertTrue(ShardSyncer.getParentShardIds(shard, null).isEmpty());
Assert.assertTrue(shardSyncer.getParentShardIds(shard, null).isEmpty());
}
/**
@ -1219,7 +1223,7 @@ public class ShardSyncerTest {
public final void testGetParentShardIdsTrimmedParents() {
Map<String, Shard> shardMap = new HashMap<String, Shard>();
Shard shard = ShardObjectHelper.newShard("shardId-test", "foo", "bar", null);
Assert.assertTrue(ShardSyncer.getParentShardIds(shard, shardMap).isEmpty());
Assert.assertTrue(shardSyncer.getParentShardIds(shard, shardMap).isEmpty());
}
/**
@ -1233,16 +1237,16 @@ public class ShardSyncerTest {
shardMap.put(parentShardId, ShardObjectHelper.newShard(parentShardId, null, null, null));
Shard shard = ShardObjectHelper.newShard("shardId-test", parentShardId, null, null);
Set<String> parentShardIds = ShardSyncer.getParentShardIds(shard, shardMap);
Set<String> parentShardIds = shardSyncer.getParentShardIds(shard, shardMap);
Assert.assertEquals(1, parentShardIds.size());
Assert.assertTrue(parentShardIds.contains(parentShardId));
shard.setParentShardId(null);
parentShardIds = ShardSyncer.getParentShardIds(shard, shardMap);
parentShardIds = shardSyncer.getParentShardIds(shard, shardMap);
Assert.assertTrue(parentShardIds.isEmpty());
shard.setAdjacentParentShardId(parentShardId);
parentShardIds = ShardSyncer.getParentShardIds(shard, shardMap);
parentShardIds = shardSyncer.getParentShardIds(shard, shardMap);
Assert.assertEquals(1, parentShardIds.size());
Assert.assertTrue(parentShardIds.contains(parentShardId));
}
@ -1263,16 +1267,16 @@ public class ShardSyncerTest {
Shard shard = ShardObjectHelper.newShard("shardId-test", parentShardId, adjacentParentShardId, null);
shardMap.put(parentShardId, parent);
Set<String> parentShardIds = ShardSyncer.getParentShardIds(shard, shardMap);
Set<String> parentShardIds = shardSyncer.getParentShardIds(shard, shardMap);
Assert.assertEquals(1, parentShardIds.size());
Assert.assertTrue(parentShardIds.contains(parentShardId));
shardMap.remove(parentShardId);
parentShardIds = ShardSyncer.getParentShardIds(shard, shardMap);
parentShardIds = shardSyncer.getParentShardIds(shard, shardMap);
Assert.assertTrue(parentShardIds.isEmpty());
shardMap.put(adjacentParentShardId, adjacentParent);
parentShardIds = ShardSyncer.getParentShardIds(shard, shardMap);
parentShardIds = shardSyncer.getParentShardIds(shard, shardMap);
Assert.assertEquals(1, parentShardIds.size());
Assert.assertTrue(parentShardIds.contains(adjacentParentShardId));
}
@ -1292,7 +1296,7 @@ public class ShardSyncerTest {
Shard shard = ShardObjectHelper.newShard("shardId-test", parentShardId, adjacentParentShardId, null);
Set<String> parentShardIds = ShardSyncer.getParentShardIds(shard, shardMap);
Set<String> parentShardIds = shardSyncer.getParentShardIds(shard, shardMap);
Assert.assertEquals(2, parentShardIds.size());
Assert.assertTrue(parentShardIds.contains(parentShardId));
Assert.assertTrue(parentShardIds.contains(adjacentParentShardId));
@ -1310,7 +1314,7 @@ public class ShardSyncerTest {
shard.setParentShardId(parentShardId);
shard.setAdjacentParentShardId(adjacentParentShardId);
KinesisClientLease lease = ShardSyncer.newKCLLease(shard);
KinesisClientLease lease = shardSyncer.newKCLLease(shard);
Assert.assertEquals(shardId, lease.getLeaseKey());
Assert.assertNull(lease.getCheckpoint());
Set<String> parentIds = lease.getParentShardIds();
@ -1330,7 +1334,7 @@ public class ShardSyncerTest {
shards.add(ShardObjectHelper.newShard("shardId-0", null, null, null));
shards.add(ShardObjectHelper.newShard("shardId-1", null, null, null));
Map<String, Shard> shardIdToShardMap = ShardSyncer.constructShardIdToShardMap(shards);
Map<String, Shard> shardIdToShardMap = shardSyncer.constructShardIdToShardMap(shards);
Assert.assertEquals(shards.size(), shardIdToShardMap.size());
for (Shard shard : shards) {
Assert.assertSame(shard, shardIdToShardMap.get(shard.getShardId()));
@ -1347,7 +1351,7 @@ public class ShardSyncerTest {
null,
null,
ShardObjectHelper.newSequenceNumberRange("123", "345")));
Assert.assertTrue(ShardSyncer.getOpenShards(shards).isEmpty());
Assert.assertTrue(shardSyncer.getOpenShards(shards).isEmpty());
}
/**
@ -1361,18 +1365,18 @@ public class ShardSyncerTest {
shards.add(ShardObjectHelper.newShard(shardId, null, null, sequenceNumberRange));
// Verify shard is considered open when it has a null end sequence number
List<Shard> openShards = ShardSyncer.getOpenShards(shards);
List<Shard> openShards = shardSyncer.getOpenShards(shards);
Assert.assertEquals(1, openShards.size());
Assert.assertEquals(shardId, openShards.get(0).getShardId());
// Close shard before testing for max sequence number
sequenceNumberRange.setEndingSequenceNumber("1000");
openShards = ShardSyncer.getOpenShards(shards);
openShards = shardSyncer.getOpenShards(shards);
Assert.assertTrue(openShards.isEmpty());
// Verify shard is considered closed when the end sequence number is set to max allowed sequence number
sequenceNumberRange.setEndingSequenceNumber(MAX_SEQUENCE_NUMBER.toString());
openShards = ShardSyncer.getOpenShards(shards);
openShards = shardSyncer.getOpenShards(shards);
Assert.assertEquals(0, openShards.size());
}
@ -1394,23 +1398,23 @@ public class ShardSyncerTest {
Set<String> currentKinesisShardIds = new HashSet<>();
currentKinesisShardIds.add(shardId);
Assert.assertFalse(ShardSyncer.isCandidateForCleanup(lease, currentKinesisShardIds));
Assert.assertFalse(leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds));
currentKinesisShardIds.clear();
Assert.assertTrue(ShardSyncer.isCandidateForCleanup(lease, currentKinesisShardIds));
Assert.assertTrue(leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds));
currentKinesisShardIds.add(parentShardId);
// Assert.assertFalse(ShardSyncer.isCandidateForCleanup(lease, currentKinesisShardIds));
// Assert.assertFalse(leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds));
currentKinesisShardIds.clear();
Assert.assertTrue(ShardSyncer.isCandidateForCleanup(lease, currentKinesisShardIds));
Assert.assertTrue(leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds));
currentKinesisShardIds.add(adjacentParentShardId);
// Assert.assertFalse(ShardSyncer.isCandidateForCleanup(lease, currentKinesisShardIds));
// Assert.assertFalse(leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds));
currentKinesisShardIds.add(parentShardId);
// Assert.assertFalse(ShardSyncer.isCandidateForCleanup(lease, currentKinesisShardIds));
// Assert.assertFalse(leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds));
currentKinesisShardIds.add(shardId);
Assert.assertFalse(ShardSyncer.isCandidateForCleanup(lease, currentKinesisShardIds));
Assert.assertFalse(leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds));
}
/**
@ -1431,7 +1435,7 @@ public class ShardSyncerTest {
Set<String> currentKinesisShardIds = new HashSet<>();
currentKinesisShardIds.add(parentShardId);
Assert.assertFalse(ShardSyncer.isCandidateForCleanup(lease, currentKinesisShardIds));
Assert.assertFalse(leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds));
}
/**
@ -1452,7 +1456,7 @@ public class ShardSyncerTest {
Set<String> currentKinesisShardIds = new HashSet<>();
currentKinesisShardIds.add(adjacentParentShardId);
Assert.assertFalse(ShardSyncer.isCandidateForCleanup(lease, currentKinesisShardIds));
Assert.assertFalse(leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds));
}
/**
@ -1464,7 +1468,7 @@ public class ShardSyncerTest {
*/
@Test
public final void testCleanupLeaseForClosedShard()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
String closedShardId = "shardId-2";
KinesisClientLease leaseForClosedShard = newLease(closedShardId);
leaseForClosedShard.setCheckpoint(new ExtendedSequenceNumber("1234"));
@ -1482,22 +1486,22 @@ public class ShardSyncerTest {
KinesisClientLease childLease2 = newLease(childShardId2);
childLease2.setParentShardIds(parentShardIds);
childLease2.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
Map<String, KinesisClientLease> trackedLeaseMap = ShardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
Map<String, KinesisClientLease> trackedLeaseMap = shardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
// empty list of leases
ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
Assert.assertNotNull(leaseManager.getLease(closedShardId));
// closed shard has not been fully processed yet (checkpoint != SHARD_END)
trackedLeases.add(leaseForClosedShard);
trackedLeaseMap = ShardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
trackedLeaseMap = shardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
Assert.assertNotNull(leaseManager.getLease(closedShardId));
// closed shard has been fully processed yet (checkpoint == SHARD_END)
leaseForClosedShard.setCheckpoint(ExtendedSequenceNumber.SHARD_END);
leaseManager.updateLease(leaseForClosedShard);
ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
Assert.assertNull(leaseManager.getLease(closedShardId));
// lease for only one child exists
@ -1506,27 +1510,27 @@ public class ShardSyncerTest {
leaseManager.createLeaseIfNotExists(leaseForClosedShard);
leaseManager.createLeaseIfNotExists(childLease1);
trackedLeases.add(childLease1);
trackedLeaseMap = ShardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
trackedLeaseMap = shardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
Assert.assertNotNull(leaseManager.getLease(closedShardId));
// leases for both children exists, but they are both at TRIM_HORIZON
leaseManager.createLeaseIfNotExists(childLease2);
trackedLeases.add(childLease2);
trackedLeaseMap = ShardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
trackedLeaseMap = shardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
Assert.assertNotNull(leaseManager.getLease(closedShardId));
// leases for both children exists, one is at TRIM_HORIZON
childLease1.setCheckpoint(new ExtendedSequenceNumber("34890"));
leaseManager.updateLease(childLease1);
ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
Assert.assertNotNull(leaseManager.getLease(closedShardId));
// leases for both children exists, NONE of them are at TRIM_HORIZON
childLease2.setCheckpoint(new ExtendedSequenceNumber("43789"));
leaseManager.updateLease(childLease2);
ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
Assert.assertNull(leaseManager.getLease(closedShardId));
}
@ -1546,32 +1550,32 @@ public class ShardSyncerTest {
SequenceNumberRange childSequenceNumberRange = ShardObjectHelper.newSequenceNumberRange("206", "300");
Shard child1 =
ShardObjectHelper.newShard("shardId-54879", expectedClosedShardId, null, childSequenceNumberRange);
Map<String, Shard> shardIdToShardMap = ShardSyncer.constructShardIdToShardMap(shards);
Map<String, Shard> shardIdToShardMap = shardSyncer.constructShardIdToShardMap(shards);
Map<String, Set<String>> shardIdToChildShardIdsMap =
ShardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap);
shardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap);
Set<String> closedShardIds = new HashSet<>();
closedShardIds.add(expectedClosedShardId);
ShardSyncer.assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds);
shardSyncer.assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds);
// test for case where shard has been trimmed (absent from list)
ShardSyncer.assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds);
shardSyncer.assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds);
// Populate shards.
shards.add(closedShard);
shards.add(child1);
shardIdToShardMap.put(expectedClosedShardId, closedShard);
shardIdToShardMap.put(child1.getShardId(), child1);
shardIdToChildShardIdsMap = ShardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap);
shardIdToChildShardIdsMap = shardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap);
// test degenerate split/merge
child1.setHashKeyRange(hashKeyRange);
ShardSyncer.assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds);
shardSyncer.assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds);
// test merge
child1.setHashKeyRange(ShardObjectHelper.newHashKeyRange("10", "2985"));
ShardSyncer.assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds);
shardSyncer.assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds);
child1.setHashKeyRange(ShardObjectHelper.newHashKeyRange("3", "25"));
ShardSyncer.assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds);
shardSyncer.assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds);
// test split
HashKeyRange childHashKeyRange1 = ShardObjectHelper.newHashKeyRange("10", "15");
@ -1584,8 +1588,8 @@ public class ShardSyncerTest {
childHashKeyRange2);
shards.add(child2);
shardIdToShardMap.put(child2.getShardId(), child2);
shardIdToChildShardIdsMap = ShardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap);
ShardSyncer.assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds);
shardIdToChildShardIdsMap = shardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap);
shardSyncer.assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds);
}
/**
@ -1602,12 +1606,12 @@ public class ShardSyncerTest {
Shard openShard =
ShardObjectHelper.newShard(expectedClosedShardId, null, null, sequenceNumberRange, hashKeyRange);
shards.add(openShard);
Map<String, Shard> shardIdToShardMap = ShardSyncer.constructShardIdToShardMap(shards);
Map<String, Shard> shardIdToShardMap = shardSyncer.constructShardIdToShardMap(shards);
Map<String, Set<String>> shardIdToChildShardIdsMap =
ShardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap);
shardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap);
Set<String> closedShardIds = new HashSet<>();
closedShardIds.add(expectedClosedShardId);
ShardSyncer.assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds);
shardSyncer.assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds);
}
/**
@ -1624,12 +1628,12 @@ public class ShardSyncerTest {
Shard closedShard =
ShardObjectHelper.newShard(expectedClosedShardId, null, null, sequenceNumberRange, hashKeyRange);
shards.add(closedShard);
Map<String, Shard> shardIdToShardMap = ShardSyncer.constructShardIdToShardMap(shards);
Map<String, Shard> shardIdToShardMap = shardSyncer.constructShardIdToShardMap(shards);
Map<String, Set<String>> shardIdToChildShardIdsMap =
ShardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap);
shardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap);
Set<String> closedShardIds = new HashSet<>();
closedShardIds.add(expectedClosedShardId);
ShardSyncer.assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds);
shardSyncer.assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds);
}
/**
@ -1661,7 +1665,7 @@ public class ShardSyncerTest {
private void testAssertShardCoveredOrAbsentTestIncompleteSplit(HashKeyRange parentHashKeyRange,
HashKeyRange child1HashKeyRange,
HashKeyRange child2HashKeyRange)
throws KinesisClientLibIOException {
throws KinesisClientLibIOException {
List<Shard> shards = new ArrayList<>();
String expectedClosedShardId = "shardId-34098";
SequenceNumberRange sequenceNumberRange = ShardObjectHelper.newSequenceNumberRange("103", "205");
@ -1683,12 +1687,12 @@ public class ShardSyncerTest {
child2HashKeyRange);
shards.add(child2);
Map<String, Shard> shardIdToShardMap = ShardSyncer.constructShardIdToShardMap(shards);
Map<String, Shard> shardIdToShardMap = shardSyncer.constructShardIdToShardMap(shards);
Map<String, Set<String>> shardIdToChildShardIdsMap =
ShardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap);
shardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap);
Set<String> closedShardIds = new HashSet<>();
closedShardIds.add(expectedClosedShardId);
ShardSyncer.assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds);
shardSyncer.assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds);
}
/**

View file

@ -57,6 +57,7 @@ public class ShutdownTaskTest {
defaultParentShardIds,
ExtendedSequenceNumber.LATEST);
IRecordProcessor defaultRecordProcessor = new TestStreamlet();
ShardSyncer shardSyncer = new ShardSyncer(new KinesisLeaseCleanupValidator());
@Mock
private GetRecordsCache getRecordsCache;
@ -111,7 +112,8 @@ public class ShutdownTaskTest {
ignoreUnexpectedChildShards,
leaseManager,
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache);
getRecordsCache,
shardSyncer);
TaskResult result = task.call();
Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof IllegalArgumentException);
@ -139,7 +141,8 @@ public class ShutdownTaskTest {
ignoreUnexpectedChildShards,
leaseManager,
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache);
getRecordsCache,
shardSyncer);
TaskResult result = task.call();
Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException);
@ -151,7 +154,7 @@ public class ShutdownTaskTest {
*/
@Test
public final void testGetTaskType() {
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, null, 0, getRecordsCache);
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, null, 0, getRecordsCache, shardSyncer);
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
}

View file

@ -66,6 +66,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseBuilder;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.leases.impl.LeaseManager;
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hamcrest.Condition;
@ -111,10 +117,6 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseBuilder;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.leases.impl.LeaseManager;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
@ -159,6 +161,7 @@ public class WorkerTest {
private RecordsFetcherFactory recordsFetcherFactory;
private KinesisClientLibConfiguration config;
private ShardSyncer shardSyncer = new ShardSyncer(new KinesisLeaseCleanupValidator());
@Mock
private KinesisClientLibLeaseCoordinator leaseCoordinator;
@ -198,36 +201,36 @@ public class WorkerTest {
private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY =
new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory() {
@Override
public com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor createProcessor() {
return new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor() {
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
if (reason == ShutdownReason.TERMINATE) {
try {
checkpointer.checkpoint();
} catch (KinesisClientLibNonRetryableException e) {
throw new RuntimeException(e);
public com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor createProcessor() {
return new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor() {
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
if (reason == ShutdownReason.TERMINATE) {
try {
checkpointer.checkpoint();
} catch (KinesisClientLibNonRetryableException e) {
throw new RuntimeException(e);
}
}
}
}
}
@Override
public void processRecords(List<Record> dataRecords, IRecordProcessorCheckpointer checkpointer) {
try {
checkpointer.checkpoint();
} catch (KinesisClientLibNonRetryableException e) {
throw new RuntimeException(e);
}
}
@Override
public void processRecords(List<Record> dataRecords, IRecordProcessorCheckpointer checkpointer) {
try {
checkpointer.checkpoint();
} catch (KinesisClientLibNonRetryableException e) {
throw new RuntimeException(e);
}
}
@Override
public void initialize(String shardId) {
@Override
public void initialize(String shardId) {
}
};
}
};
}
};
private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 =
new V1ToV2RecordProcessorFactoryAdapter(SAMPLE_RECORD_PROCESSOR_FACTORY);
@ -503,7 +506,7 @@ public class WorkerTest {
final int numberOfRecordsPerShard = 10;
List<Shard> shardList = createShardListWithOneSplit();
List<KinesisClientLease> initialLeases = new ArrayList<KinesisClientLease>();
KinesisClientLease lease = ShardSyncer.newKCLLease(shardList.get(0));
KinesisClientLease lease = shardSyncer.newKCLLease(shardList.get(0));
lease.setCheckpoint(new ExtendedSequenceNumber("2"));
initialLeases.add(lease);
runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard, config);
@ -519,7 +522,7 @@ public class WorkerTest {
final int numberOfRecordsPerShard = 10;
List<Shard> shardList = createShardListWithOneSplit();
List<KinesisClientLease> initialLeases = new ArrayList<KinesisClientLease>();
KinesisClientLease lease = ShardSyncer.newKCLLease(shardList.get(0));
KinesisClientLease lease = shardSyncer.newKCLLease(shardList.get(0));
lease.setCheckpoint(new ExtendedSequenceNumber("2"));
initialLeases.add(lease);
boolean callProcessRecordsForEmptyRecordList = true;
@ -611,7 +614,7 @@ public class WorkerTest {
final List<KinesisClientLease> initialLeases = new ArrayList<KinesisClientLease>();
for (Shard shard : shardList) {
KinesisClientLease lease = ShardSyncer.newKCLLease(shard);
KinesisClientLease lease = shardSyncer.newKCLLease(shard);
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
initialLeases.add(lease);
}
@ -687,7 +690,7 @@ public class WorkerTest {
final List<KinesisClientLease> initialLeases = new ArrayList<KinesisClientLease>();
for (Shard shard : shardList) {
KinesisClientLease lease = ShardSyncer.newKCLLease(shard);
KinesisClientLease lease = shardSyncer.newKCLLease(shard);
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
initialLeases.add(lease);
}
@ -1495,9 +1498,9 @@ public class WorkerTest {
public void testBuilderWithDefaultKinesisProxy() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
Worker worker = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(config)
.build();
.recordProcessorFactory(recordProcessorFactory)
.config(config)
.build();
Assert.assertNotNull(worker.getStreamConfig().getStreamProxy());
Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisProxy);
}
@ -1508,10 +1511,10 @@ public class WorkerTest {
// Create an instance of KinesisLocalFileProxy for injection and validation
IKinesisProxy kinesisProxy = mock(KinesisLocalFileProxy.class);
Worker worker = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(config)
.kinesisProxy(kinesisProxy)
.build();
.recordProcessorFactory(recordProcessorFactory)
.config(config)
.kinesisProxy(kinesisProxy)
.build();
Assert.assertNotNull(worker.getStreamConfig().getStreamProxy());
Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisLocalFileProxy);
}
@ -2013,7 +2016,7 @@ public class WorkerTest {
Assert.assertEquals(numShards, shardList.size());
List<KinesisClientLease> initialLeases = new ArrayList<KinesisClientLease>();
for (Shard shard : shardList) {
KinesisClientLease lease = ShardSyncer.newKCLLease(shard);
KinesisClientLease lease = shardSyncer.newKCLLease(shard);
lease.setCheckpoint(ExtendedSequenceNumber.AT_TIMESTAMP);
initialLeases.add(lease);
}
@ -2021,11 +2024,11 @@ public class WorkerTest {
}
private void runAndTestWorker(List<Shard> shardList,
int threadPoolSize,
List<KinesisClientLease> initialLeases,
boolean callProcessRecordsForEmptyRecordList,
int numberOfRecordsPerShard,
KinesisClientLibConfiguration clientConfig) throws Exception {
int threadPoolSize,
List<KinesisClientLease> initialLeases,
boolean callProcessRecordsForEmptyRecordList,
int numberOfRecordsPerShard,
KinesisClientLibConfiguration clientConfig) throws Exception {
File file = KinesisLocalFileDataCreator.generateTempDataFile(shardList, numberOfRecordsPerShard, "unitTestWT001");
IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath());
@ -2054,15 +2057,15 @@ public class WorkerTest {
}
private WorkerThread runWorker(List<Shard> shardList,
List<KinesisClientLease> initialLeases,
boolean callProcessRecordsForEmptyRecordList,
long failoverTimeMillis,
int numberOfRecordsPerShard,
IKinesisProxy kinesisProxy,
IRecordProcessorFactory recordProcessorFactory,
ExecutorService executorService,
IMetricsFactory metricsFactory,
KinesisClientLibConfiguration clientConfig) throws Exception {
List<KinesisClientLease> initialLeases,
boolean callProcessRecordsForEmptyRecordList,
long failoverTimeMillis,
int numberOfRecordsPerShard,
IKinesisProxy kinesisProxy,
IRecordProcessorFactory recordProcessorFactory,
ExecutorService executorService,
IMetricsFactory metricsFactory,
KinesisClientLibConfiguration clientConfig) throws Exception {
final String stageName = "testStageName";
final int maxRecords = 2;
@ -2077,8 +2080,10 @@ public class WorkerTest {
leaseManager.createLeaseIfNotExists(initialLease);
}
LeaseSelector<KinesisClientLease> leaseSelector = new GenericLeaseSelector<>();
KinesisClientLibLeaseCoordinator leaseCoordinator =
new KinesisClientLibLeaseCoordinator(leaseManager,
leaseSelector,
stageName,
leaseDurationMillis,
epsilonMillis,
@ -2253,7 +2258,7 @@ public class WorkerTest {
}
private Map<String, TestStreamlet>
findShardIdsAndStreamLetsOfShardsWithOnlyOneProcessor(TestStreamletFactory recordProcessorFactory) {
findShardIdsAndStreamLetsOfShardsWithOnlyOneProcessor(TestStreamletFactory recordProcessorFactory) {
Map<String, TestStreamlet> shardIdsAndStreamLetsOfShardsWithOnlyOneProcessor =
new HashMap<String, TestStreamlet>();
Set<String> seenShardIds = new HashSet<String>();

View file

@ -28,6 +28,7 @@ import java.util.Map;
import javax.swing.*;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -46,8 +47,8 @@ public class LeaseCoordinatorExerciser {
private static final Log LOG = LogFactory.getLog(LeaseCoordinatorExerciser.class);
public static void main(String[] args)
throws InterruptedException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
throws InterruptedException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
int numCoordinators = 9;
int numLeases = 73;
@ -69,12 +70,14 @@ public class LeaseCoordinatorExerciser {
}
CWMetricsFactory metricsFactory = new CWMetricsFactory(creds, "testNamespace", 30 * 1000, 1000);
LeaseSelector<KinesisClientLease> leaseSelector = new GenericLeaseSelector<KinesisClientLease>();
final List<LeaseCoordinator<KinesisClientLease>> coordinators =
new ArrayList<LeaseCoordinator<KinesisClientLease>>();
for (int i = 0; i < numCoordinators; i++) {
String workerIdentifier = "worker-" + Integer.toString(i);
LeaseCoordinator<KinesisClientLease> coord = new LeaseCoordinator<KinesisClientLease>(leaseManager,
leaseSelector,
workerIdentifier,
leaseDurationMillis,
epsilonMillis,

View file

@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.leases.impl;
import java.util.Map;
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -26,10 +27,11 @@ public class LeaseTakerIntegrationTest extends LeaseIntegrationTest {
private static final long LEASE_DURATION_MILLIS = 1000L;
private LeaseTaker<KinesisClientLease> taker;
private static final LeaseSelector<KinesisClientLease> leaseSelector = new GenericLeaseSelector<>();
@Before
public void setUp() {
taker = new LeaseTaker<KinesisClientLease>(leaseManager, "foo", LEASE_DURATION_MILLIS);
taker = new LeaseTaker<KinesisClientLease>(leaseManager, leaseSelector,"foo", LEASE_DURATION_MILLIS);
}
@Test