Merge pull request #67 from ychunxue/ltr1x_latest
KCL 1.x ShardEnd Shard Sync and Lease table ChildShard persistence
This commit is contained in:
commit
53cc7fc347
17 changed files with 349 additions and 137 deletions
|
|
@ -530,7 +530,8 @@ class ConsumerStates {
|
|||
consumer.isIgnoreUnexpectedChildShards(),
|
||||
consumer.getLeaseCoordinator(),
|
||||
consumer.getTaskBackoffTimeMillis(),
|
||||
consumer.getGetRecordsCache(), consumer.getShardSyncer(), consumer.getShardSyncStrategy());
|
||||
consumer.getGetRecordsCache(), consumer.getShardSyncer(),
|
||||
consumer.getShardSyncStrategy(), consumer.getChildShards());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -16,7 +16,12 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
|||
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import com.amazonaws.SdkClientException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
||||
import com.amazonaws.services.kinesis.model.ChildShard;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
|
@ -47,6 +52,7 @@ class KinesisDataFetcher {
|
|||
private boolean isInitialized;
|
||||
private String lastKnownSequenceNumber;
|
||||
private InitialPositionInStreamExtended initialPositionInStream;
|
||||
private List<ChildShard> childShards = Collections.emptyList();
|
||||
|
||||
/**
|
||||
*
|
||||
|
|
@ -85,8 +91,11 @@ class KinesisDataFetcher {
|
|||
final DataFetcherResult TERMINAL_RESULT = new DataFetcherResult() {
|
||||
@Override
|
||||
public GetRecordsResult getResult() {
|
||||
return new GetRecordsResult().withMillisBehindLatest(null).withRecords(Collections.emptyList())
|
||||
.withNextShardIterator(null);
|
||||
return new GetRecordsResult()
|
||||
.withMillisBehindLatest(null)
|
||||
.withRecords(Collections.emptyList())
|
||||
.withNextShardIterator(null)
|
||||
.withChildShards(Collections.emptyList());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -113,12 +122,20 @@ class KinesisDataFetcher {
|
|||
|
||||
@Override
|
||||
public GetRecordsResult accept() {
|
||||
if (!isValidResult(result)) {
|
||||
// Throwing SDK exception when the GetRecords result is not valid. This will allow PrefetchGetRecordsCache to retry the GetRecords call.
|
||||
throw new SdkClientException("Shard " + shardId +": GetRecordsResult is not valid. NextShardIterator: " + result.getNextShardIterator()
|
||||
+ ". ChildShards: " + result.getChildShards());
|
||||
}
|
||||
nextIterator = result.getNextShardIterator();
|
||||
if (!CollectionUtils.isNullOrEmpty(result.getRecords())) {
|
||||
lastKnownSequenceNumber = Iterables.getLast(result.getRecords()).getSequenceNumber();
|
||||
}
|
||||
if (nextIterator == null) {
|
||||
LOG.info("Reached shard end: nextIterator is null in AdvancingResult.accept for shard " + shardId);
|
||||
LOG.info("Reached shard end: nextIterator is null in AdvancingResult.accept for shard " + shardId + ". childShards: " + result.getChildShards());
|
||||
if (!CollectionUtils.isNullOrEmpty(result.getChildShards())) {
|
||||
childShards = result.getChildShards();
|
||||
}
|
||||
isShardEndReached = true;
|
||||
}
|
||||
return getResult();
|
||||
|
|
@ -130,6 +147,23 @@ class KinesisDataFetcher {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean isValidResult(GetRecordsResult getRecordsResult) {
|
||||
// GetRecords result should contain childShard information. There are two valid combination for the nextShardIterator and childShards
|
||||
// If the GetRecords call does not reach the shard end, getRecords result should contain a non-null nextShardIterator and an empty list of childShards.
|
||||
// If the GetRecords call reaches the shard end, getRecords result should contain a null nextShardIterator and a non-empty list of childShards.
|
||||
// All other combinations are invalid and indicating an issue with GetRecords result from Kinesis service.
|
||||
if (getRecordsResult.getNextShardIterator() == null && CollectionUtils.isNullOrEmpty(getRecordsResult.getChildShards()) ||
|
||||
getRecordsResult.getNextShardIterator() != null && !CollectionUtils.isNullOrEmpty(getRecordsResult.getChildShards())) {
|
||||
return false;
|
||||
}
|
||||
for (ChildShard childShard : getRecordsResult.getChildShards()) {
|
||||
if (CollectionUtils.isNullOrEmpty(childShard.getParentShards())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes this KinesisDataFetcher's iterator based on the checkpointed sequence number.
|
||||
* @param initialCheckpoint Current checkpoint sequence number for this shard.
|
||||
|
|
@ -141,8 +175,7 @@ class KinesisDataFetcher {
|
|||
isInitialized = true;
|
||||
}
|
||||
|
||||
public void initialize(ExtendedSequenceNumber initialCheckpoint,
|
||||
InitialPositionInStreamExtended initialPositionInStream) {
|
||||
public void initialize(ExtendedSequenceNumber initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream) {
|
||||
LOG.info("Initializing shard " + shardId + " with " + initialCheckpoint.getSequenceNumber());
|
||||
advanceIteratorTo(initialCheckpoint.getSequenceNumber(), initialPositionInStream);
|
||||
isInitialized = true;
|
||||
|
|
@ -171,6 +204,7 @@ class KinesisDataFetcher {
|
|||
if (nextIterator == null) {
|
||||
LOG.info("Reached shard end: cannot advance iterator for shard " + shardId);
|
||||
isShardEndReached = true;
|
||||
// TODO: transition to ShuttingDown state on shardend instead to shutdown state for enqueueing this for cleanup
|
||||
}
|
||||
this.lastKnownSequenceNumber = sequenceNumber;
|
||||
this.initialPositionInStream = initialPositionInStream;
|
||||
|
|
@ -248,6 +282,10 @@ class KinesisDataFetcher {
|
|||
return isShardEndReached;
|
||||
}
|
||||
|
||||
protected List<ChildShard> getChildShards() {
|
||||
return childShards;
|
||||
}
|
||||
|
||||
/** Note: This method has package level access for testing purposes.
|
||||
* @return nextIterator
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ import java.util.Set;
|
|||
|
||||
import com.amazonaws.services.kinesis.leases.impl.Lease;
|
||||
import com.amazonaws.services.kinesis.leases.impl.LeaseManager;
|
||||
import com.amazonaws.services.kinesis.model.ChildShard;
|
||||
import com.amazonaws.services.kinesis.model.ShardFilter;
|
||||
import com.amazonaws.services.kinesis.model.ShardFilterType;
|
||||
import com.amazonaws.util.CollectionUtils;
|
||||
|
|
@ -779,6 +780,29 @@ class KinesisShardSyncer implements ShardSyncer {
|
|||
return newLease;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to create a new KinesisClientLease POJO for a ChildShard.
|
||||
* Note: Package level access only for testing purposes
|
||||
*
|
||||
* @param childShard
|
||||
* @return
|
||||
*/
|
||||
static KinesisClientLease newKCLLeaseForChildShard(ChildShard childShard) throws InvalidStateException {
|
||||
final KinesisClientLease newLease = new KinesisClientLease();
|
||||
newLease.setLeaseKey(childShard.getShardId());
|
||||
final List<String> parentShardIds = new ArrayList<>();
|
||||
if (!CollectionUtils.isNullOrEmpty(childShard.getParentShards())) {
|
||||
parentShardIds.addAll(childShard.getParentShards());
|
||||
} else {
|
||||
throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.getShardId()
|
||||
+ " because parent shards cannot be found.");
|
||||
}
|
||||
newLease.setParentShardIds(parentShardIds);
|
||||
newLease.setOwnerSwitchesSinceCheckpoint(0L);
|
||||
newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
return newLease;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to construct a shardId->Shard map for the specified list of shards.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -129,6 +129,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
|
|||
try {
|
||||
result = getRecordsResultQueue.take().withCacheExitTime(Instant.now());
|
||||
prefetchCounters.removed(result);
|
||||
log.info("Shard " + shardId + ": Number of records remaining in queue is " + getRecordsResultQueue.size());
|
||||
} catch (InterruptedException e) {
|
||||
log.error("Interrupted while getting records from the cache", e);
|
||||
}
|
||||
|
|
@ -177,7 +178,6 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
|
|||
|
||||
MetricsHelper.getMetricsScope().addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.Count,
|
||||
MetricsLevel.SUMMARY);
|
||||
|
||||
dataFetcher.restartIterator();
|
||||
} catch (SdkClientException e) {
|
||||
log.error("Exception thrown while fetching records from Kinesis", e);
|
||||
|
|
|
|||
|
|
@ -152,8 +152,8 @@ class ProcessTask implements ITask {
|
|||
|
||||
try {
|
||||
if (dataFetcher.isShardEndReached()) {
|
||||
LOG.info("Reached end of shard " + shardInfo.getShardId());
|
||||
return new TaskResult(null, true);
|
||||
LOG.info("Reached end of shard " + shardInfo.getShardId() + ". Found childShards: " + dataFetcher.getChildShards());
|
||||
return new TaskResult(null, true, dataFetcher.getChildShards());
|
||||
}
|
||||
|
||||
final ProcessRecordsInput processRecordsInput = getRecordsResult();
|
||||
|
|
@ -353,7 +353,7 @@ class ProcessTask implements ITask {
|
|||
* recordProcessorCheckpointer).
|
||||
*/
|
||||
dataFetcher.advanceIteratorTo(recordProcessorCheckpointer.getLargestPermittedCheckpointValue()
|
||||
.getSequenceNumber(), streamConfig.getInitialPositionInStream());
|
||||
.getSequenceNumber(), streamConfig.getInitialPositionInStream());
|
||||
|
||||
// Try a second time - if we fail this time, expose the failure.
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -15,11 +15,14 @@
|
|||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
|
||||
import com.amazonaws.services.kinesis.model.ChildShard;
|
||||
import com.amazonaws.util.CollectionUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
|
@ -66,6 +69,9 @@ class ShardConsumer {
|
|||
private Future<TaskResult> future;
|
||||
private ShardSyncStrategy shardSyncStrategy;
|
||||
|
||||
@Getter
|
||||
private List<ChildShard> childShards;
|
||||
|
||||
@Getter
|
||||
private final GetRecordsCache getRecordsCache;
|
||||
|
||||
|
|
@ -321,6 +327,10 @@ class ShardConsumer {
|
|||
TaskResult result = future.get();
|
||||
if (result.getException() == null) {
|
||||
if (result.isShardEndReached()) {
|
||||
if (!CollectionUtils.isNullOrEmpty(result.getChildShards())) {
|
||||
childShards = result.getChildShards();
|
||||
LOG.info("Shard " + shardInfo.getShardId() + ": Setting childShards in ShardConsumer: " + childShards);
|
||||
}
|
||||
return TaskOutcome.END_OF_SHARD;
|
||||
}
|
||||
return TaskOutcome.SUCCESSFUL;
|
||||
|
|
@ -420,6 +430,7 @@ class ShardConsumer {
|
|||
void updateState(TaskOutcome taskOutcome) {
|
||||
if (taskOutcome == TaskOutcome.END_OF_SHARD) {
|
||||
markForShutdown(ShutdownReason.TERMINATE);
|
||||
LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason TERMINATE");
|
||||
}
|
||||
if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) {
|
||||
currentState = currentState.shutdownTransition(shutdownReason);
|
||||
|
|
|
|||
|
|
@ -14,9 +14,11 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse;
|
||||
import com.amazonaws.services.kinesis.model.Shard;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||
import com.amazonaws.services.kinesis.model.ChildShard;
|
||||
import com.amazonaws.util.CollectionUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
|
@ -30,6 +32,9 @@ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Task for invoking the RecordProcessor shutdown() callback.
|
||||
|
|
@ -54,6 +59,7 @@ class ShutdownTask implements ITask {
|
|||
private final GetRecordsCache getRecordsCache;
|
||||
private final ShardSyncer shardSyncer;
|
||||
private final ShardSyncStrategy shardSyncStrategy;
|
||||
private final List<ChildShard> childShards;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
|
|
@ -69,7 +75,8 @@ class ShutdownTask implements ITask {
|
|||
boolean ignoreUnexpectedChildShards,
|
||||
KinesisClientLibLeaseCoordinator leaseCoordinator,
|
||||
long backoffTimeMillis,
|
||||
GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
|
||||
GetRecordsCache getRecordsCache, ShardSyncer shardSyncer,
|
||||
ShardSyncStrategy shardSyncStrategy, List<ChildShard> childShards) {
|
||||
this.shardInfo = shardInfo;
|
||||
this.recordProcessor = recordProcessor;
|
||||
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
|
||||
|
|
@ -83,6 +90,7 @@ class ShutdownTask implements ITask {
|
|||
this.getRecordsCache = getRecordsCache;
|
||||
this.shardSyncer = shardSyncer;
|
||||
this.shardSyncStrategy = shardSyncStrategy;
|
||||
this.childShards = childShards;
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
@ -97,29 +105,39 @@ class ShutdownTask implements ITask {
|
|||
boolean applicationException = false;
|
||||
|
||||
try {
|
||||
LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: "
|
||||
+ shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards);
|
||||
ShutdownReason localReason = reason;
|
||||
List<Shard> latestShards = null;
|
||||
/*
|
||||
* Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END
|
||||
* If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows active
|
||||
* workers to contend for the lease of this shard.
|
||||
*/
|
||||
if(localReason == ShutdownReason.TERMINATE) {
|
||||
ShardClosureVerificationResponse shardClosureVerificationResponse = kinesisProxy.verifyShardClosure(shardInfo.getShardId());
|
||||
if (shardClosureVerificationResponse instanceof ShardListWrappingShardClosureVerificationResponse) {
|
||||
latestShards = ((ShardListWrappingShardClosureVerificationResponse)shardClosureVerificationResponse).getLatestShards();
|
||||
}
|
||||
|
||||
// If shard in context is not closed yet we should shut down the ShardConsumer with Zombie state
|
||||
// which avoids checkpoint-ing with SHARD_END sequence number.
|
||||
if(!shardClosureVerificationResponse.isShardClosed()) {
|
||||
// Create new lease for the child shards if they don't exist.
|
||||
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
|
||||
// This would happen when KinesisDataFetcher catches ResourceNotFound exception.
|
||||
// In this case, KinesisDataFetcher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
|
||||
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
|
||||
try {
|
||||
if (!CollectionUtils.isNullOrEmpty(childShards)) {
|
||||
createLeasesForChildShardsIfNotExist();
|
||||
updateCurrentLeaseWithChildShards();
|
||||
} else {
|
||||
LOG.warn("Shard " + shardInfo.getShardId()
|
||||
+ ": Shutting down consumer with SHARD_END reason without creating leases for child shards.");
|
||||
}
|
||||
} catch (InvalidStateException e) {
|
||||
// If invalidStateException happens, it indicates we are missing childShard related information.
|
||||
// In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry getting
|
||||
// childShard information in the processTask.
|
||||
localReason = ShutdownReason.ZOMBIE;
|
||||
dropLease();
|
||||
LOG.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.getShardId());
|
||||
LOG.warn("Shard " + shardInfo.getShardId() + ": Exception happened while shutting down shardConsumer with TERMINATE reason. " +
|
||||
"Dropping the lease and shutting down shardConsumer using ZOMBIE reason. Exception: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// If we reached end of the shard, set sequence number to SHARD_END.
|
||||
if (localReason == ShutdownReason.TERMINATE) {
|
||||
recordProcessorCheckpointer.setSequenceNumberAtShardEnd(
|
||||
|
|
@ -127,8 +145,6 @@ class ShutdownTask implements ITask {
|
|||
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
|
||||
}
|
||||
|
||||
LOG.debug("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken "
|
||||
+ shardInfo.getConcurrencyToken() + ". Shutdown reason: " + localReason);
|
||||
final ShutdownInput shutdownInput = new ShutdownInput()
|
||||
.withShutdownReason(localReason)
|
||||
.withCheckpointer(recordProcessorCheckpointer);
|
||||
|
|
@ -156,18 +172,6 @@ class ShutdownTask implements ITask {
|
|||
MetricsLevel.SUMMARY);
|
||||
}
|
||||
|
||||
if (localReason == ShutdownReason.TERMINATE) {
|
||||
LOG.debug("Looking for child shards of shard " + shardInfo.getShardId());
|
||||
// create leases for the child shards
|
||||
TaskResult result = shardSyncStrategy.onShardConsumerShutDown(latestShards);
|
||||
if (result.getException() != null) {
|
||||
LOG.debug("Exception while trying to sync shards on the shutdown of shard: " + shardInfo
|
||||
.getShardId());
|
||||
throw result.getException();
|
||||
}
|
||||
LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId());
|
||||
}
|
||||
|
||||
return new TaskResult(null);
|
||||
} catch (Exception e) {
|
||||
if (applicationException) {
|
||||
|
|
@ -187,6 +191,33 @@ class ShutdownTask implements ITask {
|
|||
return new TaskResult(exception);
|
||||
}
|
||||
|
||||
private void createLeasesForChildShardsIfNotExist() throws InvalidStateException, DependencyException, ProvisionedThroughputException {
|
||||
for (ChildShard childShard : childShards) {
|
||||
final String leaseKey = childShard.getShardId();
|
||||
if (leaseCoordinator.getLeaseManager().getLease(leaseKey) == null) {
|
||||
final KinesisClientLease leaseToCreate = KinesisShardSyncer.newKCLLeaseForChildShard(childShard);
|
||||
leaseCoordinator.getLeaseManager().createLeaseIfNotExists(leaseToCreate);
|
||||
LOG.info("Shard " + shardInfo.getShardId() + " : Created child shard lease: " + leaseToCreate.getLeaseKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void updateCurrentLeaseWithChildShards() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||
final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId());
|
||||
if (currentLease == null) {
|
||||
throw new InvalidStateException("Failed to retrieve current lease for shard " + shardInfo.getShardId());
|
||||
}
|
||||
final Set<String> childShardIds = childShards.stream().map(ChildShard::getShardId).collect(Collectors.toSet());
|
||||
|
||||
currentLease.setChildShardIds(childShardIds);
|
||||
final boolean updateResult = leaseCoordinator.updateLease(currentLease, UUID.fromString(shardInfo.getConcurrencyToken()));
|
||||
if (!updateResult) {
|
||||
throw new InvalidStateException("Failed to update parent lease with child shard information for shard " + shardInfo.getShardId());
|
||||
}
|
||||
LOG.info("Shard " + shardInfo.getShardId() + ": Updated current lease with child shard information: " + currentLease.getLeaseKey());
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
|
|
@ -204,6 +235,10 @@ class ShutdownTask implements ITask {
|
|||
|
||||
private void dropLease() {
|
||||
KinesisClientLease lease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId());
|
||||
if (lease == null) {
|
||||
LOG.warn("Shard " + shardInfo.getShardId() + ": Lease already dropped. Will shutdown the shardConsumer directly.");
|
||||
return;
|
||||
}
|
||||
leaseCoordinator.dropLease(lease);
|
||||
LOG.warn("Dropped lease for shutting down ShardConsumer: " + lease.getLeaseKey());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,6 +14,10 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import com.amazonaws.services.kinesis.model.ChildShard;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Used to capture information from a task that we want to communicate back to the higher layer.
|
||||
* E.g. exception thrown when executing the task, if we reach end of a shard.
|
||||
|
|
@ -26,6 +30,9 @@ class TaskResult {
|
|||
// Any exception caught while executing the task.
|
||||
private Exception exception;
|
||||
|
||||
// List of childShards of the current shard. This field is only required for the task result when we reach end of a shard.
|
||||
private List<ChildShard> childShards;
|
||||
|
||||
/**
|
||||
* @return the shardEndReached
|
||||
*/
|
||||
|
|
@ -33,6 +40,11 @@ class TaskResult {
|
|||
return shardEndReached;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the list of childShards.
|
||||
*/
|
||||
protected List<ChildShard> getChildShards() { return childShards; }
|
||||
|
||||
/**
|
||||
* @param shardEndReached the shardEndReached to set
|
||||
*/
|
||||
|
|
@ -40,6 +52,11 @@ class TaskResult {
|
|||
this.shardEndReached = shardEndReached;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param childShards the list of childShards to set
|
||||
*/
|
||||
protected void setChildShards(List<ChildShard> childShards) { this.childShards = childShards; }
|
||||
|
||||
/**
|
||||
* @return the exception
|
||||
*/
|
||||
|
|
@ -70,4 +87,10 @@ class TaskResult {
|
|||
this.shardEndReached = isShardEndReached;
|
||||
}
|
||||
|
||||
TaskResult(Exception e, boolean isShardEndReached, List<ChildShard> childShards) {
|
||||
this.exception = e;
|
||||
this.shardEndReached = isShardEndReached;
|
||||
this.childShards = childShards;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,9 +30,10 @@ public class KinesisClientLease extends Lease {
|
|||
private ExtendedSequenceNumber pendingCheckpoint;
|
||||
private Long ownerSwitchesSinceCheckpoint = 0L;
|
||||
private Set<String> parentShardIds = new HashSet<String>();
|
||||
private Set<String> childShardIds = new HashSet<String>();
|
||||
private Set<String> childShardIds = new HashSet<>();
|
||||
private HashKeyRangeForLease hashKeyRangeForLease;
|
||||
|
||||
|
||||
public KinesisClientLease() {
|
||||
|
||||
}
|
||||
|
|
@ -43,7 +44,7 @@ public class KinesisClientLease extends Lease {
|
|||
this.pendingCheckpoint = other.getPendingCheckpoint();
|
||||
this.ownerSwitchesSinceCheckpoint = other.getOwnerSwitchesSinceCheckpoint();
|
||||
this.parentShardIds.addAll(other.getParentShardIds());
|
||||
this.childShardIds = other.getChildShardIds();
|
||||
this.childShardIds.addAll(other.getChildShardIds());
|
||||
this.hashKeyRangeForLease = other.getHashKeyRange();
|
||||
}
|
||||
|
||||
|
|
@ -76,6 +77,7 @@ public class KinesisClientLease extends Lease {
|
|||
setCheckpoint(casted.checkpoint);
|
||||
setPendingCheckpoint(casted.pendingCheckpoint);
|
||||
setParentShardIds(casted.parentShardIds);
|
||||
setChildShardIds(casted.childShardIds);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -108,7 +110,7 @@ public class KinesisClientLease extends Lease {
|
|||
}
|
||||
|
||||
/**
|
||||
* @return shardIds that are the children of this lease. Used for resharding.
|
||||
* @return shardIds for the child shards of the current shard. Used for resharding.
|
||||
*/
|
||||
public Set<String> getChildShardIds() {
|
||||
return new HashSet<String>(childShardIds);
|
||||
|
|
@ -170,9 +172,6 @@ public class KinesisClientLease extends Lease {
|
|||
* @param childShardIds may not be null
|
||||
*/
|
||||
public void setChildShardIds(Collection<String> childShardIds) {
|
||||
verifyNotNull(childShardIds, "childShardIds should not be null");
|
||||
|
||||
this.childShardIds.clear();
|
||||
this.childShardIds.addAll(childShardIds);
|
||||
}
|
||||
|
||||
|
|
@ -186,7 +185,7 @@ public class KinesisClientLease extends Lease {
|
|||
|
||||
this.hashKeyRangeForLease = hashKeyRangeForLease;
|
||||
}
|
||||
|
||||
|
||||
private void verifyNotNull(Object object, String message) {
|
||||
if (object == null) {
|
||||
throw new IllegalArgumentException(message);
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
|
|||
private static final String PENDING_CHECKPOINT_SEQUENCE_KEY = "pendingCheckpoint";
|
||||
private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber";
|
||||
public final String PARENT_SHARD_ID_KEY = "parentShardId";
|
||||
private static final String CHILD_SHARD_IDS_KEY = "childShardIds";
|
||||
public final String CHILD_SHARD_IDS_KEY = "childShardIds";
|
||||
private static final String STARTING_HASH_KEY = "startingHashKey";
|
||||
private static final String ENDING_HASH_KEY = "endingHashKey";
|
||||
|
||||
|
|
@ -56,9 +56,12 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
|
|||
result.put(OWNER_SWITCHES_KEY, DynamoUtils.createAttributeValue(lease.getOwnerSwitchesSinceCheckpoint()));
|
||||
result.put(CHECKPOINT_SEQUENCE_NUMBER_KEY, DynamoUtils.createAttributeValue(lease.getCheckpoint().getSequenceNumber()));
|
||||
result.put(CHECKPOINT_SUBSEQUENCE_NUMBER_KEY, DynamoUtils.createAttributeValue(lease.getCheckpoint().getSubSequenceNumber()));
|
||||
if (lease.getParentShardIds() != null && !lease.getParentShardIds().isEmpty()) {
|
||||
if (!CollectionUtils.isNullOrEmpty(lease.getParentShardIds())) {
|
||||
result.put(PARENT_SHARD_ID_KEY, DynamoUtils.createAttributeValue(lease.getParentShardIds()));
|
||||
}
|
||||
if (!CollectionUtils.isNullOrEmpty(lease.getChildShardIds())) {
|
||||
result.put(CHILD_SHARD_IDS_KEY, DynamoUtils.createAttributeValue(lease.getChildShardIds()));
|
||||
}
|
||||
|
||||
if (lease.getPendingCheckpoint() != null && !lease.getPendingCheckpoint().getSequenceNumber().isEmpty()) {
|
||||
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSequenceNumber()));
|
||||
|
|
@ -79,6 +82,7 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
|
|||
DynamoUtils.safeGetLong(dynamoRecord, CHECKPOINT_SUBSEQUENCE_NUMBER_KEY))
|
||||
);
|
||||
result.setParentShardIds(DynamoUtils.safeGetSS(dynamoRecord, PARENT_SHARD_ID_KEY));
|
||||
result.setChildShardIds(DynamoUtils.safeGetSS(dynamoRecord, CHILD_SHARD_IDS_KEY));
|
||||
|
||||
if (!Strings.isNullOrEmpty(DynamoUtils.safeGetString(dynamoRecord, PENDING_CHECKPOINT_SEQUENCE_KEY))) {
|
||||
result.setPendingCheckpoint(
|
||||
|
|
@ -150,6 +154,9 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
|
|||
result.put(OWNER_SWITCHES_KEY,
|
||||
new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getOwnerSwitchesSinceCheckpoint()),
|
||||
AttributeAction.PUT));
|
||||
if (!CollectionUtils.isNullOrEmpty(lease.getChildShardIds())) {
|
||||
result.put(CHILD_SHARD_IDS_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getChildShardIds()), AttributeAction.PUT));
|
||||
}
|
||||
|
||||
if (lease.getPendingCheckpoint() != null && !lease.getPendingCheckpoint().getSequenceNumber().isEmpty()) {
|
||||
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSequenceNumber()), AttributeAction.PUT));
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ import java.util.Collections;
|
|||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
import com.amazonaws.services.kinesis.model.ChildShard;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
|
@ -132,7 +133,7 @@ public class KinesisDataFetcherTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testadvanceIteratorTo() throws KinesisClientLibException {
|
||||
public void testadvanceIteratorTo() throws Exception {
|
||||
IKinesisProxy kinesis = mock(IKinesisProxy.class);
|
||||
ICheckpoint checkpoint = mock(ICheckpoint.class);
|
||||
|
||||
|
|
@ -146,9 +147,13 @@ public class KinesisDataFetcherTest {
|
|||
GetRecordsResult outputA = new GetRecordsResult();
|
||||
List<Record> recordsA = new ArrayList<Record>();
|
||||
outputA.setRecords(recordsA);
|
||||
outputA.setNextShardIterator("nextShardIteratorA");
|
||||
outputA.setChildShards(Collections.emptyList());
|
||||
GetRecordsResult outputB = new GetRecordsResult();
|
||||
List<Record> recordsB = new ArrayList<Record>();
|
||||
outputB.setRecords(recordsB);
|
||||
outputB.setNextShardIterator("nextShardIteratorB");
|
||||
outputB.setChildShards(Collections.emptyList());
|
||||
|
||||
when(kinesis.getIterator(SHARD_ID, AT_SEQUENCE_NUMBER, seqA)).thenReturn(iteratorA);
|
||||
when(kinesis.getIterator(SHARD_ID, AT_SEQUENCE_NUMBER, seqB)).thenReturn(iteratorB);
|
||||
|
|
@ -166,7 +171,7 @@ public class KinesisDataFetcherTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() {
|
||||
public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() throws Exception{
|
||||
IKinesisProxy kinesis = mock(IKinesisProxy.class);
|
||||
|
||||
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO);
|
||||
|
|
@ -189,7 +194,7 @@ public class KinesisDataFetcherTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testGetRecordsWithResourceNotFoundException() {
|
||||
public void testGetRecordsWithResourceNotFoundException() throws Exception {
|
||||
// Set up arguments used by proxy
|
||||
String nextIterator = "TestShardIterator";
|
||||
int maxRecords = 100;
|
||||
|
|
@ -211,11 +216,12 @@ public class KinesisDataFetcherTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testNonNullGetRecords() {
|
||||
public void testNonNullGetRecords() throws Exception {
|
||||
String nextIterator = "TestIterator";
|
||||
int maxRecords = 100;
|
||||
|
||||
KinesisProxy mockProxy = mock(KinesisProxy.class);
|
||||
when(mockProxy.getIterator(anyString(), anyString())).thenReturn("targetIterator");
|
||||
doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords);
|
||||
|
||||
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO);
|
||||
|
|
@ -232,17 +238,25 @@ public class KinesisDataFetcherTest {
|
|||
final String NEXT_ITERATOR_ONE = "NextIteratorOne";
|
||||
final String NEXT_ITERATOR_TWO = "NextIteratorTwo";
|
||||
when(kinesisProxy.getIterator(anyString(), anyString())).thenReturn(INITIAL_ITERATOR);
|
||||
GetRecordsResult iteratorOneResults = mock(GetRecordsResult.class);
|
||||
when(iteratorOneResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_ONE);
|
||||
|
||||
GetRecordsResult iteratorOneResults = new GetRecordsResult();
|
||||
iteratorOneResults.setNextShardIterator(NEXT_ITERATOR_ONE);
|
||||
iteratorOneResults.setChildShards(Collections.emptyList());
|
||||
when(kinesisProxy.get(eq(INITIAL_ITERATOR), anyInt())).thenReturn(iteratorOneResults);
|
||||
|
||||
GetRecordsResult iteratorTwoResults = mock(GetRecordsResult.class);
|
||||
GetRecordsResult iteratorTwoResults = new GetRecordsResult();
|
||||
iteratorTwoResults.setNextShardIterator(NEXT_ITERATOR_TWO);
|
||||
iteratorTwoResults.setChildShards(Collections.emptyList());
|
||||
when(kinesisProxy.get(eq(NEXT_ITERATOR_ONE), anyInt())).thenReturn(iteratorTwoResults);
|
||||
when(iteratorTwoResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_TWO);
|
||||
|
||||
GetRecordsResult finalResult = mock(GetRecordsResult.class);
|
||||
GetRecordsResult finalResult = new GetRecordsResult();
|
||||
finalResult.setNextShardIterator(null);
|
||||
List<ChildShard> childShards = new ArrayList<>();
|
||||
ChildShard childShard = new ChildShard();
|
||||
childShard.setParentShards(Collections.singletonList("parentShardId"));
|
||||
childShards.add(childShard);
|
||||
finalResult.setChildShards(childShards);
|
||||
when(kinesisProxy.get(eq(NEXT_ITERATOR_TWO), anyInt())).thenReturn(finalResult);
|
||||
when(finalResult.getNextShardIterator()).thenReturn(null);
|
||||
|
||||
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO);
|
||||
dataFetcher.initialize("TRIM_HORIZON",
|
||||
|
|
@ -276,13 +290,14 @@ public class KinesisDataFetcherTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testRestartIterator() {
|
||||
public void testRestartIterator() throws Exception{
|
||||
GetRecordsResult getRecordsResult = mock(GetRecordsResult.class);
|
||||
GetRecordsResult restartGetRecordsResult = new GetRecordsResult();
|
||||
GetRecordsResult restartGetRecordsResult = mock(GetRecordsResult.class);
|
||||
Record record = mock(Record.class);
|
||||
final String initialIterator = "InitialIterator";
|
||||
final String nextShardIterator = "NextShardIterator";
|
||||
final String restartShardIterator = "RestartIterator";
|
||||
final String restartNextShardIterator = "RestartNextIterator";
|
||||
final String sequenceNumber = "SequenceNumber";
|
||||
final String iteratorType = "AT_SEQUENCE_NUMBER";
|
||||
KinesisProxy kinesisProxy = mock(KinesisProxy.class);
|
||||
|
|
@ -292,6 +307,7 @@ public class KinesisDataFetcherTest {
|
|||
when(kinesisProxy.get(eq(initialIterator), eq(10))).thenReturn(getRecordsResult);
|
||||
when(getRecordsResult.getRecords()).thenReturn(Collections.singletonList(record));
|
||||
when(getRecordsResult.getNextShardIterator()).thenReturn(nextShardIterator);
|
||||
when(getRecordsResult.getChildShards()).thenReturn(Collections.emptyList());
|
||||
when(record.getSequenceNumber()).thenReturn(sequenceNumber);
|
||||
|
||||
fetcher.initialize(InitialPositionInStream.LATEST.toString(), INITIAL_POSITION_LATEST);
|
||||
|
|
@ -300,6 +316,8 @@ public class KinesisDataFetcherTest {
|
|||
verify(kinesisProxy).get(eq(initialIterator), eq(10));
|
||||
|
||||
when(kinesisProxy.getIterator(eq(SHARD_ID), eq(iteratorType), eq(sequenceNumber))).thenReturn(restartShardIterator);
|
||||
when(restartGetRecordsResult.getNextShardIterator()).thenReturn(restartNextShardIterator);
|
||||
when(restartGetRecordsResult.getChildShards()).thenReturn(Collections.emptyList());
|
||||
when(kinesisProxy.get(eq(restartShardIterator), eq(10))).thenReturn(restartGetRecordsResult);
|
||||
|
||||
fetcher.restartIterator();
|
||||
|
|
@ -309,7 +327,7 @@ public class KinesisDataFetcherTest {
|
|||
}
|
||||
|
||||
@Test (expected = IllegalStateException.class)
|
||||
public void testRestartIteratorNotInitialized() {
|
||||
public void testRestartIteratorNotInitialized() throws Exception {
|
||||
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO);
|
||||
dataFetcher.restartIterator();
|
||||
}
|
||||
|
|
@ -354,6 +372,8 @@ public class KinesisDataFetcherTest {
|
|||
List<Record> expectedRecords = new ArrayList<Record>();
|
||||
GetRecordsResult response = new GetRecordsResult();
|
||||
response.setRecords(expectedRecords);
|
||||
response.setNextShardIterator("testNextShardIterator");
|
||||
response.setChildShards(Collections.emptyList());
|
||||
|
||||
when(kinesis.getIterator(SHARD_ID, initialPositionInStream.getTimestamp())).thenReturn(iterator);
|
||||
when(kinesis.getIterator(SHARD_ID, AT_SEQUENCE_NUMBER, seqNo)).thenReturn(iterator);
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
|
@ -74,6 +75,8 @@ public class PrefetchGetRecordsCacheIntegrationTest {
|
|||
private IKinesisProxy proxy;
|
||||
@Mock
|
||||
private ShardInfo shardInfo;
|
||||
@Mock
|
||||
private KinesisClientLibLeaseCoordinator leaseCoordinator;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
|
|
@ -171,7 +174,7 @@ public class PrefetchGetRecordsCacheIntegrationTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testExpiredIteratorException() {
|
||||
public void testExpiredIteratorException() throws Exception {
|
||||
when(dataFetcher.getRecords(eq(MAX_RECORDS_PER_CALL))).thenAnswer(new Answer<DataFetcherResult>() {
|
||||
@Override
|
||||
public DataFetcherResult answer(final InvocationOnMock invocationOnMock) throws Throwable {
|
||||
|
|
@ -215,6 +218,8 @@ public class PrefetchGetRecordsCacheIntegrationTest {
|
|||
GetRecordsResult getRecordsResult = new GetRecordsResult();
|
||||
getRecordsResult.setRecords(new ArrayList<>(records));
|
||||
getRecordsResult.setMillisBehindLatest(1000L);
|
||||
getRecordsResult.setNextShardIterator("testNextShardIterator");
|
||||
getRecordsResult.setChildShards(Collections.emptyList());
|
||||
|
||||
return new AdvancingResult(getRecordsResult);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ import static org.mockito.Mockito.when;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
|
@ -98,6 +99,8 @@ public class PrefetchGetRecordsCacheTest {
|
|||
|
||||
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(getRecordsResult);
|
||||
when(getRecordsResult.getRecords()).thenReturn(records);
|
||||
when(getRecordsResult.getNextShardIterator()).thenReturn("testNextShardIterator");
|
||||
when(getRecordsResult.getChildShards()).thenReturn(Collections.emptyList());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -203,7 +206,7 @@ public class PrefetchGetRecordsCacheTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testExpiredIteratorException() {
|
||||
public void testExpiredIteratorException() throws Exception{
|
||||
getRecordsCache.start();
|
||||
|
||||
when(getRecordsRetrievalStrategy.getRecords(MAX_RECORDS_PER_CALL)).thenThrow(ExpiredIteratorException.class).thenReturn(getRecordsResult);
|
||||
|
|
|
|||
|
|
@ -39,6 +39,7 @@ import java.io.File;
|
|||
import java.math.BigInteger;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
|
|
@ -245,7 +246,7 @@ public class ShardConsumerTest {
|
|||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public final void testRecordProcessorThrowable() throws Exception {
|
||||
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
ShardInfo shardInfo = new ShardInfo("s-0-0", UUID.randomUUID().toString(), null, ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
StreamConfig streamConfig =
|
||||
new StreamConfig(streamProxy,
|
||||
1,
|
||||
|
|
@ -271,6 +272,7 @@ public class ShardConsumerTest {
|
|||
|
||||
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
|
||||
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null;
|
||||
when(streamProxy.getIterator(anyString(), anyString(), anyString())).thenReturn("startingIterator");
|
||||
when(leaseManager.getLease(anyString())).thenReturn(null);
|
||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||
when(checkpoint.getCheckpointObject(anyString())).thenReturn(
|
||||
|
|
@ -538,7 +540,7 @@ public class ShardConsumerTest {
|
|||
int numRecs = 10;
|
||||
BigInteger startSeqNum = BigInteger.ONE;
|
||||
String streamShardId = "kinesis-0-0";
|
||||
String testConcurrencyToken = "testToken";
|
||||
String testConcurrencyToken = UUID.randomUUID().toString();
|
||||
List<Shard> shardList = KinesisLocalFileDataCreator.createShardList(1, "kinesis-0-", startSeqNum);
|
||||
// Close the shard so that shutdown is called with reason terminate
|
||||
shardList.get(0).getSequenceNumberRange().setEndingSequenceNumber(
|
||||
|
|
@ -606,8 +608,7 @@ public class ShardConsumerTest {
|
|||
shardSyncer,
|
||||
shardSyncStrategy);
|
||||
|
||||
when(shardSyncStrategy.onShardConsumerShutDown(shardList)).thenReturn(new TaskResult(null));
|
||||
|
||||
when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true);
|
||||
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
|
||||
consumer.consumeShard(); // check on parent shards
|
||||
Thread.sleep(50L);
|
||||
|
|
@ -657,7 +658,7 @@ public class ShardConsumerTest {
|
|||
}
|
||||
assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE));
|
||||
|
||||
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE)));
|
||||
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE)));
|
||||
|
||||
verify(getRecordsCache).shutdown();
|
||||
|
||||
|
|
@ -681,7 +682,7 @@ public class ShardConsumerTest {
|
|||
int numRecs = 10;
|
||||
BigInteger startSeqNum = BigInteger.ONE;
|
||||
String streamShardId = "kinesis-0-0";
|
||||
String testConcurrencyToken = "testToken";
|
||||
String testConcurrencyToken = UUID.randomUUID().toString();
|
||||
List<Shard> shardList = KinesisLocalFileDataCreator.createShardList(3, "kinesis-0-", startSeqNum);
|
||||
// Close the shard so that shutdown is called with reason terminate
|
||||
shardList.get(0).getSequenceNumberRange().setEndingSequenceNumber(
|
||||
|
|
@ -749,7 +750,12 @@ public class ShardConsumerTest {
|
|||
shardSyncer,
|
||||
shardSyncStrategy);
|
||||
|
||||
when(shardSyncStrategy.onShardConsumerShutDown(shardList)).thenReturn(new TaskResult(null));
|
||||
List<String> parentShardIds = new ArrayList<>();
|
||||
parentShardIds.add(shardInfo.getShardId());
|
||||
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(createLease(shardInfo.getShardId(),
|
||||
"leaseOwner",
|
||||
parentShardIds));
|
||||
when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true);
|
||||
|
||||
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
|
||||
consumer.consumeShard(); // check on parent shards
|
||||
|
|
@ -939,7 +945,7 @@ public class ShardConsumerTest {
|
|||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public final void testConsumeShardInitializedWithPendingCheckpoint() throws Exception {
|
||||
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
ShardInfo shardInfo = new ShardInfo("s-0-0", UUID.randomUUID().toString(), null, ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
StreamConfig streamConfig =
|
||||
new StreamConfig(streamProxy,
|
||||
1,
|
||||
|
|
@ -967,6 +973,7 @@ public class ShardConsumerTest {
|
|||
|
||||
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
|
||||
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999");
|
||||
when(streamProxy.getIterator(anyString(), anyString(), anyString())).thenReturn("startingIterator");
|
||||
when(leaseManager.getLease(anyString())).thenReturn(null);
|
||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||
when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory());
|
||||
|
|
@ -1125,6 +1132,14 @@ public class ShardConsumerTest {
|
|||
return userRecords;
|
||||
}
|
||||
|
||||
private KinesisClientLease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds) {
|
||||
KinesisClientLease lease = new KinesisClientLease();
|
||||
lease.setLeaseKey(leaseKey);
|
||||
lease.setLeaseOwner(leaseOwner);
|
||||
lease.setParentShardIds(parentShardIds);
|
||||
return lease;
|
||||
}
|
||||
|
||||
Matcher<InitializationInput> initializationInputMatcher(final ExtendedSequenceNumber checkpoint,
|
||||
final ExtendedSequenceNumber pendingCheckpoint) {
|
||||
return new TypeSafeMatcher<InitializationInput>() {
|
||||
|
|
|
|||
|
|
@ -24,11 +24,17 @@ import static org.mockito.Mockito.verify;
|
|||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
||||
import com.amazonaws.services.kinesis.model.ChildShard;
|
||||
import com.amazonaws.services.kinesis.model.HashKeyRange;
|
||||
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
|
||||
import com.amazonaws.services.kinesis.model.Shard;
|
||||
|
|
@ -60,7 +66,7 @@ public class ShutdownTaskTest {
|
|||
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
|
||||
|
||||
Set<String> defaultParentShardIds = new HashSet<>();
|
||||
String defaultConcurrencyToken = "testToken4398";
|
||||
String defaultConcurrencyToken = UUID.randomUUID().toString();
|
||||
String defaultShardId = "shardId-0";
|
||||
ShardInfo defaultShardInfo = new ShardInfo(defaultShardId,
|
||||
defaultConcurrencyToken,
|
||||
|
|
@ -70,10 +76,16 @@ public class ShutdownTaskTest {
|
|||
ShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator());
|
||||
|
||||
|
||||
@Mock
|
||||
private IKinesisProxy kinesisProxy;
|
||||
@Mock
|
||||
private GetRecordsCache getRecordsCache;
|
||||
@Mock
|
||||
private ShardSyncStrategy shardSyncStrategy;
|
||||
@Mock
|
||||
private ILeaseManager<KinesisClientLease> leaseManager;
|
||||
@Mock
|
||||
private KinesisClientLibLeaseCoordinator leaseCoordinator;
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
|
|
@ -95,6 +107,10 @@ public class ShutdownTaskTest {
|
|||
@Before
|
||||
public void setUp() throws Exception {
|
||||
doNothing().when(getRecordsCache).shutdown();
|
||||
final KinesisClientLease parentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
|
||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||
when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn(parentLease);
|
||||
when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -111,12 +127,6 @@ public class ShutdownTaskTest {
|
|||
public final void testCallWhenApplicationDoesNotCheckpoint() {
|
||||
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
|
||||
when(checkpointer.getLastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298"));
|
||||
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
||||
List<Shard> shards = constructShardListForGraphA();
|
||||
when(kinesisProxy.getShardList()).thenReturn(shards);
|
||||
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards));
|
||||
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||
boolean cleanupLeasesOfCompletedShards = false;
|
||||
boolean ignoreUnexpectedChildShards = false;
|
||||
|
|
@ -132,31 +142,29 @@ public class ShutdownTaskTest {
|
|||
TASK_BACKOFF_TIME_MILLIS,
|
||||
getRecordsCache,
|
||||
shardSyncer,
|
||||
shardSyncStrategy);
|
||||
shardSyncStrategy,
|
||||
constructChildShards());
|
||||
TaskResult result = task.call();
|
||||
Assert.assertNotNull(result.getException());
|
||||
Assert.assertTrue(result.getException() instanceof IllegalArgumentException);
|
||||
final String expectedExceptionMessage = "Application didn't checkpoint at end of shard shardId-0. " +
|
||||
"Application must checkpoint upon shutdown. See IRecordProcessor.shutdown javadocs for more information.";
|
||||
Assert.assertEquals(expectedExceptionMessage, result.getException().getMessage());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test method for {@link ShutdownTask#call()}.
|
||||
*/
|
||||
@Test
|
||||
public final void testCallWhenSyncingShardsThrows() {
|
||||
public final void testCallWhenCreatingLeaseThrows() throws Exception {
|
||||
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
|
||||
when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
|
||||
List<Shard> shards = constructShardListForGraphA();
|
||||
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
||||
when(kinesisProxy.getShardList()).thenReturn(shards);
|
||||
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards));
|
||||
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||
boolean cleanupLeasesOfCompletedShards = false;
|
||||
boolean ignoreUnexpectedChildShards = false;
|
||||
|
||||
when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(new KinesisClientLibIOException("")));
|
||||
final String exceptionMessage = "InvalidStateException is thrown.";
|
||||
when(leaseManager.createLeaseIfNotExists(any(KinesisClientLease.class))).thenThrow(new InvalidStateException(exceptionMessage));
|
||||
ShutdownTask task = new ShutdownTask(defaultShardInfo,
|
||||
defaultRecordProcessor,
|
||||
checkpointer,
|
||||
|
|
@ -169,30 +177,21 @@ public class ShutdownTaskTest {
|
|||
TASK_BACKOFF_TIME_MILLIS,
|
||||
getRecordsCache,
|
||||
shardSyncer,
|
||||
shardSyncStrategy);
|
||||
shardSyncStrategy,
|
||||
constructChildShards());
|
||||
TaskResult result = task.call();
|
||||
verify(shardSyncStrategy).onShardConsumerShutDown(shards);
|
||||
Assert.assertNotNull(result.getException());
|
||||
Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException);
|
||||
verify(getRecordsCache).shutdown();
|
||||
verify(leaseCoordinator).dropLease(any(KinesisClientLease.class));
|
||||
Assert.assertNull(result.getException());
|
||||
}
|
||||
|
||||
@Test
|
||||
public final void testCallWhenShardEnd() {
|
||||
public final void testCallWhenShardEnd() throws Exception {
|
||||
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
|
||||
when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
|
||||
List<Shard> shards = constructShardListForGraphA();
|
||||
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
||||
when(kinesisProxy.getShardList()).thenReturn(shards);
|
||||
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards));
|
||||
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||
boolean cleanupLeasesOfCompletedShards = false;
|
||||
boolean ignoreUnexpectedChildShards = false;
|
||||
|
||||
when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(null));
|
||||
ShutdownTask task = new ShutdownTask(defaultShardInfo,
|
||||
defaultRecordProcessor,
|
||||
checkpointer,
|
||||
|
|
@ -205,36 +204,27 @@ public class ShutdownTaskTest {
|
|||
TASK_BACKOFF_TIME_MILLIS,
|
||||
getRecordsCache,
|
||||
shardSyncer,
|
||||
shardSyncStrategy);
|
||||
shardSyncStrategy,
|
||||
constructChildShards());
|
||||
TaskResult result = task.call();
|
||||
verify(shardSyncStrategy).onShardConsumerShutDown(shards);
|
||||
verify(kinesisProxy, times(1)).verifyShardClosure(anyString());
|
||||
verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class));
|
||||
verify(leaseCoordinator).updateLease(any(KinesisClientLease.class), any(UUID.class));
|
||||
Assert.assertNull(result.getException());
|
||||
verify(getRecordsCache).shutdown();
|
||||
verify(leaseCoordinator, never()).dropLease(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public final void testCallWhenFalseShardEnd() {
|
||||
public final void testCallWhenShardNotFound() throws Exception {
|
||||
ShardInfo shardInfo = new ShardInfo("shardId-4",
|
||||
defaultConcurrencyToken,
|
||||
defaultParentShardIds,
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
|
||||
when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
|
||||
List<Shard> shards = constructShardListForGraphA();
|
||||
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
||||
when(kinesisProxy.getShardList()).thenReturn(shards);
|
||||
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(false, shards));
|
||||
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(new KinesisClientLease());
|
||||
boolean cleanupLeasesOfCompletedShards = false;
|
||||
boolean ignoreUnexpectedChildShards = false;
|
||||
|
||||
when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(null));
|
||||
|
||||
ShutdownTask task = new ShutdownTask(shardInfo,
|
||||
defaultRecordProcessor,
|
||||
checkpointer,
|
||||
|
|
@ -247,31 +237,23 @@ public class ShutdownTaskTest {
|
|||
TASK_BACKOFF_TIME_MILLIS,
|
||||
getRecordsCache,
|
||||
shardSyncer,
|
||||
shardSyncStrategy);
|
||||
shardSyncStrategy,
|
||||
Collections.emptyList());
|
||||
TaskResult result = task.call();
|
||||
verify(shardSyncStrategy, never()).onShardConsumerShutDown(shards);
|
||||
verify(kinesisProxy, times(1)).verifyShardClosure(anyString());
|
||||
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
|
||||
verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class));
|
||||
Assert.assertNull(result.getException());
|
||||
verify(getRecordsCache).shutdown();
|
||||
verify(leaseCoordinator).dropLease(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public final void testCallWhenLeaseLost() {
|
||||
public final void testCallWhenLeaseLost() throws Exception {
|
||||
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
|
||||
when(checkpointer.getLastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298"));
|
||||
List<Shard> shards = constructShardListForGraphA();
|
||||
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
||||
when(kinesisProxy.getShardList()).thenReturn(shards);
|
||||
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(false, shards));
|
||||
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||
boolean cleanupLeasesOfCompletedShards = false;
|
||||
boolean ignoreUnexpectedChildShards = false;
|
||||
|
||||
when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(null));
|
||||
ShutdownTask task = new ShutdownTask(defaultShardInfo,
|
||||
defaultRecordProcessor,
|
||||
checkpointer,
|
||||
|
|
@ -284,13 +266,13 @@ public class ShutdownTaskTest {
|
|||
TASK_BACKOFF_TIME_MILLIS,
|
||||
getRecordsCache,
|
||||
shardSyncer,
|
||||
shardSyncStrategy);
|
||||
shardSyncStrategy,
|
||||
Collections.emptyList());
|
||||
TaskResult result = task.call();
|
||||
verify(shardSyncStrategy, never()).onShardConsumerShutDown(shards);
|
||||
verify(kinesisProxy, never()).getShardList();
|
||||
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
|
||||
verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class));
|
||||
Assert.assertNull(result.getException());
|
||||
verify(getRecordsCache).shutdown();
|
||||
verify(leaseCoordinator, never()).dropLease(any());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -299,10 +281,39 @@ public class ShutdownTaskTest {
|
|||
@Test
|
||||
public final void testGetTaskType() {
|
||||
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
||||
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, leaseCoordinator, 0, getRecordsCache, shardSyncer, shardSyncStrategy);
|
||||
ShutdownTask task = new ShutdownTask(null, null, null, null,
|
||||
null, null, false,
|
||||
false, leaseCoordinator, 0,
|
||||
getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList());
|
||||
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
|
||||
}
|
||||
|
||||
private List<ChildShard> constructChildShards() {
|
||||
List<ChildShard> childShards = new ArrayList<>();
|
||||
List<String> parentShards = new ArrayList<>();
|
||||
parentShards.add(defaultShardId);
|
||||
|
||||
ChildShard leftChild = new ChildShard();
|
||||
leftChild.setShardId("ShardId-1");
|
||||
leftChild.setParentShards(parentShards);
|
||||
leftChild.setHashKeyRange(ShardObjectHelper.newHashKeyRange("0", "49"));
|
||||
childShards.add(leftChild);
|
||||
|
||||
ChildShard rightChild = new ChildShard();
|
||||
rightChild.setShardId("ShardId-2");
|
||||
rightChild.setParentShards(parentShards);
|
||||
rightChild.setHashKeyRange(ShardObjectHelper.newHashKeyRange("50", "99"));
|
||||
childShards.add(rightChild);
|
||||
return childShards;
|
||||
}
|
||||
|
||||
private KinesisClientLease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds) {
|
||||
KinesisClientLease lease = new KinesisClientLease();
|
||||
lease.setLeaseKey(leaseKey);
|
||||
lease.setLeaseOwner(leaseOwner);
|
||||
lease.setParentShardIds(parentShardIds);
|
||||
return lease;
|
||||
}
|
||||
|
||||
/*
|
||||
* Helper method to construct a shard list for graph A. Graph A is defined below.
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import java.nio.charset.Charset;
|
|||
import java.nio.charset.CharsetEncoder;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
|
@ -33,6 +34,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import com.amazonaws.services.kinesis.model.ChildShard;
|
||||
import com.amazonaws.services.kinesis.model.ShardFilter;
|
||||
import com.amazonaws.util.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
|
@ -388,14 +390,33 @@ public class KinesisLocalFileProxy implements IKinesisProxy {
|
|||
*/
|
||||
response.setNextShardIterator(serializeIterator(iterator.shardId, lastRecordsSeqNo.add(BigInteger.ONE)
|
||||
.toString()));
|
||||
response.setChildShards(Collections.emptyList());
|
||||
LOG.debug("Returning a non null iterator for shard " + iterator.shardId);
|
||||
} else {
|
||||
response.setChildShards(constructChildShards(iterator));
|
||||
LOG.info("Returning null iterator for shard " + iterator.shardId);
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
private List<ChildShard> constructChildShards(IteratorInfo iterator) {
|
||||
List<ChildShard> childShards = new ArrayList<>();
|
||||
List<String> parentShards = new ArrayList<>();
|
||||
parentShards.add(iterator.shardId);
|
||||
|
||||
ChildShard leftChild = new ChildShard();
|
||||
leftChild.setShardId("ShardId-1");
|
||||
leftChild.setParentShards(parentShards);
|
||||
childShards.add(leftChild);
|
||||
|
||||
ChildShard rightChild = new ChildShard();
|
||||
rightChild.setShardId("ShardId-2");
|
||||
rightChild.setParentShards(parentShards);
|
||||
childShards.add(rightChild);
|
||||
return childShards;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ public class KinesisClientLeaseBuilder {
|
|||
private ExtendedSequenceNumber pendingCheckpoint;
|
||||
private Long ownerSwitchesSinceCheckpoint = 0L;
|
||||
private Set<String> parentShardIds = new HashSet<>();
|
||||
private Set<String> childShardIds = new HashSet<>();
|
||||
private Set<String> childShardIds = new HashSet<>();
|
||||
private HashKeyRangeForLease hashKeyRangeForLease;
|
||||
|
||||
public KinesisClientLeaseBuilder withLeaseKey(String leaseKey) {
|
||||
|
|
@ -90,7 +90,6 @@ public class KinesisClientLeaseBuilder {
|
|||
|
||||
public KinesisClientLease build() {
|
||||
return new KinesisClientLease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos,
|
||||
checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds,
|
||||
hashKeyRangeForLease);
|
||||
checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds, hashKeyRangeForLease);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue