Merge remote-tracking branch 'ashwin/ltr-v1.x' into periodicShardSyncMetric

This commit is contained in:
Chunxue Yang 2020-07-07 17:17:50 -07:00
commit 5ba87a34ab
20 changed files with 1725 additions and 460 deletions

View file

@ -530,7 +530,8 @@ class ConsumerStates {
consumer.isIgnoreUnexpectedChildShards(),
consumer.getLeaseCoordinator(),
consumer.getTaskBackoffTimeMillis(),
consumer.getGetRecordsCache(), consumer.getShardSyncer(), consumer.getShardSyncStrategy());
consumer.getGetRecordsCache(), consumer.getShardSyncer(),
consumer.getShardSyncStrategy(), consumer.getChildShards());
}
@Override

View file

@ -16,7 +16,12 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.model.ChildShard;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -47,6 +52,7 @@ class KinesisDataFetcher {
private boolean isInitialized;
private String lastKnownSequenceNumber;
private InitialPositionInStreamExtended initialPositionInStream;
private List<ChildShard> childShards = Collections.emptyList();
/**
*
@ -85,8 +91,11 @@ class KinesisDataFetcher {
final DataFetcherResult TERMINAL_RESULT = new DataFetcherResult() {
@Override
public GetRecordsResult getResult() {
return new GetRecordsResult().withMillisBehindLatest(null).withRecords(Collections.emptyList())
.withNextShardIterator(null);
return new GetRecordsResult()
.withMillisBehindLatest(null)
.withRecords(Collections.emptyList())
.withNextShardIterator(null)
.withChildShards(Collections.emptyList());
}
@Override
@ -113,12 +122,20 @@ class KinesisDataFetcher {
@Override
public GetRecordsResult accept() {
if (!isValidResult(result)) {
// Throwing SDK exception when the GetRecords result is not valid. This will allow PrefetchGetRecordsCache to retry the GetRecords call.
throw new SdkClientException("Shard " + shardId +": GetRecordsResult is not valid. NextShardIterator: " + result.getNextShardIterator()
+ ". ChildShards: " + result.getChildShards());
}
nextIterator = result.getNextShardIterator();
if (!CollectionUtils.isNullOrEmpty(result.getRecords())) {
lastKnownSequenceNumber = Iterables.getLast(result.getRecords()).getSequenceNumber();
}
if (nextIterator == null) {
LOG.info("Reached shard end: nextIterator is null in AdvancingResult.accept for shard " + shardId);
LOG.info("Reached shard end: nextIterator is null in AdvancingResult.accept for shard " + shardId + ". childShards: " + result.getChildShards());
if (!CollectionUtils.isNullOrEmpty(result.getChildShards())) {
childShards = result.getChildShards();
}
isShardEndReached = true;
}
return getResult();
@ -130,6 +147,23 @@ class KinesisDataFetcher {
}
}
private boolean isValidResult(GetRecordsResult getRecordsResult) {
// GetRecords result should contain childShard information. There are two valid combination for the nextShardIterator and childShards
// If the GetRecords call does not reach the shard end, getRecords result should contain a non-null nextShardIterator and an empty list of childShards.
// If the GetRecords call reaches the shard end, getRecords result should contain a null nextShardIterator and a non-empty list of childShards.
// All other combinations are invalid and indicating an issue with GetRecords result from Kinesis service.
if (getRecordsResult.getNextShardIterator() == null && CollectionUtils.isNullOrEmpty(getRecordsResult.getChildShards()) ||
getRecordsResult.getNextShardIterator() != null && !CollectionUtils.isNullOrEmpty(getRecordsResult.getChildShards())) {
return false;
}
for (ChildShard childShard : getRecordsResult.getChildShards()) {
if (CollectionUtils.isNullOrEmpty(childShard.getParentShards())) {
return false;
}
}
return true;
}
/**
* Initializes this KinesisDataFetcher's iterator based on the checkpointed sequence number.
* @param initialCheckpoint Current checkpoint sequence number for this shard.
@ -141,8 +175,7 @@ class KinesisDataFetcher {
isInitialized = true;
}
public void initialize(ExtendedSequenceNumber initialCheckpoint,
InitialPositionInStreamExtended initialPositionInStream) {
public void initialize(ExtendedSequenceNumber initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream) {
LOG.info("Initializing shard " + shardId + " with " + initialCheckpoint.getSequenceNumber());
advanceIteratorTo(initialCheckpoint.getSequenceNumber(), initialPositionInStream);
isInitialized = true;
@ -171,6 +204,7 @@ class KinesisDataFetcher {
if (nextIterator == null) {
LOG.info("Reached shard end: cannot advance iterator for shard " + shardId);
isShardEndReached = true;
// TODO: transition to ShuttingDown state on shardend instead to shutdown state for enqueueing this for cleanup
}
this.lastKnownSequenceNumber = sequenceNumber;
this.initialPositionInStream = initialPositionInStream;
@ -248,6 +282,10 @@ class KinesisDataFetcher {
return isShardEndReached;
}
protected List<ChildShard> getChildShards() {
return childShards;
}
/** Note: This method has package level access for testing purposes.
* @return nextIterator
*/

View file

@ -28,9 +28,11 @@ import java.util.Set;
import com.amazonaws.services.kinesis.leases.impl.Lease;
import com.amazonaws.services.kinesis.leases.impl.LeaseManager;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.services.kinesis.model.ShardFilter;
import com.amazonaws.services.kinesis.model.ShardFilterType;
import com.amazonaws.util.CollectionUtils;
import lombok.NoArgsConstructor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.lang3.StringUtils;
@ -457,7 +459,7 @@ class KinesisShardSyncer implements ShardSyncer {
/**
* Note: Package level access for testing purposes only.
* Check if this shard is a descendant of a shard that is (or will be) processed.
* Create leases for the ancestors of this shard as required.
* Create leases for the first ancestor of this shard that needs to be processed, as required.
* See javadoc of determineNewLeasesToCreate() for rules and example.
*
* @param shardId The shardId to check.
@ -473,9 +475,10 @@ class KinesisShardSyncer implements ShardSyncer {
static boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId,
InitialPositionInStreamExtended initialPosition, Set<String> shardIdsOfCurrentLeases,
Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
Map<String, KinesisClientLease> shardIdToLeaseMapOfNewShards, Map<String, Boolean> memoizationContext) {
Map<String, KinesisClientLease> shardIdToLeaseMapOfNewShards, MemoizationContext memoizationContext) {
final Boolean previousValue = memoizationContext.isDescendant(shardId);
Boolean previousValue = memoizationContext.get(shardId);
if (previousValue != null) {
return previousValue;
}
@ -495,10 +498,13 @@ class KinesisShardSyncer implements ShardSyncer {
shard = shardIdToShardMapOfAllKinesisShards.get(shardId);
parentShardIds = getParentShardIds(shard, shardIdToShardMapOfAllKinesisShards);
for (String parentShardId : parentShardIds) {
// Check if the parent is a descendant, and include its ancestors.
if (checkIfDescendantAndAddNewLeasesForAncestors(parentShardId, initialPosition,
shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToLeaseMapOfNewShards,
memoizationContext)) {
// Check if the parent is a descendant, and include its ancestors. Or, if the parent is NOT a
// descendant but we should create a lease for it anyway (e.g. to include in processing from
// TRIM_HORIZON or AT_TIMESTAMP). If either is true, then we mark the current shard as a descendant.
final boolean isParentDescendant = checkIfDescendantAndAddNewLeasesForAncestors(parentShardId,
initialPosition, shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards,
shardIdToLeaseMapOfNewShards, memoizationContext);
if (isParentDescendant || memoizationContext.shouldCreateLease(parentShardId)) {
isDescendant = true;
descendantParentShardIds.add(parentShardId);
LOG.debug("Parent shard " + parentShardId + " is a descendant.");
@ -511,37 +517,76 @@ class KinesisShardSyncer implements ShardSyncer {
if (isDescendant) {
for (String parentShardId : parentShardIds) {
if (!shardIdsOfCurrentLeases.contains(parentShardId)) {
LOG.debug("Need to create a lease for shardId " + parentShardId);
KinesisClientLease lease = shardIdToLeaseMapOfNewShards.get(parentShardId);
// If the lease for the parent shard does not already exist, there are two cases in which we
// would want to create it:
// - If we have already marked the parentShardId for lease creation in a prior recursive
// call. This could happen if we are trying to process from TRIM_HORIZON or AT_TIMESTAMP.
// - If the parent shard is not a descendant but the current shard is a descendant, then
// the parent shard is the oldest shard in the shard hierarchy that does not have an
// ancestor in the lease table (the adjacent parent is necessarily a descendant, and
// therefore covered in the lease table). So we should create a lease for the parent.
if (lease == null) {
lease = newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId));
shardIdToLeaseMapOfNewShards.put(parentShardId, lease);
if (memoizationContext.shouldCreateLease(parentShardId) ||
!descendantParentShardIds.contains(parentShardId)) {
LOG.debug("Need to create a lease for shardId " + parentShardId);
lease = newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId));
shardIdToLeaseMapOfNewShards.put(parentShardId, lease);
}
}
if (descendantParentShardIds.contains(parentShardId) && !initialPosition
.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
lease.setCheckpoint(convertToCheckpoint(initialPosition));
/**
* If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
* checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will
* add a lease just like we do for TRIM_HORIZON. However we will only return back records
* with server-side timestamp at or after the specified initial position timestamp.
*
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
*
* Current leases: (4, 5, 7)
*
* For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with
* timestamp value 206. We will then create new leases for all the shards 0 and 1 (with
* checkpoint set AT_TIMESTAMP), even though these ancestor shards have an epoch less than
* 206. However as we begin processing the ancestor shards, their checkpoints would be
* updated to SHARD_END and their leases would then be deleted since they won't have records
* with server-side timestamp at/after 206. And after that we will begin processing the
* descendant shards with epoch at/after 206 and we will return the records that meet the
* timestamp requirement for these shards.
*/
if (lease != null) {
if (descendantParentShardIds.contains(parentShardId) && !initialPosition
.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
lease.setCheckpoint(convertToCheckpoint(initialPosition));
}
}
}
}
} else {
// This shard should be included, if the customer wants to process all records in the stream or
// if the initial position is AT_TIMESTAMP. For AT_TIMESTAMP, we will add a lease just like we do
// for TRIM_HORIZON. However we will only return back records with server-side timestamp at or
// after the specified initial position timestamp.
// This shard is not a descendant, but should still be included if the customer wants to process all
// records in the stream or if the initial position is AT_TIMESTAMP. For AT_TIMESTAMP, we will add a
// lease just like we do for TRIM_HORIZON. However we will only return back records with server-side
// timestamp at or after the specified initial position timestamp.
if (initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)
|| initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
isDescendant = true;
memoizationContext.setShouldCreateLease(shardId, true);
}
}
}
}
memoizationContext.put(shardId, isDescendant);
memoizationContext.setIsDescendant(shardId, isDescendant);
return isDescendant;
}
// CHECKSTYLE:ON CyclomaticComplexity
@ -735,6 +780,29 @@ class KinesisShardSyncer implements ShardSyncer {
return newLease;
}
/**
* Helper method to create a new KinesisClientLease POJO for a ChildShard.
* Note: Package level access only for testing purposes
*
* @param childShard
* @return
*/
static KinesisClientLease newKCLLeaseForChildShard(ChildShard childShard) throws InvalidStateException {
final KinesisClientLease newLease = new KinesisClientLease();
newLease.setLeaseKey(childShard.getShardId());
final List<String> parentShardIds = new ArrayList<>();
if (!CollectionUtils.isNullOrEmpty(childShard.getParentShards())) {
parentShardIds.addAll(childShard.getParentShards());
} else {
throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.getShardId()
+ " because parent shards cannot be found.");
}
newLease.setParentShardIds(parentShardIds);
newLease.setOwnerSwitchesSinceCheckpoint(0L);
newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return newLease;
}
/**
* Helper method to construct a shardId->Shard map for the specified list of shards.
*
@ -834,4 +902,28 @@ class KinesisShardSyncer implements ShardSyncer {
}
/**
* Helper class to pass around state between recursive traversals of shard hierarchy.
*/
@NoArgsConstructor
static class MemoizationContext {
private Map<String, Boolean> isDescendantMap = new HashMap<>();
private Map<String, Boolean> shouldCreateLeaseMap = new HashMap<>();
Boolean isDescendant(String shardId) {
return isDescendantMap.get(shardId);
}
void setIsDescendant(String shardId, Boolean isDescendant) {
isDescendantMap.put(shardId, isDescendant);
}
Boolean shouldCreateLease(String shardId) {
return shouldCreateLeaseMap.computeIfAbsent(shardId, x -> Boolean.FALSE);
}
void setShouldCreateLease(String shardId, Boolean shouldCreateLease) {
shouldCreateLeaseMap.put(shardId, shouldCreateLease);
}
}
}

View file

@ -47,8 +47,10 @@ class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer {
* Note: Package level access only for testing purposes.
*
* For each open (no ending sequence number) shard without open parents that doesn't already have a lease,
* determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists):
* If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed.
* determine if it is a descendant of any shard which is or will be processed (e.g. for which a lease exists):
* If so, create a lease for the first ancestor that needs to be processed (if needed). We will create leases
* for no more than one level in the ancestry tree. Once we find the first ancestor that needs to be processed,
* we will avoid creating leases for further descendants of that ancestor.
* If not, set checkpoint of the shard to the initial position specified by the client.
* To check if we need to create leases for ancestors, we use the following rules:
* * If we began (or will begin) processing data for a shard, then we must reach end of that shard before
@ -67,10 +69,17 @@ class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer {
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | / \
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Current leases: (3, 4, 5)
* New leases to create: (2, 6, 7, 8, 9, 10)
* \ / | / \
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
*
* Current leases: (4, 5, 7)
*
* If initial position is LATEST:
* - New leases to create: (6)
* If initial position is TRIM_HORIZON:
* - New leases to create: (0, 1)
* If initial position is AT_TIMESTAMP(epoch=200):
* - New leases to create: (0, 1)
*
* The leases returned are sorted by the starting sequence number - following the same order
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
@ -104,7 +113,8 @@ class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer {
}
List<Shard> openShards = KinesisShardSyncer.getOpenShards(shards);
Map<String, Boolean> memoizationContext = new HashMap<>();
final KinesisShardSyncer.MemoizationContext memoizationContext = new KinesisShardSyncer.MemoizationContext();
// Iterate over the open shards and find those that don't have any lease entries.
for (Shard shard : openShards) {
@ -115,43 +125,30 @@ class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer {
} else if (inconsistentShardIds.contains(shardId)) {
LOG.info("shardId " + shardId + " is an inconsistent child. Not creating a lease");
} else {
LOG.debug("Need to create a lease for shardId " + shardId);
KinesisClientLease newLease = KinesisShardSyncer.newKCLLease(shard);
LOG.debug("Beginning traversal of ancestry tree for shardId " + shardId);
// A shard is a descendant if at least one if its ancestors exists in the lease table.
// We will create leases for only one level in the ancestry tree. Once we find the first ancestor
// that needs to be processed in order to complete the hash range, we will not create leases for
// further descendants of that ancestor.
boolean isDescendant = KinesisShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId,
initialPosition, shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards,
shardIdToNewLeaseMap, memoizationContext);
/**
* If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
* checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a
* lease just like we do for TRIM_HORIZON. However we will only return back records with server-side
* timestamp at or after the specified initial position timestamp.
*
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
*
* Current leases: empty set
*
* For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with
* timestamp value 206. We will then create new leases for all the shards (with checkpoint set to
* AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin
* processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases
* would then be deleted since they won't have records with server-side timestamp at/after 206. And
* after that we will begin processing the descendant shards with epoch at/after 206 and we will
* return the records that meet the timestamp requirement for these shards.
*/
if (isDescendant && !initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
// If shard is a descendant, the leases for its ancestors were already created above. Open shards
// that are NOT descendants will not have leases yet, so we create them here. We will not create
// leases for open shards that ARE descendants yet - leases for these shards will be created upon
// SHARD_END of their parents.
if (!isDescendant) {
LOG.debug("ShardId " + shardId + " has no ancestors. Creating a lease.");
final KinesisClientLease newLease = KinesisShardSyncer.newKCLLease(shard);
newLease.setCheckpoint(KinesisShardSyncer.convertToCheckpoint(initialPosition));
LOG.debug("Set checkpoint of " + newLease.getLeaseKey() + " to " + newLease.getCheckpoint());
shardIdToNewLeaseMap.put(shardId, newLease);
} else {
LOG.debug("ShardId " + shardId + " is a descendant whose ancestors should already have leases. " +
"Not creating a lease.");
}
LOG.debug("Set checkpoint of " + newLease.getLeaseKey() + " to " + newLease.getCheckpoint());
shardIdToNewLeaseMap.put(shardId, newLease);
}
}

View file

@ -129,6 +129,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
try {
result = getRecordsResultQueue.take().withCacheExitTime(Instant.now());
prefetchCounters.removed(result);
log.info("Shard " + shardId + ": Number of records remaining in queue is " + getRecordsResultQueue.size());
} catch (InterruptedException e) {
log.error("Interrupted while getting records from the cache", e);
}
@ -177,7 +178,6 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
MetricsHelper.getMetricsScope().addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.Count,
MetricsLevel.SUMMARY);
dataFetcher.restartIterator();
} catch (SdkClientException e) {
log.error("Exception thrown while fetching records from Kinesis", e);

View file

@ -152,8 +152,8 @@ class ProcessTask implements ITask {
try {
if (dataFetcher.isShardEndReached()) {
LOG.info("Reached end of shard " + shardInfo.getShardId());
return new TaskResult(null, true);
LOG.info("Reached end of shard " + shardInfo.getShardId() + ". Found childShards: " + dataFetcher.getChildShards());
return new TaskResult(null, true, dataFetcher.getChildShards());
}
final ProcessRecordsInput processRecordsInput = getRecordsResult();
@ -353,7 +353,7 @@ class ProcessTask implements ITask {
* recordProcessorCheckpointer).
*/
dataFetcher.advanceIteratorTo(recordProcessorCheckpointer.getLargestPermittedCheckpointValue()
.getSequenceNumber(), streamConfig.getInitialPositionInStream());
.getSequenceNumber(), streamConfig.getInitialPositionInStream());
// Try a second time - if we fail this time, expose the failure.
try {

View file

@ -15,11 +15,14 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.util.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -66,6 +69,9 @@ class ShardConsumer {
private Future<TaskResult> future;
private ShardSyncStrategy shardSyncStrategy;
@Getter
private List<ChildShard> childShards;
@Getter
private final GetRecordsCache getRecordsCache;
@ -321,6 +327,10 @@ class ShardConsumer {
TaskResult result = future.get();
if (result.getException() == null) {
if (result.isShardEndReached()) {
if (!CollectionUtils.isNullOrEmpty(result.getChildShards())) {
childShards = result.getChildShards();
LOG.info("Shard " + shardInfo.getShardId() + ": Setting childShards in ShardConsumer: " + childShards);
}
return TaskOutcome.END_OF_SHARD;
}
return TaskOutcome.SUCCESSFUL;
@ -420,6 +430,7 @@ class ShardConsumer {
void updateState(TaskOutcome taskOutcome) {
if (taskOutcome == TaskOutcome.END_OF_SHARD) {
markForShutdown(ShutdownReason.TERMINATE);
LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason TERMINATE");
}
if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) {
currentState = currentState.shutdownTransition(shutdownReason);

View file

@ -14,9 +14,11 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse;
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.util.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -30,6 +32,9 @@ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
/**
* Task for invoking the RecordProcessor shutdown() callback.
@ -54,6 +59,7 @@ class ShutdownTask implements ITask {
private final GetRecordsCache getRecordsCache;
private final ShardSyncer shardSyncer;
private final ShardSyncStrategy shardSyncStrategy;
private final List<ChildShard> childShards;
/**
* Constructor.
@ -69,7 +75,8 @@ class ShutdownTask implements ITask {
boolean ignoreUnexpectedChildShards,
KinesisClientLibLeaseCoordinator leaseCoordinator,
long backoffTimeMillis,
GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
GetRecordsCache getRecordsCache, ShardSyncer shardSyncer,
ShardSyncStrategy shardSyncStrategy, List<ChildShard> childShards) {
this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
@ -83,6 +90,7 @@ class ShutdownTask implements ITask {
this.getRecordsCache = getRecordsCache;
this.shardSyncer = shardSyncer;
this.shardSyncStrategy = shardSyncStrategy;
this.childShards = childShards;
}
/*
@ -97,29 +105,39 @@ class ShutdownTask implements ITask {
boolean applicationException = false;
try {
LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: "
+ shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards);
ShutdownReason localReason = reason;
List<Shard> latestShards = null;
/*
* Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END
* If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows active
* workers to contend for the lease of this shard.
*/
if(localReason == ShutdownReason.TERMINATE) {
ShardClosureVerificationResponse shardClosureVerificationResponse = kinesisProxy.verifyShardClosure(shardInfo.getShardId());
if (shardClosureVerificationResponse instanceof ShardListWrappingShardClosureVerificationResponse) {
latestShards = ((ShardListWrappingShardClosureVerificationResponse)shardClosureVerificationResponse).getLatestShards();
}
// If shard in context is not closed yet we should shut down the ShardConsumer with Zombie state
// which avoids checkpoint-ing with SHARD_END sequence number.
if(!shardClosureVerificationResponse.isShardClosed()) {
// Create new lease for the child shards if they don't exist.
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
// This would happen when KinesisDataFetcher catches ResourceNotFound exception.
// In this case, KinesisDataFetcher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
try {
if (!CollectionUtils.isNullOrEmpty(childShards)) {
createLeasesForChildShardsIfNotExist();
updateCurrentLeaseWithChildShards();
} else {
LOG.warn("Shard " + shardInfo.getShardId()
+ ": Shutting down consumer with SHARD_END reason without creating leases for child shards.");
}
} catch (InvalidStateException e) {
// If invalidStateException happens, it indicates we are missing childShard related information.
// In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry getting
// childShard information in the processTask.
localReason = ShutdownReason.ZOMBIE;
dropLease();
LOG.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.getShardId());
LOG.warn("Shard " + shardInfo.getShardId() + ": Exception happened while shutting down shardConsumer with TERMINATE reason. " +
"Dropping the lease and shutting down shardConsumer using ZOMBIE reason. Exception: ", e);
}
}
// If we reached end of the shard, set sequence number to SHARD_END.
if (localReason == ShutdownReason.TERMINATE) {
recordProcessorCheckpointer.setSequenceNumberAtShardEnd(
@ -127,8 +145,6 @@ class ShutdownTask implements ITask {
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
}
LOG.debug("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken "
+ shardInfo.getConcurrencyToken() + ". Shutdown reason: " + localReason);
final ShutdownInput shutdownInput = new ShutdownInput()
.withShutdownReason(localReason)
.withCheckpointer(recordProcessorCheckpointer);
@ -156,18 +172,6 @@ class ShutdownTask implements ITask {
MetricsLevel.SUMMARY);
}
if (localReason == ShutdownReason.TERMINATE) {
LOG.debug("Looking for child shards of shard " + shardInfo.getShardId());
// create leases for the child shards
TaskResult result = shardSyncStrategy.onShardConsumerShutDown(latestShards);
if (result.getException() != null) {
LOG.debug("Exception while trying to sync shards on the shutdown of shard: " + shardInfo
.getShardId());
throw result.getException();
}
LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId());
}
return new TaskResult(null);
} catch (Exception e) {
if (applicationException) {
@ -187,6 +191,33 @@ class ShutdownTask implements ITask {
return new TaskResult(exception);
}
private void createLeasesForChildShardsIfNotExist() throws InvalidStateException, DependencyException, ProvisionedThroughputException {
for (ChildShard childShard : childShards) {
final String leaseKey = childShard.getShardId();
if (leaseCoordinator.getLeaseManager().getLease(leaseKey) == null) {
final KinesisClientLease leaseToCreate = KinesisShardSyncer.newKCLLeaseForChildShard(childShard);
leaseCoordinator.getLeaseManager().createLeaseIfNotExists(leaseToCreate);
LOG.info("Shard " + shardInfo.getShardId() + " : Created child shard lease: " + leaseToCreate.getLeaseKey());
}
}
}
private void updateCurrentLeaseWithChildShards() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId());
if (currentLease == null) {
throw new InvalidStateException("Failed to retrieve current lease for shard " + shardInfo.getShardId());
}
final Set<String> childShardIds = childShards.stream().map(ChildShard::getShardId).collect(Collectors.toSet());
currentLease.setChildShardIds(childShardIds);
final boolean updateResult = leaseCoordinator.updateLease(currentLease, UUID.fromString(shardInfo.getConcurrencyToken()));
if (!updateResult) {
throw new InvalidStateException("Failed to update parent lease with child shard information for shard " + shardInfo.getShardId());
}
LOG.info("Shard " + shardInfo.getShardId() + ": Updated current lease with child shard information: " + currentLease.getLeaseKey());
}
/*
* (non-Javadoc)
*
@ -204,6 +235,10 @@ class ShutdownTask implements ITask {
private void dropLease() {
KinesisClientLease lease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId());
if (lease == null) {
LOG.warn("Shard " + shardInfo.getShardId() + ": Lease already dropped. Will shutdown the shardConsumer directly.");
return;
}
leaseCoordinator.dropLease(lease);
LOG.warn("Dropped lease for shutting down ShardConsumer: " + lease.getLeaseKey());
}

View file

@ -14,6 +14,10 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.model.ChildShard;
import java.util.List;
/**
* Used to capture information from a task that we want to communicate back to the higher layer.
* E.g. exception thrown when executing the task, if we reach end of a shard.
@ -26,6 +30,9 @@ class TaskResult {
// Any exception caught while executing the task.
private Exception exception;
// List of childShards of the current shard. This field is only required for the task result when we reach end of a shard.
private List<ChildShard> childShards;
/**
* @return the shardEndReached
*/
@ -33,6 +40,11 @@ class TaskResult {
return shardEndReached;
}
/**
* @return the list of childShards.
*/
protected List<ChildShard> getChildShards() { return childShards; }
/**
* @param shardEndReached the shardEndReached to set
*/
@ -40,6 +52,11 @@ class TaskResult {
this.shardEndReached = shardEndReached;
}
/**
* @param childShards the list of childShards to set
*/
protected void setChildShards(List<ChildShard> childShards) { this.childShards = childShards; }
/**
* @return the exception
*/
@ -70,4 +87,10 @@ class TaskResult {
this.shardEndReached = isShardEndReached;
}
TaskResult(Exception e, boolean isShardEndReached, List<ChildShard> childShards) {
this.exception = e;
this.shardEndReached = isShardEndReached;
this.childShards = childShards;
}
}

View file

@ -30,9 +30,10 @@ public class KinesisClientLease extends Lease {
private ExtendedSequenceNumber pendingCheckpoint;
private Long ownerSwitchesSinceCheckpoint = 0L;
private Set<String> parentShardIds = new HashSet<String>();
private Set<String> childShardIds = new HashSet<String>();
private Set<String> childShardIds = new HashSet<>();
private HashKeyRangeForLease hashKeyRangeForLease;
public KinesisClientLease() {
}
@ -43,7 +44,7 @@ public class KinesisClientLease extends Lease {
this.pendingCheckpoint = other.getPendingCheckpoint();
this.ownerSwitchesSinceCheckpoint = other.getOwnerSwitchesSinceCheckpoint();
this.parentShardIds.addAll(other.getParentShardIds());
this.childShardIds = other.getChildShardIds();
this.childShardIds.addAll(other.getChildShardIds());
this.hashKeyRangeForLease = other.getHashKeyRange();
}
@ -76,6 +77,7 @@ public class KinesisClientLease extends Lease {
setCheckpoint(casted.checkpoint);
setPendingCheckpoint(casted.pendingCheckpoint);
setParentShardIds(casted.parentShardIds);
setChildShardIds(casted.childShardIds);
}
/**
@ -108,7 +110,7 @@ public class KinesisClientLease extends Lease {
}
/**
* @return shardIds that are the children of this lease. Used for resharding.
* @return shardIds for the child shards of the current shard. Used for resharding.
*/
public Set<String> getChildShardIds() {
return new HashSet<String>(childShardIds);
@ -170,9 +172,6 @@ public class KinesisClientLease extends Lease {
* @param childShardIds may not be null
*/
public void setChildShardIds(Collection<String> childShardIds) {
verifyNotNull(childShardIds, "childShardIds should not be null");
this.childShardIds.clear();
this.childShardIds.addAll(childShardIds);
}
@ -186,7 +185,7 @@ public class KinesisClientLease extends Lease {
this.hashKeyRangeForLease = hashKeyRangeForLease;
}
private void verifyNotNull(Object object, String message) {
if (object == null) {
throw new IllegalArgumentException(message);

View file

@ -43,7 +43,7 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
private static final String PENDING_CHECKPOINT_SEQUENCE_KEY = "pendingCheckpoint";
private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber";
public final String PARENT_SHARD_ID_KEY = "parentShardId";
private static final String CHILD_SHARD_IDS_KEY = "childShardIds";
public final String CHILD_SHARD_IDS_KEY = "childShardIds";
private static final String STARTING_HASH_KEY = "startingHashKey";
private static final String ENDING_HASH_KEY = "endingHashKey";
@ -56,9 +56,12 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
result.put(OWNER_SWITCHES_KEY, DynamoUtils.createAttributeValue(lease.getOwnerSwitchesSinceCheckpoint()));
result.put(CHECKPOINT_SEQUENCE_NUMBER_KEY, DynamoUtils.createAttributeValue(lease.getCheckpoint().getSequenceNumber()));
result.put(CHECKPOINT_SUBSEQUENCE_NUMBER_KEY, DynamoUtils.createAttributeValue(lease.getCheckpoint().getSubSequenceNumber()));
if (lease.getParentShardIds() != null && !lease.getParentShardIds().isEmpty()) {
if (!CollectionUtils.isNullOrEmpty(lease.getParentShardIds())) {
result.put(PARENT_SHARD_ID_KEY, DynamoUtils.createAttributeValue(lease.getParentShardIds()));
}
if (!CollectionUtils.isNullOrEmpty(lease.getChildShardIds())) {
result.put(CHILD_SHARD_IDS_KEY, DynamoUtils.createAttributeValue(lease.getChildShardIds()));
}
if (lease.getPendingCheckpoint() != null && !lease.getPendingCheckpoint().getSequenceNumber().isEmpty()) {
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSequenceNumber()));
@ -79,6 +82,7 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
DynamoUtils.safeGetLong(dynamoRecord, CHECKPOINT_SUBSEQUENCE_NUMBER_KEY))
);
result.setParentShardIds(DynamoUtils.safeGetSS(dynamoRecord, PARENT_SHARD_ID_KEY));
result.setChildShardIds(DynamoUtils.safeGetSS(dynamoRecord, CHILD_SHARD_IDS_KEY));
if (!Strings.isNullOrEmpty(DynamoUtils.safeGetString(dynamoRecord, PENDING_CHECKPOINT_SEQUENCE_KEY))) {
result.setPendingCheckpoint(
@ -150,6 +154,9 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
result.put(OWNER_SWITCHES_KEY,
new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getOwnerSwitchesSinceCheckpoint()),
AttributeAction.PUT));
if (!CollectionUtils.isNullOrEmpty(lease.getChildShardIds())) {
result.put(CHILD_SHARD_IDS_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getChildShardIds()), AttributeAction.PUT));
}
if (lease.getPendingCheckpoint() != null && !lease.getPendingCheckpoint().getSequenceNumber().isEmpty()) {
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSequenceNumber()), AttributeAction.PUT));

View file

@ -225,7 +225,7 @@ class ExceptionThrowingLeaseManager implements ILeaseManager<KinesisClientLease>
@Override
public boolean isLeaseTableEmpty() throws DependencyException,
InvalidStateException, ProvisionedThroughputException {
return false;
return leaseManager.listLeases().isEmpty();
}
}

View file

@ -36,6 +36,7 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
import com.amazonaws.services.kinesis.model.ChildShard;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@ -132,7 +133,7 @@ public class KinesisDataFetcherTest {
}
@Test
public void testadvanceIteratorTo() throws KinesisClientLibException {
public void testadvanceIteratorTo() throws Exception {
IKinesisProxy kinesis = mock(IKinesisProxy.class);
ICheckpoint checkpoint = mock(ICheckpoint.class);
@ -146,9 +147,13 @@ public class KinesisDataFetcherTest {
GetRecordsResult outputA = new GetRecordsResult();
List<Record> recordsA = new ArrayList<Record>();
outputA.setRecords(recordsA);
outputA.setNextShardIterator("nextShardIteratorA");
outputA.setChildShards(Collections.emptyList());
GetRecordsResult outputB = new GetRecordsResult();
List<Record> recordsB = new ArrayList<Record>();
outputB.setRecords(recordsB);
outputB.setNextShardIterator("nextShardIteratorB");
outputB.setChildShards(Collections.emptyList());
when(kinesis.getIterator(SHARD_ID, AT_SEQUENCE_NUMBER, seqA)).thenReturn(iteratorA);
when(kinesis.getIterator(SHARD_ID, AT_SEQUENCE_NUMBER, seqB)).thenReturn(iteratorB);
@ -166,7 +171,7 @@ public class KinesisDataFetcherTest {
}
@Test
public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() {
public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() throws Exception{
IKinesisProxy kinesis = mock(IKinesisProxy.class);
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO);
@ -189,7 +194,7 @@ public class KinesisDataFetcherTest {
}
@Test
public void testGetRecordsWithResourceNotFoundException() {
public void testGetRecordsWithResourceNotFoundException() throws Exception {
// Set up arguments used by proxy
String nextIterator = "TestShardIterator";
int maxRecords = 100;
@ -211,11 +216,12 @@ public class KinesisDataFetcherTest {
}
@Test
public void testNonNullGetRecords() {
public void testNonNullGetRecords() throws Exception {
String nextIterator = "TestIterator";
int maxRecords = 100;
KinesisProxy mockProxy = mock(KinesisProxy.class);
when(mockProxy.getIterator(anyString(), anyString())).thenReturn("targetIterator");
doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO);
@ -232,17 +238,25 @@ public class KinesisDataFetcherTest {
final String NEXT_ITERATOR_ONE = "NextIteratorOne";
final String NEXT_ITERATOR_TWO = "NextIteratorTwo";
when(kinesisProxy.getIterator(anyString(), anyString())).thenReturn(INITIAL_ITERATOR);
GetRecordsResult iteratorOneResults = mock(GetRecordsResult.class);
when(iteratorOneResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_ONE);
GetRecordsResult iteratorOneResults = new GetRecordsResult();
iteratorOneResults.setNextShardIterator(NEXT_ITERATOR_ONE);
iteratorOneResults.setChildShards(Collections.emptyList());
when(kinesisProxy.get(eq(INITIAL_ITERATOR), anyInt())).thenReturn(iteratorOneResults);
GetRecordsResult iteratorTwoResults = mock(GetRecordsResult.class);
GetRecordsResult iteratorTwoResults = new GetRecordsResult();
iteratorTwoResults.setNextShardIterator(NEXT_ITERATOR_TWO);
iteratorTwoResults.setChildShards(Collections.emptyList());
when(kinesisProxy.get(eq(NEXT_ITERATOR_ONE), anyInt())).thenReturn(iteratorTwoResults);
when(iteratorTwoResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_TWO);
GetRecordsResult finalResult = mock(GetRecordsResult.class);
GetRecordsResult finalResult = new GetRecordsResult();
finalResult.setNextShardIterator(null);
List<ChildShard> childShards = new ArrayList<>();
ChildShard childShard = new ChildShard();
childShard.setParentShards(Collections.singletonList("parentShardId"));
childShards.add(childShard);
finalResult.setChildShards(childShards);
when(kinesisProxy.get(eq(NEXT_ITERATOR_TWO), anyInt())).thenReturn(finalResult);
when(finalResult.getNextShardIterator()).thenReturn(null);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO);
dataFetcher.initialize("TRIM_HORIZON",
@ -276,13 +290,14 @@ public class KinesisDataFetcherTest {
}
@Test
public void testRestartIterator() {
public void testRestartIterator() throws Exception{
GetRecordsResult getRecordsResult = mock(GetRecordsResult.class);
GetRecordsResult restartGetRecordsResult = new GetRecordsResult();
GetRecordsResult restartGetRecordsResult = mock(GetRecordsResult.class);
Record record = mock(Record.class);
final String initialIterator = "InitialIterator";
final String nextShardIterator = "NextShardIterator";
final String restartShardIterator = "RestartIterator";
final String restartNextShardIterator = "RestartNextIterator";
final String sequenceNumber = "SequenceNumber";
final String iteratorType = "AT_SEQUENCE_NUMBER";
KinesisProxy kinesisProxy = mock(KinesisProxy.class);
@ -292,6 +307,7 @@ public class KinesisDataFetcherTest {
when(kinesisProxy.get(eq(initialIterator), eq(10))).thenReturn(getRecordsResult);
when(getRecordsResult.getRecords()).thenReturn(Collections.singletonList(record));
when(getRecordsResult.getNextShardIterator()).thenReturn(nextShardIterator);
when(getRecordsResult.getChildShards()).thenReturn(Collections.emptyList());
when(record.getSequenceNumber()).thenReturn(sequenceNumber);
fetcher.initialize(InitialPositionInStream.LATEST.toString(), INITIAL_POSITION_LATEST);
@ -300,6 +316,8 @@ public class KinesisDataFetcherTest {
verify(kinesisProxy).get(eq(initialIterator), eq(10));
when(kinesisProxy.getIterator(eq(SHARD_ID), eq(iteratorType), eq(sequenceNumber))).thenReturn(restartShardIterator);
when(restartGetRecordsResult.getNextShardIterator()).thenReturn(restartNextShardIterator);
when(restartGetRecordsResult.getChildShards()).thenReturn(Collections.emptyList());
when(kinesisProxy.get(eq(restartShardIterator), eq(10))).thenReturn(restartGetRecordsResult);
fetcher.restartIterator();
@ -309,7 +327,7 @@ public class KinesisDataFetcherTest {
}
@Test (expected = IllegalStateException.class)
public void testRestartIteratorNotInitialized() {
public void testRestartIteratorNotInitialized() throws Exception {
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO);
dataFetcher.restartIterator();
}
@ -354,6 +372,8 @@ public class KinesisDataFetcherTest {
List<Record> expectedRecords = new ArrayList<Record>();
GetRecordsResult response = new GetRecordsResult();
response.setRecords(expectedRecords);
response.setNextShardIterator("testNextShardIterator");
response.setChildShards(Collections.emptyList());
when(kinesis.getIterator(SHARD_ID, initialPositionInStream.getTimestamp())).thenReturn(iterator);
when(kinesis.getIterator(SHARD_ID, AT_SEQUENCE_NUMBER, seqNo)).thenReturn(iterator);

View file

@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -74,6 +75,8 @@ public class PrefetchGetRecordsCacheIntegrationTest {
private IKinesisProxy proxy;
@Mock
private ShardInfo shardInfo;
@Mock
private KinesisClientLibLeaseCoordinator leaseCoordinator;
@Before
public void setup() {
@ -171,7 +174,7 @@ public class PrefetchGetRecordsCacheIntegrationTest {
}
@Test
public void testExpiredIteratorException() {
public void testExpiredIteratorException() throws Exception {
when(dataFetcher.getRecords(eq(MAX_RECORDS_PER_CALL))).thenAnswer(new Answer<DataFetcherResult>() {
@Override
public DataFetcherResult answer(final InvocationOnMock invocationOnMock) throws Throwable {
@ -215,6 +218,8 @@ public class PrefetchGetRecordsCacheIntegrationTest {
GetRecordsResult getRecordsResult = new GetRecordsResult();
getRecordsResult.setRecords(new ArrayList<>(records));
getRecordsResult.setMillisBehindLatest(1000L);
getRecordsResult.setNextShardIterator("testNextShardIterator");
getRecordsResult.setChildShards(Collections.emptyList());
return new AdvancingResult(getRecordsResult);
}

View file

@ -31,6 +31,7 @@ import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -98,6 +99,8 @@ public class PrefetchGetRecordsCacheTest {
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(getRecordsResult);
when(getRecordsResult.getRecords()).thenReturn(records);
when(getRecordsResult.getNextShardIterator()).thenReturn("testNextShardIterator");
when(getRecordsResult.getChildShards()).thenReturn(Collections.emptyList());
}
@Test
@ -203,7 +206,7 @@ public class PrefetchGetRecordsCacheTest {
}
@Test
public void testExpiredIteratorException() {
public void testExpiredIteratorException() throws Exception{
getRecordsCache.start();
when(getRecordsRetrievalStrategy.getRecords(MAX_RECORDS_PER_CALL)).thenThrow(ExpiredIteratorException.class).thenReturn(getRecordsResult);

View file

@ -39,6 +39,7 @@ import java.io.File;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.ListIterator;
@ -245,7 +246,7 @@ public class ShardConsumerTest {
@SuppressWarnings("unchecked")
@Test
public final void testRecordProcessorThrowable() throws Exception {
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
ShardInfo shardInfo = new ShardInfo("s-0-0", UUID.randomUUID().toString(), null, ExtendedSequenceNumber.TRIM_HORIZON);
StreamConfig streamConfig =
new StreamConfig(streamProxy,
1,
@ -271,6 +272,7 @@ public class ShardConsumerTest {
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null;
when(streamProxy.getIterator(anyString(), anyString(), anyString())).thenReturn("startingIterator");
when(leaseManager.getLease(anyString())).thenReturn(null);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(checkpoint.getCheckpointObject(anyString())).thenReturn(
@ -538,7 +540,7 @@ public class ShardConsumerTest {
int numRecs = 10;
BigInteger startSeqNum = BigInteger.ONE;
String streamShardId = "kinesis-0-0";
String testConcurrencyToken = "testToken";
String testConcurrencyToken = UUID.randomUUID().toString();
List<Shard> shardList = KinesisLocalFileDataCreator.createShardList(1, "kinesis-0-", startSeqNum);
// Close the shard so that shutdown is called with reason terminate
shardList.get(0).getSequenceNumberRange().setEndingSequenceNumber(
@ -606,8 +608,7 @@ public class ShardConsumerTest {
shardSyncer,
shardSyncStrategy);
when(shardSyncStrategy.onShardConsumerShutDown(shardList)).thenReturn(new TaskResult(null));
when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
Thread.sleep(50L);
@ -657,7 +658,7 @@ public class ShardConsumerTest {
}
assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE));
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE)));
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE)));
verify(getRecordsCache).shutdown();
@ -681,7 +682,7 @@ public class ShardConsumerTest {
int numRecs = 10;
BigInteger startSeqNum = BigInteger.ONE;
String streamShardId = "kinesis-0-0";
String testConcurrencyToken = "testToken";
String testConcurrencyToken = UUID.randomUUID().toString();
List<Shard> shardList = KinesisLocalFileDataCreator.createShardList(3, "kinesis-0-", startSeqNum);
// Close the shard so that shutdown is called with reason terminate
shardList.get(0).getSequenceNumberRange().setEndingSequenceNumber(
@ -749,7 +750,12 @@ public class ShardConsumerTest {
shardSyncer,
shardSyncStrategy);
when(shardSyncStrategy.onShardConsumerShutDown(shardList)).thenReturn(new TaskResult(null));
List<String> parentShardIds = new ArrayList<>();
parentShardIds.add(shardInfo.getShardId());
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(createLease(shardInfo.getShardId(),
"leaseOwner",
parentShardIds));
when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
@ -939,7 +945,7 @@ public class ShardConsumerTest {
@SuppressWarnings("unchecked")
@Test
public final void testConsumeShardInitializedWithPendingCheckpoint() throws Exception {
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
ShardInfo shardInfo = new ShardInfo("s-0-0", UUID.randomUUID().toString(), null, ExtendedSequenceNumber.TRIM_HORIZON);
StreamConfig streamConfig =
new StreamConfig(streamProxy,
1,
@ -967,6 +973,7 @@ public class ShardConsumerTest {
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999");
when(streamProxy.getIterator(anyString(), anyString(), anyString())).thenReturn("startingIterator");
when(leaseManager.getLease(anyString())).thenReturn(null);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory());
@ -1125,6 +1132,14 @@ public class ShardConsumerTest {
return userRecords;
}
private KinesisClientLease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds) {
KinesisClientLease lease = new KinesisClientLease();
lease.setLeaseKey(leaseKey);
lease.setLeaseOwner(leaseOwner);
lease.setParentShardIds(parentShardIds);
return lease;
}
Matcher<InitializationInput> initializationInputMatcher(final ExtendedSequenceNumber checkpoint,
final ExtendedSequenceNumber pendingCheckpoint) {
return new TypeSafeMatcher<InitializationInput>() {

View file

@ -24,11 +24,17 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
@ -60,7 +66,7 @@ public class ShutdownTaskTest {
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
Set<String> defaultParentShardIds = new HashSet<>();
String defaultConcurrencyToken = "testToken4398";
String defaultConcurrencyToken = UUID.randomUUID().toString();
String defaultShardId = "shardId-0";
ShardInfo defaultShardInfo = new ShardInfo(defaultShardId,
defaultConcurrencyToken,
@ -70,10 +76,16 @@ public class ShutdownTaskTest {
ShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator());
@Mock
private IKinesisProxy kinesisProxy;
@Mock
private GetRecordsCache getRecordsCache;
@Mock
private ShardSyncStrategy shardSyncStrategy;
@Mock
private ILeaseManager<KinesisClientLease> leaseManager;
@Mock
private KinesisClientLibLeaseCoordinator leaseCoordinator;
/**
* @throws java.lang.Exception
@ -95,6 +107,10 @@ public class ShutdownTaskTest {
@Before
public void setUp() throws Exception {
doNothing().when(getRecordsCache).shutdown();
final KinesisClientLease parentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn(parentLease);
when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true);
}
/**
@ -111,12 +127,6 @@ public class ShutdownTaskTest {
public final void testCallWhenApplicationDoesNotCheckpoint() {
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
when(checkpointer.getLastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298"));
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
List<Shard> shards = constructShardListForGraphA();
when(kinesisProxy.getShardList()).thenReturn(shards);
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards));
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
@ -132,31 +142,29 @@ public class ShutdownTaskTest {
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer,
shardSyncStrategy);
shardSyncStrategy,
constructChildShards());
TaskResult result = task.call();
Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof IllegalArgumentException);
final String expectedExceptionMessage = "Application didn't checkpoint at end of shard shardId-0. " +
"Application must checkpoint upon shutdown. See IRecordProcessor.shutdown javadocs for more information.";
Assert.assertEquals(expectedExceptionMessage, result.getException().getMessage());
}
/**
* Test method for {@link ShutdownTask#call()}.
*/
@Test
public final void testCallWhenSyncingShardsThrows() {
public final void testCallWhenCreatingLeaseThrows() throws Exception {
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
List<Shard> shards = constructShardListForGraphA();
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
when(kinesisProxy.getShardList()).thenReturn(shards);
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards));
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(new KinesisClientLibIOException("")));
final String exceptionMessage = "InvalidStateException is thrown.";
when(leaseManager.createLeaseIfNotExists(any(KinesisClientLease.class))).thenThrow(new InvalidStateException(exceptionMessage));
ShutdownTask task = new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
@ -169,30 +177,21 @@ public class ShutdownTaskTest {
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer,
shardSyncStrategy);
shardSyncStrategy,
constructChildShards());
TaskResult result = task.call();
verify(shardSyncStrategy).onShardConsumerShutDown(shards);
Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException);
verify(getRecordsCache).shutdown();
verify(leaseCoordinator).dropLease(any(KinesisClientLease.class));
Assert.assertNull(result.getException());
}
@Test
public final void testCallWhenShardEnd() {
public final void testCallWhenShardEnd() throws Exception {
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
List<Shard> shards = constructShardListForGraphA();
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
when(kinesisProxy.getShardList()).thenReturn(shards);
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards));
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(null));
ShutdownTask task = new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
@ -205,36 +204,27 @@ public class ShutdownTaskTest {
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer,
shardSyncStrategy);
shardSyncStrategy,
constructChildShards());
TaskResult result = task.call();
verify(shardSyncStrategy).onShardConsumerShutDown(shards);
verify(kinesisProxy, times(1)).verifyShardClosure(anyString());
verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class));
verify(leaseCoordinator).updateLease(any(KinesisClientLease.class), any(UUID.class));
Assert.assertNull(result.getException());
verify(getRecordsCache).shutdown();
verify(leaseCoordinator, never()).dropLease(any());
}
@Test
public final void testCallWhenFalseShardEnd() {
public final void testCallWhenShardNotFound() throws Exception {
ShardInfo shardInfo = new ShardInfo("shardId-4",
defaultConcurrencyToken,
defaultParentShardIds,
ExtendedSequenceNumber.LATEST);
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
List<Shard> shards = constructShardListForGraphA();
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
when(kinesisProxy.getShardList()).thenReturn(shards);
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(false, shards));
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(new KinesisClientLease());
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(null));
ShutdownTask task = new ShutdownTask(shardInfo,
defaultRecordProcessor,
checkpointer,
@ -247,31 +237,23 @@ public class ShutdownTaskTest {
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer,
shardSyncStrategy);
shardSyncStrategy,
Collections.emptyList());
TaskResult result = task.call();
verify(shardSyncStrategy, never()).onShardConsumerShutDown(shards);
verify(kinesisProxy, times(1)).verifyShardClosure(anyString());
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class));
Assert.assertNull(result.getException());
verify(getRecordsCache).shutdown();
verify(leaseCoordinator).dropLease(any());
}
@Test
public final void testCallWhenLeaseLost() {
public final void testCallWhenLeaseLost() throws Exception {
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
when(checkpointer.getLastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298"));
List<Shard> shards = constructShardListForGraphA();
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
when(kinesisProxy.getShardList()).thenReturn(shards);
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(false, shards));
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(null));
ShutdownTask task = new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
@ -284,13 +266,13 @@ public class ShutdownTaskTest {
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer,
shardSyncStrategy);
shardSyncStrategy,
Collections.emptyList());
TaskResult result = task.call();
verify(shardSyncStrategy, never()).onShardConsumerShutDown(shards);
verify(kinesisProxy, never()).getShardList();
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class));
Assert.assertNull(result.getException());
verify(getRecordsCache).shutdown();
verify(leaseCoordinator, never()).dropLease(any());
}
/**
@ -299,10 +281,39 @@ public class ShutdownTaskTest {
@Test
public final void testGetTaskType() {
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, leaseCoordinator, 0, getRecordsCache, shardSyncer, shardSyncStrategy);
ShutdownTask task = new ShutdownTask(null, null, null, null,
null, null, false,
false, leaseCoordinator, 0,
getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList());
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
}
private List<ChildShard> constructChildShards() {
List<ChildShard> childShards = new ArrayList<>();
List<String> parentShards = new ArrayList<>();
parentShards.add(defaultShardId);
ChildShard leftChild = new ChildShard();
leftChild.setShardId("ShardId-1");
leftChild.setParentShards(parentShards);
leftChild.setHashKeyRange(ShardObjectHelper.newHashKeyRange("0", "49"));
childShards.add(leftChild);
ChildShard rightChild = new ChildShard();
rightChild.setShardId("ShardId-2");
rightChild.setParentShards(parentShards);
rightChild.setHashKeyRange(ShardObjectHelper.newHashKeyRange("50", "99"));
childShards.add(rightChild);
return childShards;
}
private KinesisClientLease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds) {
KinesisClientLease lease = new KinesisClientLease();
lease.setLeaseKey(leaseKey);
lease.setLeaseOwner(leaseOwner);
lease.setParentShardIds(parentShardIds);
return lease;
}
/*
* Helper method to construct a shard list for graph A. Graph A is defined below.

View file

@ -25,6 +25,7 @@ import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@ -33,6 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.services.kinesis.model.ShardFilter;
import com.amazonaws.util.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@ -388,14 +390,33 @@ public class KinesisLocalFileProxy implements IKinesisProxy {
*/
response.setNextShardIterator(serializeIterator(iterator.shardId, lastRecordsSeqNo.add(BigInteger.ONE)
.toString()));
response.setChildShards(Collections.emptyList());
LOG.debug("Returning a non null iterator for shard " + iterator.shardId);
} else {
response.setChildShards(constructChildShards(iterator));
LOG.info("Returning null iterator for shard " + iterator.shardId);
}
return response;
}
private List<ChildShard> constructChildShards(IteratorInfo iterator) {
List<ChildShard> childShards = new ArrayList<>();
List<String> parentShards = new ArrayList<>();
parentShards.add(iterator.shardId);
ChildShard leftChild = new ChildShard();
leftChild.setShardId("ShardId-1");
leftChild.setParentShards(parentShards);
childShards.add(leftChild);
ChildShard rightChild = new ChildShard();
rightChild.setShardId("ShardId-2");
rightChild.setParentShards(parentShards);
childShards.add(rightChild);
return childShards;
}
/**
* {@inheritDoc}
*/

View file

@ -30,7 +30,7 @@ public class KinesisClientLeaseBuilder {
private ExtendedSequenceNumber pendingCheckpoint;
private Long ownerSwitchesSinceCheckpoint = 0L;
private Set<String> parentShardIds = new HashSet<>();
private Set<String> childShardIds = new HashSet<>();
private Set<String> childShardIds = new HashSet<>();
private HashKeyRangeForLease hashKeyRangeForLease;
public KinesisClientLeaseBuilder withLeaseKey(String leaseKey) {
@ -90,7 +90,6 @@ public class KinesisClientLeaseBuilder {
public KinesisClientLease build() {
return new KinesisClientLease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos,
checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds,
hashKeyRangeForLease);
checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds, hashKeyRangeForLease);
}
}