[Issue 210] - Allow unexpected child shards to be ignored (#240)
Allow unexpected child shards to be ignored now instead of always throwing an assertion if a child shard has an open parent, consider worker configuration before doing so. if configured to ignore such shards, do not create leases for them during shard sync. this is intended to mitigate failing worker init when processing dynamodb streams with many thousands of shards (which can happen for tables with thousands of partitions). this new behavior can be enabled by adding the following to a configuration/properties file: ``` ignoreUnexpectedChildShards = true ```
This commit is contained in:
parent
85d6c059c2
commit
9074864027
12 changed files with 212 additions and 38 deletions
|
|
@ -527,6 +527,7 @@ class ConsumerStates {
|
||||||
consumer.getStreamConfig().getStreamProxy(),
|
consumer.getStreamConfig().getStreamProxy(),
|
||||||
consumer.getStreamConfig().getInitialPositionInStream(),
|
consumer.getStreamConfig().getInitialPositionInStream(),
|
||||||
consumer.isCleanupLeasesOfCompletedShards(),
|
consumer.isCleanupLeasesOfCompletedShards(),
|
||||||
|
consumer.isIgnoreUnexpectedChildShards(),
|
||||||
consumer.getLeaseManager(),
|
consumer.getLeaseManager(),
|
||||||
consumer.getTaskBackoffTimeMillis(),
|
consumer.getTaskBackoffTimeMillis(),
|
||||||
consumer.getGetRecordsCache());
|
consumer.getGetRecordsCache());
|
||||||
|
|
|
||||||
|
|
@ -200,6 +200,7 @@ public class KinesisClientLibConfiguration {
|
||||||
private boolean callProcessRecordsEvenForEmptyRecordList;
|
private boolean callProcessRecordsEvenForEmptyRecordList;
|
||||||
private long parentShardPollIntervalMillis;
|
private long parentShardPollIntervalMillis;
|
||||||
private boolean cleanupLeasesUponShardCompletion;
|
private boolean cleanupLeasesUponShardCompletion;
|
||||||
|
private boolean ignoreUnexpectedChildShards;
|
||||||
private ClientConfiguration kinesisClientConfig;
|
private ClientConfiguration kinesisClientConfig;
|
||||||
private ClientConfiguration dynamoDBClientConfig;
|
private ClientConfiguration dynamoDBClientConfig;
|
||||||
private ClientConfiguration cloudWatchClientConfig;
|
private ClientConfiguration cloudWatchClientConfig;
|
||||||
|
|
@ -802,6 +803,13 @@ public class KinesisClientLibConfiguration {
|
||||||
return cleanupLeasesUponShardCompletion;
|
return cleanupLeasesUponShardCompletion;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if we should ignore child shards which have open parents
|
||||||
|
*/
|
||||||
|
public boolean shouldIgnoreUnexpectedChildShards() {
|
||||||
|
return ignoreUnexpectedChildShards;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return true if KCL should validate client provided sequence numbers with a call to Amazon Kinesis before
|
* @return true if KCL should validate client provided sequence numbers with a call to Amazon Kinesis before
|
||||||
* checkpointing for calls to {@link RecordProcessorCheckpointer#checkpoint(String)}
|
* checkpointing for calls to {@link RecordProcessorCheckpointer#checkpoint(String)}
|
||||||
|
|
@ -1022,6 +1030,16 @@ public class KinesisClientLibConfiguration {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param ignoreUnexpectedChildShards Ignore child shards with open parents.
|
||||||
|
* @return KinesisClientLibConfiguration
|
||||||
|
*/
|
||||||
|
public KinesisClientLibConfiguration withIgnoreUnexpectedChildShards(
|
||||||
|
boolean ignoreUnexpectedChildShards) {
|
||||||
|
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param clientConfig Common client configuration used by Kinesis/DynamoDB/CloudWatch client
|
* @param clientConfig Common client configuration used by Kinesis/DynamoDB/CloudWatch client
|
||||||
* @return KinesisClientLibConfiguration
|
* @return KinesisClientLibConfiguration
|
||||||
|
|
|
||||||
|
|
@ -484,6 +484,10 @@ class ShardConsumer {
|
||||||
return cleanupLeasesOfCompletedShards;
|
return cleanupLeasesOfCompletedShards;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean isIgnoreUnexpectedChildShards() {
|
||||||
|
return config.shouldIgnoreUnexpectedChildShards();
|
||||||
|
}
|
||||||
|
|
||||||
long getTaskBackoffTimeMillis() {
|
long getTaskBackoffTimeMillis() {
|
||||||
return taskBackoffTimeMillis;
|
return taskBackoffTimeMillis;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,7 @@ class ShardSyncTask implements ITask {
|
||||||
private final ILeaseManager<KinesisClientLease> leaseManager;
|
private final ILeaseManager<KinesisClientLease> leaseManager;
|
||||||
private InitialPositionInStreamExtended initialPosition;
|
private InitialPositionInStreamExtended initialPosition;
|
||||||
private final boolean cleanupLeasesUponShardCompletion;
|
private final boolean cleanupLeasesUponShardCompletion;
|
||||||
|
private final boolean ignoreUnexpectedChildShards;
|
||||||
private final long shardSyncTaskIdleTimeMillis;
|
private final long shardSyncTaskIdleTimeMillis;
|
||||||
private final TaskType taskType = TaskType.SHARDSYNC;
|
private final TaskType taskType = TaskType.SHARDSYNC;
|
||||||
|
|
||||||
|
|
@ -49,11 +50,13 @@ class ShardSyncTask implements ITask {
|
||||||
ILeaseManager<KinesisClientLease> leaseManager,
|
ILeaseManager<KinesisClientLease> leaseManager,
|
||||||
InitialPositionInStreamExtended initialPositionInStream,
|
InitialPositionInStreamExtended initialPositionInStream,
|
||||||
boolean cleanupLeasesUponShardCompletion,
|
boolean cleanupLeasesUponShardCompletion,
|
||||||
|
boolean ignoreUnexpectedChildShards,
|
||||||
long shardSyncTaskIdleTimeMillis) {
|
long shardSyncTaskIdleTimeMillis) {
|
||||||
this.kinesisProxy = kinesisProxy;
|
this.kinesisProxy = kinesisProxy;
|
||||||
this.leaseManager = leaseManager;
|
this.leaseManager = leaseManager;
|
||||||
this.initialPosition = initialPositionInStream;
|
this.initialPosition = initialPositionInStream;
|
||||||
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
||||||
|
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
||||||
this.shardSyncTaskIdleTimeMillis = shardSyncTaskIdleTimeMillis;
|
this.shardSyncTaskIdleTimeMillis = shardSyncTaskIdleTimeMillis;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -68,7 +71,8 @@ class ShardSyncTask implements ITask {
|
||||||
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
|
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
|
||||||
leaseManager,
|
leaseManager,
|
||||||
initialPosition,
|
initialPosition,
|
||||||
cleanupLeasesUponShardCompletion);
|
cleanupLeasesUponShardCompletion,
|
||||||
|
ignoreUnexpectedChildShards);
|
||||||
if (shardSyncTaskIdleTimeMillis > 0) {
|
if (shardSyncTaskIdleTimeMillis > 0) {
|
||||||
Thread.sleep(shardSyncTaskIdleTimeMillis);
|
Thread.sleep(shardSyncTaskIdleTimeMillis);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -44,6 +44,7 @@ class ShardSyncTaskManager {
|
||||||
private final ExecutorService executorService;
|
private final ExecutorService executorService;
|
||||||
private final InitialPositionInStreamExtended initialPositionInStream;
|
private final InitialPositionInStreamExtended initialPositionInStream;
|
||||||
private boolean cleanupLeasesUponShardCompletion;
|
private boolean cleanupLeasesUponShardCompletion;
|
||||||
|
private boolean ignoreUnexpectedChildShards;
|
||||||
private final long shardSyncIdleTimeMillis;
|
private final long shardSyncIdleTimeMillis;
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -55,6 +56,7 @@ class ShardSyncTaskManager {
|
||||||
* @param initialPositionInStream Initial position in stream
|
* @param initialPositionInStream Initial position in stream
|
||||||
* @param cleanupLeasesUponShardCompletion Clean up leases for shards that we've finished processing (don't wait
|
* @param cleanupLeasesUponShardCompletion Clean up leases for shards that we've finished processing (don't wait
|
||||||
* until they expire)
|
* until they expire)
|
||||||
|
* @param ignoreUnexpectedChildShards Ignore child shards with open parents
|
||||||
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
|
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
|
||||||
* @param metricsFactory Metrics factory
|
* @param metricsFactory Metrics factory
|
||||||
* @param executorService ExecutorService to execute the shard sync tasks
|
* @param executorService ExecutorService to execute the shard sync tasks
|
||||||
|
|
@ -63,6 +65,7 @@ class ShardSyncTaskManager {
|
||||||
final ILeaseManager<KinesisClientLease> leaseManager,
|
final ILeaseManager<KinesisClientLease> leaseManager,
|
||||||
final InitialPositionInStreamExtended initialPositionInStream,
|
final InitialPositionInStreamExtended initialPositionInStream,
|
||||||
final boolean cleanupLeasesUponShardCompletion,
|
final boolean cleanupLeasesUponShardCompletion,
|
||||||
|
final boolean ignoreUnexpectedChildShards,
|
||||||
final long shardSyncIdleTimeMillis,
|
final long shardSyncIdleTimeMillis,
|
||||||
final IMetricsFactory metricsFactory,
|
final IMetricsFactory metricsFactory,
|
||||||
ExecutorService executorService) {
|
ExecutorService executorService) {
|
||||||
|
|
@ -70,6 +73,7 @@ class ShardSyncTaskManager {
|
||||||
this.leaseManager = leaseManager;
|
this.leaseManager = leaseManager;
|
||||||
this.metricsFactory = metricsFactory;
|
this.metricsFactory = metricsFactory;
|
||||||
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
||||||
|
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
||||||
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
|
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
|
||||||
this.executorService = executorService;
|
this.executorService = executorService;
|
||||||
this.initialPositionInStream = initialPositionInStream;
|
this.initialPositionInStream = initialPositionInStream;
|
||||||
|
|
@ -99,6 +103,7 @@ class ShardSyncTaskManager {
|
||||||
leaseManager,
|
leaseManager,
|
||||||
initialPositionInStream,
|
initialPositionInStream,
|
||||||
cleanupLeasesUponShardCompletion,
|
cleanupLeasesUponShardCompletion,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
shardSyncIdleTimeMillis), metricsFactory);
|
shardSyncIdleTimeMillis), metricsFactory);
|
||||||
future = executorService.submit(currentTask);
|
future = executorService.submit(currentTask);
|
||||||
submittedNewTask = true;
|
submittedNewTask = true;
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,7 @@ import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
|
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||||
|
|
@ -60,9 +61,11 @@ class ShardSyncer {
|
||||||
static synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy,
|
static synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy,
|
||||||
ILeaseManager<KinesisClientLease> leaseManager,
|
ILeaseManager<KinesisClientLease> leaseManager,
|
||||||
InitialPositionInStreamExtended initialPositionInStream,
|
InitialPositionInStreamExtended initialPositionInStream,
|
||||||
boolean cleanupLeasesOfCompletedShards)
|
boolean cleanupLeasesOfCompletedShards,
|
||||||
|
boolean ignoreUnexpectedChildShards)
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
||||||
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards);
|
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -71,21 +74,28 @@ class ShardSyncer {
|
||||||
* @param kinesisProxy
|
* @param kinesisProxy
|
||||||
* @param leaseManager
|
* @param leaseManager
|
||||||
* @param initialPositionInStream
|
* @param initialPositionInStream
|
||||||
* @param expectedClosedShardId If this is not null, we will assert that the shard list we get from Kinesis
|
* @param cleanupLeasesOfCompletedShards
|
||||||
* shows this shard to be closed (e.g. parent shard must be closed after a reshard operation).
|
* @param ignoreUnexpectedChildShards
|
||||||
* If it is open, we assume this is an race condition around a reshard event and throw
|
|
||||||
* a KinesisClientLibIOException so client can backoff and retry later.
|
|
||||||
* @throws DependencyException
|
* @throws DependencyException
|
||||||
* @throws InvalidStateException
|
* @throws InvalidStateException
|
||||||
* @throws ProvisionedThroughputException
|
* @throws ProvisionedThroughputException
|
||||||
* @throws KinesisClientLibIOException
|
* @throws KinesisClientLibIOException
|
||||||
*/
|
*/
|
||||||
|
static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
|
||||||
|
ILeaseManager<KinesisClientLease> leaseManager,
|
||||||
|
InitialPositionInStreamExtended initialPositionInStream,
|
||||||
|
boolean cleanupLeasesOfCompletedShards,
|
||||||
|
boolean ignoreUnexpectedChildShards)
|
||||||
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
||||||
|
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards);
|
||||||
|
}
|
||||||
|
|
||||||
static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
|
static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
|
||||||
ILeaseManager<KinesisClientLease> leaseManager,
|
ILeaseManager<KinesisClientLease> leaseManager,
|
||||||
InitialPositionInStreamExtended initialPositionInStream,
|
InitialPositionInStreamExtended initialPositionInStream,
|
||||||
boolean cleanupLeasesOfCompletedShards)
|
boolean cleanupLeasesOfCompletedShards)
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
||||||
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards);
|
checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -93,11 +103,9 @@ class ShardSyncer {
|
||||||
*
|
*
|
||||||
* @param kinesisProxy
|
* @param kinesisProxy
|
||||||
* @param leaseManager
|
* @param leaseManager
|
||||||
* @param expectedClosedShardId If this is not null, we will assert that the shard list we get from Kinesis
|
* @param initialPosition
|
||||||
* does not show this shard to be open (e.g. parent shard must be closed after a reshard operation).
|
* @param cleanupLeasesOfCompletedShards
|
||||||
* If it is still open, we assume this is a race condition around a reshard event and
|
* @param ignoreUnexpectedChildShards
|
||||||
* throw a KinesisClientLibIOException so client can backoff and retry later. If the shard doesn't exist in
|
|
||||||
* Kinesis at all, we assume this is an old/expired shard and continue with the sync operation.
|
|
||||||
* @throws DependencyException
|
* @throws DependencyException
|
||||||
* @throws InvalidStateException
|
* @throws InvalidStateException
|
||||||
* @throws ProvisionedThroughputException
|
* @throws ProvisionedThroughputException
|
||||||
|
|
@ -107,18 +115,23 @@ class ShardSyncer {
|
||||||
private static synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
|
private static synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
|
||||||
ILeaseManager<KinesisClientLease> leaseManager,
|
ILeaseManager<KinesisClientLease> leaseManager,
|
||||||
InitialPositionInStreamExtended initialPosition,
|
InitialPositionInStreamExtended initialPosition,
|
||||||
boolean cleanupLeasesOfCompletedShards)
|
boolean cleanupLeasesOfCompletedShards,
|
||||||
|
boolean ignoreUnexpectedChildShards)
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
||||||
List<Shard> shards = getShardList(kinesisProxy);
|
List<Shard> shards = getShardList(kinesisProxy);
|
||||||
LOG.debug("Num shards: " + shards.size());
|
LOG.debug("Num shards: " + shards.size());
|
||||||
|
|
||||||
Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(shards);
|
Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(shards);
|
||||||
Map<String, Set<String>> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(shardIdToShardMap);
|
Map<String, Set<String>> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(shardIdToShardMap);
|
||||||
assertAllParentShardsAreClosed(shardIdToChildShardIdsMap, shardIdToShardMap);
|
Set<String> inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap);
|
||||||
|
if (!ignoreUnexpectedChildShards) {
|
||||||
|
assertAllParentShardsAreClosed(inconsistentShardIds);
|
||||||
|
}
|
||||||
|
|
||||||
List<KinesisClientLease> currentLeases = leaseManager.listLeases();
|
List<KinesisClientLease> currentLeases = leaseManager.listLeases();
|
||||||
|
|
||||||
List<KinesisClientLease> newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition);
|
List<KinesisClientLease> newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition,
|
||||||
|
inconsistentShardIds);
|
||||||
LOG.debug("Num new leases to create: " + newLeasesToCreate.size());
|
LOG.debug("Num new leases to create: " + newLeasesToCreate.size());
|
||||||
for (KinesisClientLease lease : newLeasesToCreate) {
|
for (KinesisClientLease lease : newLeasesToCreate) {
|
||||||
long startTimeMillis = System.currentTimeMillis();
|
long startTimeMillis = System.currentTimeMillis();
|
||||||
|
|
@ -149,19 +162,37 @@ class ShardSyncer {
|
||||||
|
|
||||||
/** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls
|
/** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls
|
||||||
* and a reshard operation.
|
* and a reshard operation.
|
||||||
* @param shardIdToChildShardIdsMap
|
* @param inconsistentShardIds
|
||||||
* @param shardIdToShardMap
|
|
||||||
* @throws KinesisClientLibIOException
|
* @throws KinesisClientLibIOException
|
||||||
*/
|
*/
|
||||||
private static void assertAllParentShardsAreClosed(Map<String, Set<String>> shardIdToChildShardIdsMap,
|
private static void assertAllParentShardsAreClosed(Set<String> inconsistentShardIds)
|
||||||
Map<String, Shard> shardIdToShardMap) throws KinesisClientLibIOException {
|
throws KinesisClientLibIOException {
|
||||||
|
if (!inconsistentShardIds.isEmpty()) {
|
||||||
|
String ids = StringUtils.join(inconsistentShardIds, ' ');
|
||||||
|
throw new KinesisClientLibIOException(String.format("%d open child shards (%s) are inconsistent. "
|
||||||
|
+ "This can happen due to a race condition between describeStream and a reshard operation.",
|
||||||
|
inconsistentShardIds.size(), ids));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method to construct the list of inconsistent shards, which are open shards with non-closed ancestor
|
||||||
|
* parent(s).
|
||||||
|
* @param shardIdToChildShardIdsMap
|
||||||
|
* @param shardIdToShardMap
|
||||||
|
* @return Set of inconsistent open shard ids for shards having open parents.
|
||||||
|
*/
|
||||||
|
private static Set<String> findInconsistentShardIds(Map<String, Set<String>> shardIdToChildShardIdsMap,
|
||||||
|
Map<String, Shard> shardIdToShardMap) {
|
||||||
|
Set<String> result = new HashSet<String>();
|
||||||
for (String parentShardId : shardIdToChildShardIdsMap.keySet()) {
|
for (String parentShardId : shardIdToChildShardIdsMap.keySet()) {
|
||||||
Shard parentShard = shardIdToShardMap.get(parentShardId);
|
Shard parentShard = shardIdToShardMap.get(parentShardId);
|
||||||
if ((parentShardId == null) || (parentShard.getSequenceNumberRange().getEndingSequenceNumber() == null)) {
|
if ((parentShardId == null) || (parentShard.getSequenceNumberRange().getEndingSequenceNumber() == null)) {
|
||||||
throw new KinesisClientLibIOException("Parent shardId " + parentShardId + " is not closed. "
|
Set<String> childShardIdsMap = shardIdToChildShardIdsMap.get(parentShardId);
|
||||||
+ "This can happen due to a race condition between describeStream and a reshard operation.");
|
result.addAll(childShardIdsMap);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -296,8 +327,8 @@ class ShardSyncer {
|
||||||
/**
|
/**
|
||||||
* Determine new leases to create and their initial checkpoint.
|
* Determine new leases to create and their initial checkpoint.
|
||||||
* Note: Package level access only for testing purposes.
|
* Note: Package level access only for testing purposes.
|
||||||
*
|
*
|
||||||
* For each open (no ending sequence number) shard that doesn't already have a lease,
|
* For each open (no ending sequence number) shard without open parents that doesn't already have a lease,
|
||||||
* determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists):
|
* determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists):
|
||||||
* If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed.
|
* If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed.
|
||||||
* If not, set checkpoint of the shard to the initial position specified by the client.
|
* If not, set checkpoint of the shard to the initial position specified by the client.
|
||||||
|
|
@ -315,27 +346,35 @@ class ShardSyncer {
|
||||||
*
|
*
|
||||||
* For example:
|
* For example:
|
||||||
* Shard structure (each level depicts a stream segment):
|
* Shard structure (each level depicts a stream segment):
|
||||||
* 0 1 2 3 4 5- shards till epoch 102
|
* 0 1 2 3 4 5 - shards till epoch 102
|
||||||
* \ / \ / | |
|
* \ / \ / | |
|
||||||
* 6 7 4 5- shards from epoch 103 - 205
|
* 6 7 4 5 - shards from epoch 103 - 205
|
||||||
* \ / | /\
|
* \ / | / \
|
||||||
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
|
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
|
||||||
* Current leases: (3, 4, 5)
|
* Current leases: (3, 4, 5)
|
||||||
* New leases to create: (2, 6, 7, 8, 9, 10)
|
* New leases to create: (2, 6, 7, 8, 9, 10)
|
||||||
*
|
*
|
||||||
* The leases returned are sorted by the starting sequence number - following the same order
|
* The leases returned are sorted by the starting sequence number - following the same order
|
||||||
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
|
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
|
||||||
* before creating all the leases.
|
* before creating all the leases.
|
||||||
|
*
|
||||||
|
* If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it
|
||||||
|
* here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very
|
||||||
|
* high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only
|
||||||
|
* currently happen here if ignoreUnexpectedChildShards was true in syncShardleases.
|
||||||
|
*
|
||||||
*
|
*
|
||||||
* @param shards List of all shards in Kinesis (we'll create new leases based on this set)
|
* @param shards List of all shards in Kinesis (we'll create new leases based on this set)
|
||||||
* @param currentLeases List of current leases
|
* @param currentLeases List of current leases
|
||||||
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
|
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
|
||||||
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
|
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
|
||||||
|
* @param inconsistentShardIds Set of child shard ids having open parents.
|
||||||
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
|
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
|
||||||
*/
|
*/
|
||||||
static List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
|
static List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
|
||||||
List<KinesisClientLease> currentLeases,
|
List<KinesisClientLease> currentLeases,
|
||||||
InitialPositionInStreamExtended initialPosition) {
|
InitialPositionInStreamExtended initialPosition,
|
||||||
|
Set<String> inconsistentShardIds) {
|
||||||
Map<String, KinesisClientLease> shardIdToNewLeaseMap = new HashMap<String, KinesisClientLease>();
|
Map<String, KinesisClientLease> shardIdToNewLeaseMap = new HashMap<String, KinesisClientLease>();
|
||||||
Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
|
Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
|
||||||
|
|
||||||
|
|
@ -354,6 +393,8 @@ class ShardSyncer {
|
||||||
LOG.debug("Evaluating leases for open shard " + shardId + " and its ancestors.");
|
LOG.debug("Evaluating leases for open shard " + shardId + " and its ancestors.");
|
||||||
if (shardIdsOfCurrentLeases.contains(shardId)) {
|
if (shardIdsOfCurrentLeases.contains(shardId)) {
|
||||||
LOG.debug("Lease for shardId " + shardId + " already exists. Not creating a lease");
|
LOG.debug("Lease for shardId " + shardId + " already exists. Not creating a lease");
|
||||||
|
} else if (inconsistentShardIds.contains(shardId)) {
|
||||||
|
LOG.info("shardId " + shardId + " is an inconsistent child. Not creating a lease");
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("Need to create a lease for shardId " + shardId);
|
LOG.debug("Need to create a lease for shardId " + shardId);
|
||||||
KinesisClientLease newLease = newKCLLease(shard);
|
KinesisClientLease newLease = newKCLLease(shard);
|
||||||
|
|
@ -407,6 +448,17 @@ class ShardSyncer {
|
||||||
return newLeasesToCreate;
|
return newLeasesToCreate;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determine new leases to create and their initial checkpoint.
|
||||||
|
* Note: Package level access only for testing purposes.
|
||||||
|
*/
|
||||||
|
static List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
|
||||||
|
List<KinesisClientLease> currentLeases,
|
||||||
|
InitialPositionInStreamExtended initialPosition) {
|
||||||
|
Set<String> inconsistentShardIds = new HashSet<String>();
|
||||||
|
return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Note: Package level access for testing purposes only.
|
* Note: Package level access for testing purposes only.
|
||||||
* Check if this shard is a descendant of a shard that is (or will be) processed.
|
* Check if this shard is a descendant of a shard that is (or will be) processed.
|
||||||
|
|
|
||||||
|
|
@ -44,6 +44,7 @@ class ShutdownTask implements ITask {
|
||||||
private final ILeaseManager<KinesisClientLease> leaseManager;
|
private final ILeaseManager<KinesisClientLease> leaseManager;
|
||||||
private final InitialPositionInStreamExtended initialPositionInStream;
|
private final InitialPositionInStreamExtended initialPositionInStream;
|
||||||
private final boolean cleanupLeasesOfCompletedShards;
|
private final boolean cleanupLeasesOfCompletedShards;
|
||||||
|
private final boolean ignoreUnexpectedChildShards;
|
||||||
private final TaskType taskType = TaskType.SHUTDOWN;
|
private final TaskType taskType = TaskType.SHUTDOWN;
|
||||||
private final long backoffTimeMillis;
|
private final long backoffTimeMillis;
|
||||||
private final GetRecordsCache getRecordsCache;
|
private final GetRecordsCache getRecordsCache;
|
||||||
|
|
@ -59,6 +60,7 @@ class ShutdownTask implements ITask {
|
||||||
IKinesisProxy kinesisProxy,
|
IKinesisProxy kinesisProxy,
|
||||||
InitialPositionInStreamExtended initialPositionInStream,
|
InitialPositionInStreamExtended initialPositionInStream,
|
||||||
boolean cleanupLeasesOfCompletedShards,
|
boolean cleanupLeasesOfCompletedShards,
|
||||||
|
boolean ignoreUnexpectedChildShards,
|
||||||
ILeaseManager<KinesisClientLease> leaseManager,
|
ILeaseManager<KinesisClientLease> leaseManager,
|
||||||
long backoffTimeMillis,
|
long backoffTimeMillis,
|
||||||
GetRecordsCache getRecordsCache) {
|
GetRecordsCache getRecordsCache) {
|
||||||
|
|
@ -69,6 +71,7 @@ class ShutdownTask implements ITask {
|
||||||
this.kinesisProxy = kinesisProxy;
|
this.kinesisProxy = kinesisProxy;
|
||||||
this.initialPositionInStream = initialPositionInStream;
|
this.initialPositionInStream = initialPositionInStream;
|
||||||
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
|
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
|
||||||
|
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
||||||
this.leaseManager = leaseManager;
|
this.leaseManager = leaseManager;
|
||||||
this.backoffTimeMillis = backoffTimeMillis;
|
this.backoffTimeMillis = backoffTimeMillis;
|
||||||
this.getRecordsCache = getRecordsCache;
|
this.getRecordsCache = getRecordsCache;
|
||||||
|
|
@ -127,7 +130,8 @@ class ShutdownTask implements ITask {
|
||||||
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
|
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
|
||||||
leaseManager,
|
leaseManager,
|
||||||
initialPositionInStream,
|
initialPositionInStream,
|
||||||
cleanupLeasesOfCompletedShards);
|
cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards);
|
||||||
LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId());
|
LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -407,8 +407,8 @@ public class Worker implements Runnable {
|
||||||
this.leaseCoordinator = leaseCoordinator;
|
this.leaseCoordinator = leaseCoordinator;
|
||||||
this.metricsFactory = metricsFactory;
|
this.metricsFactory = metricsFactory;
|
||||||
this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
|
this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
|
||||||
initialPositionInStream, cleanupLeasesUponShardCompletion, shardSyncIdleTimeMillis, metricsFactory,
|
initialPositionInStream, cleanupLeasesUponShardCompletion, config.shouldIgnoreUnexpectedChildShards(),
|
||||||
executorService);
|
shardSyncIdleTimeMillis, metricsFactory, executorService);
|
||||||
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
|
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
|
||||||
this.failoverTimeMillis = failoverTimeMillis;
|
this.failoverTimeMillis = failoverTimeMillis;
|
||||||
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
|
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||||
|
|
@ -499,7 +499,8 @@ public class Worker implements Runnable {
|
||||||
|| leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) {
|
|| leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) {
|
||||||
LOG.info("Syncing Kinesis shard info");
|
LOG.info("Syncing Kinesis shard info");
|
||||||
ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(),
|
ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(),
|
||||||
leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, 0L);
|
leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion,
|
||||||
|
config.shouldIgnoreUnexpectedChildShards(), 0L);
|
||||||
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
|
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
|
LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
|
@ -407,4 +408,14 @@ public class KinesisClientLibConfigurationTest {
|
||||||
fail("Should not have thrown");
|
fail("Should not have thrown");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testKCLConfigurationWithIgnoreUnexpectedChildShards() {
|
||||||
|
KinesisClientLibConfiguration config =
|
||||||
|
new KinesisClientLibConfiguration("TestApplication", "TestStream", null, "TestWorker");
|
||||||
|
// By default, unexpected child shards should not be ignored.
|
||||||
|
assertFalse(config.shouldIgnoreUnexpectedChildShards());
|
||||||
|
config = config.withIgnoreUnexpectedChildShards(true);
|
||||||
|
assertTrue(config.shouldIgnoreUnexpectedChildShards());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -124,6 +124,7 @@ public class ShardSyncTaskIntegrationTest {
|
||||||
leaseManager,
|
leaseManager,
|
||||||
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST),
|
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST),
|
||||||
false,
|
false,
|
||||||
|
false,
|
||||||
0L);
|
0L);
|
||||||
syncTask.call();
|
syncTask.call();
|
||||||
List<KinesisClientLease> leases = leaseManager.listLeases();
|
List<KinesisClientLease> leases = leaseManager.listLeases();
|
||||||
|
|
|
||||||
|
|
@ -146,6 +146,39 @@ public class ShardSyncerTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but one of
|
||||||
|
* the shards was marked as inconsistent.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public final void testDetermineNewLeasesToCreate0Leases0Reshards1Inconsistent() {
|
||||||
|
List<Shard> shards = new ArrayList<Shard>();
|
||||||
|
List<KinesisClientLease> currentLeases = new ArrayList<KinesisClientLease>();
|
||||||
|
SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
|
||||||
|
|
||||||
|
String shardId0 = "shardId-0";
|
||||||
|
shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange));
|
||||||
|
|
||||||
|
String shardId1 = "shardId-1";
|
||||||
|
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
|
||||||
|
|
||||||
|
String shardId2 = "shardId-2";
|
||||||
|
shards.add(ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange));
|
||||||
|
|
||||||
|
Set<String> inconsistentShardIds = new HashSet<String>();
|
||||||
|
inconsistentShardIds.add(shardId2);
|
||||||
|
|
||||||
|
List<KinesisClientLease> newLeases =
|
||||||
|
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds);
|
||||||
|
Assert.assertEquals(2, newLeases.size());
|
||||||
|
Set<String> expectedLeaseShardIds = new HashSet<String>();
|
||||||
|
expectedLeaseShardIds.add(shardId0);
|
||||||
|
expectedLeaseShardIds.add(shardId1);
|
||||||
|
for (KinesisClientLease lease : newLeases) {
|
||||||
|
Assert.assertTrue(expectedLeaseShardIds.contains(lease.getLeaseKey()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream)
|
* Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream)
|
||||||
*
|
*
|
||||||
|
|
@ -296,6 +329,41 @@ public class ShardSyncerTest {
|
||||||
dataFile.delete();
|
dataFile.delete();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test checkAndCreateLeasesForNewShards() when a parent is open and children of open parents are being ignored.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public final void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringInconsistentChildren()
|
||||||
|
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
|
||||||
|
IOException {
|
||||||
|
List<Shard> shards = constructShardListForGraphA();
|
||||||
|
Shard shard = shards.get(5);
|
||||||
|
Assert.assertEquals("shardId-5", shard.getShardId());
|
||||||
|
SequenceNumberRange range = shard.getSequenceNumberRange();
|
||||||
|
// shardId-5 in graph A has two children (shardId-9 and shardId-10). if shardId-5
|
||||||
|
// is not closed, those children should be ignored when syncing shards, no leases
|
||||||
|
// should be obtained for them, and we should obtain a lease on the still-open
|
||||||
|
// parent.
|
||||||
|
range.setEndingSequenceNumber(null);
|
||||||
|
shard.setSequenceNumberRange(range);
|
||||||
|
File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1");
|
||||||
|
dataFile.deleteOnExit();
|
||||||
|
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
|
||||||
|
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST,
|
||||||
|
cleanupLeasesOfCompletedShards, true);
|
||||||
|
List<KinesisClientLease> newLeases = leaseManager.listLeases();
|
||||||
|
Set<String> expectedLeaseShardIds = new HashSet<String>();
|
||||||
|
expectedLeaseShardIds.add("shardId-4");
|
||||||
|
expectedLeaseShardIds.add("shardId-5");
|
||||||
|
expectedLeaseShardIds.add("shardId-8");
|
||||||
|
Assert.assertEquals(expectedLeaseShardIds.size(), newLeases.size());
|
||||||
|
for (KinesisClientLease lease1 : newLeases) {
|
||||||
|
Assert.assertTrue(expectedLeaseShardIds.contains(lease1.getLeaseKey()));
|
||||||
|
Assert.assertEquals(ExtendedSequenceNumber.LATEST, lease1.getCheckpoint());
|
||||||
|
}
|
||||||
|
dataFile.delete();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @throws KinesisClientLibIOException
|
* @throws KinesisClientLibIOException
|
||||||
* @throws DependencyException
|
* @throws DependencyException
|
||||||
|
|
@ -586,7 +654,8 @@ public class ShardSyncerTest {
|
||||||
dataFile.deleteOnExit();
|
dataFile.deleteOnExit();
|
||||||
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
|
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
|
||||||
|
|
||||||
ShardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards);
|
ShardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards,
|
||||||
|
false);
|
||||||
List<KinesisClientLease> newLeases = leaseManager.listLeases();
|
List<KinesisClientLease> newLeases = leaseManager.listLeases();
|
||||||
Assert.assertEquals(2, newLeases.size());
|
Assert.assertEquals(2, newLeases.size());
|
||||||
Set<String> expectedLeaseShardIds = new HashSet<String>();
|
Set<String> expectedLeaseShardIds = new HashSet<String>();
|
||||||
|
|
|
||||||
|
|
@ -100,6 +100,7 @@ public class ShutdownTaskTest {
|
||||||
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
||||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||||
boolean cleanupLeasesOfCompletedShards = false;
|
boolean cleanupLeasesOfCompletedShards = false;
|
||||||
|
boolean ignoreUnexpectedChildShards = false;
|
||||||
ShutdownTask task = new ShutdownTask(defaultShardInfo,
|
ShutdownTask task = new ShutdownTask(defaultShardInfo,
|
||||||
defaultRecordProcessor,
|
defaultRecordProcessor,
|
||||||
checkpointer,
|
checkpointer,
|
||||||
|
|
@ -107,6 +108,7 @@ public class ShutdownTaskTest {
|
||||||
kinesisProxy,
|
kinesisProxy,
|
||||||
INITIAL_POSITION_TRIM_HORIZON,
|
INITIAL_POSITION_TRIM_HORIZON,
|
||||||
cleanupLeasesOfCompletedShards,
|
cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
leaseManager,
|
leaseManager,
|
||||||
TASK_BACKOFF_TIME_MILLIS,
|
TASK_BACKOFF_TIME_MILLIS,
|
||||||
getRecordsCache);
|
getRecordsCache);
|
||||||
|
|
@ -126,6 +128,7 @@ public class ShutdownTaskTest {
|
||||||
when(kinesisProxy.getShardList()).thenReturn(null);
|
when(kinesisProxy.getShardList()).thenReturn(null);
|
||||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||||
boolean cleanupLeasesOfCompletedShards = false;
|
boolean cleanupLeasesOfCompletedShards = false;
|
||||||
|
boolean ignoreUnexpectedChildShards = false;
|
||||||
ShutdownTask task = new ShutdownTask(defaultShardInfo,
|
ShutdownTask task = new ShutdownTask(defaultShardInfo,
|
||||||
defaultRecordProcessor,
|
defaultRecordProcessor,
|
||||||
checkpointer,
|
checkpointer,
|
||||||
|
|
@ -133,6 +136,7 @@ public class ShutdownTaskTest {
|
||||||
kinesisProxy,
|
kinesisProxy,
|
||||||
INITIAL_POSITION_TRIM_HORIZON,
|
INITIAL_POSITION_TRIM_HORIZON,
|
||||||
cleanupLeasesOfCompletedShards,
|
cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
leaseManager,
|
leaseManager,
|
||||||
TASK_BACKOFF_TIME_MILLIS,
|
TASK_BACKOFF_TIME_MILLIS,
|
||||||
getRecordsCache);
|
getRecordsCache);
|
||||||
|
|
@ -147,7 +151,7 @@ public class ShutdownTaskTest {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public final void testGetTaskType() {
|
public final void testGetTaskType() {
|
||||||
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0, getRecordsCache);
|
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, null, 0, getRecordsCache);
|
||||||
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
|
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue