diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index 47b1239f..fc3400e8 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -530,7 +530,8 @@ class ConsumerStates { consumer.isIgnoreUnexpectedChildShards(), consumer.getLeaseCoordinator(), consumer.getTaskBackoffTimeMillis(), - consumer.getGetRecordsCache(), consumer.getShardSyncer(), consumer.getShardSyncStrategy()); + consumer.getGetRecordsCache(), consumer.getShardSyncer(), + consumer.getShardSyncStrategy(), consumer.getChildShards()); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java index e425e070..c716afa1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java @@ -16,7 +16,12 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.Collections; import java.util.Date; +import java.util.List; +import java.util.Set; +import com.amazonaws.SdkClientException; +import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.model.ChildShard; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -47,6 +52,7 @@ class KinesisDataFetcher { private boolean isInitialized; private String lastKnownSequenceNumber; private InitialPositionInStreamExtended initialPositionInStream; + private List childShards = Collections.emptyList(); /** * @@ -85,8 +91,11 @@ class KinesisDataFetcher { final DataFetcherResult TERMINAL_RESULT = new DataFetcherResult() { @Override public GetRecordsResult getResult() { - return new GetRecordsResult().withMillisBehindLatest(null).withRecords(Collections.emptyList()) - .withNextShardIterator(null); + return new GetRecordsResult() + .withMillisBehindLatest(null) + .withRecords(Collections.emptyList()) + .withNextShardIterator(null) + .withChildShards(Collections.emptyList()); } @Override @@ -113,12 +122,20 @@ class KinesisDataFetcher { @Override public GetRecordsResult accept() { + if (!isValidResult(result)) { + // Throwing SDK exception when the GetRecords result is not valid. This will allow PrefetchGetRecordsCache to retry the GetRecords call. + throw new SdkClientException("Shard " + shardId +": GetRecordsResult is not valid. NextShardIterator: " + result.getNextShardIterator() + + ". ChildShards: " + result.getChildShards()); + } nextIterator = result.getNextShardIterator(); if (!CollectionUtils.isNullOrEmpty(result.getRecords())) { lastKnownSequenceNumber = Iterables.getLast(result.getRecords()).getSequenceNumber(); } if (nextIterator == null) { - LOG.info("Reached shard end: nextIterator is null in AdvancingResult.accept for shard " + shardId); + LOG.info("Reached shard end: nextIterator is null in AdvancingResult.accept for shard " + shardId + ". childShards: " + result.getChildShards()); + if (!CollectionUtils.isNullOrEmpty(result.getChildShards())) { + childShards = result.getChildShards(); + } isShardEndReached = true; } return getResult(); @@ -130,6 +147,23 @@ class KinesisDataFetcher { } } + private boolean isValidResult(GetRecordsResult getRecordsResult) { + // GetRecords result should contain childShard information. There are two valid combination for the nextShardIterator and childShards + // If the GetRecords call does not reach the shard end, getRecords result should contain a non-null nextShardIterator and an empty list of childShards. + // If the GetRecords call reaches the shard end, getRecords result should contain a null nextShardIterator and a non-empty list of childShards. + // All other combinations are invalid and indicating an issue with GetRecords result from Kinesis service. + if (getRecordsResult.getNextShardIterator() == null && CollectionUtils.isNullOrEmpty(getRecordsResult.getChildShards()) || + getRecordsResult.getNextShardIterator() != null && !CollectionUtils.isNullOrEmpty(getRecordsResult.getChildShards())) { + return false; + } + for (ChildShard childShard : getRecordsResult.getChildShards()) { + if (CollectionUtils.isNullOrEmpty(childShard.getParentShards())) { + return false; + } + } + return true; + } + /** * Initializes this KinesisDataFetcher's iterator based on the checkpointed sequence number. * @param initialCheckpoint Current checkpoint sequence number for this shard. @@ -141,8 +175,7 @@ class KinesisDataFetcher { isInitialized = true; } - public void initialize(ExtendedSequenceNumber initialCheckpoint, - InitialPositionInStreamExtended initialPositionInStream) { + public void initialize(ExtendedSequenceNumber initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream) { LOG.info("Initializing shard " + shardId + " with " + initialCheckpoint.getSequenceNumber()); advanceIteratorTo(initialCheckpoint.getSequenceNumber(), initialPositionInStream); isInitialized = true; @@ -171,6 +204,7 @@ class KinesisDataFetcher { if (nextIterator == null) { LOG.info("Reached shard end: cannot advance iterator for shard " + shardId); isShardEndReached = true; + // TODO: transition to ShuttingDown state on shardend instead to shutdown state for enqueueing this for cleanup } this.lastKnownSequenceNumber = sequenceNumber; this.initialPositionInStream = initialPositionInStream; @@ -248,6 +282,10 @@ class KinesisDataFetcher { return isShardEndReached; } + protected List getChildShards() { + return childShards; + } + /** Note: This method has package level access for testing purposes. * @return nextIterator */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java index 358c8136..08543230 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java @@ -28,6 +28,7 @@ import java.util.Set; import com.amazonaws.services.kinesis.leases.impl.Lease; import com.amazonaws.services.kinesis.leases.impl.LeaseManager; +import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.services.kinesis.model.ShardFilter; import com.amazonaws.services.kinesis.model.ShardFilterType; import com.amazonaws.util.CollectionUtils; @@ -779,6 +780,29 @@ class KinesisShardSyncer implements ShardSyncer { return newLease; } + /** + * Helper method to create a new KinesisClientLease POJO for a ChildShard. + * Note: Package level access only for testing purposes + * + * @param childShard + * @return + */ + static KinesisClientLease newKCLLeaseForChildShard(ChildShard childShard) throws InvalidStateException { + final KinesisClientLease newLease = new KinesisClientLease(); + newLease.setLeaseKey(childShard.getShardId()); + final List parentShardIds = new ArrayList<>(); + if (!CollectionUtils.isNullOrEmpty(childShard.getParentShards())) { + parentShardIds.addAll(childShard.getParentShards()); + } else { + throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.getShardId() + + " because parent shards cannot be found."); + } + newLease.setParentShardIds(parentShardIds); + newLease.setOwnerSwitchesSinceCheckpoint(0L); + newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return newLease; + } + /** * Helper method to construct a shardId->Shard map for the specified list of shards. * diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index 8173a479..a4cf74d8 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -129,6 +129,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { try { result = getRecordsResultQueue.take().withCacheExitTime(Instant.now()); prefetchCounters.removed(result); + log.info("Shard " + shardId + ": Number of records remaining in queue is " + getRecordsResultQueue.size()); } catch (InterruptedException e) { log.error("Interrupted while getting records from the cache", e); } @@ -177,7 +178,6 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { MetricsHelper.getMetricsScope().addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.Count, MetricsLevel.SUMMARY); - dataFetcher.restartIterator(); } catch (SdkClientException e) { log.error("Exception thrown while fetching records from Kinesis", e); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index b578fbb0..cd543e23 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -152,8 +152,8 @@ class ProcessTask implements ITask { try { if (dataFetcher.isShardEndReached()) { - LOG.info("Reached end of shard " + shardInfo.getShardId()); - return new TaskResult(null, true); + LOG.info("Reached end of shard " + shardInfo.getShardId() + ". Found childShards: " + dataFetcher.getChildShards()); + return new TaskResult(null, true, dataFetcher.getChildShards()); } final ProcessRecordsInput processRecordsInput = getRecordsResult(); @@ -353,7 +353,7 @@ class ProcessTask implements ITask { * recordProcessorCheckpointer). */ dataFetcher.advanceIteratorTo(recordProcessorCheckpointer.getLargestPermittedCheckpointValue() - .getSequenceNumber(), streamConfig.getInitialPositionInStream()); + .getSequenceNumber(), streamConfig.getInitialPositionInStream()); // Try a second time - if we fail this time, expose the failure. try { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index a30412ce..f5513d3e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -15,11 +15,14 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import com.amazonaws.services.kinesis.model.ChildShard; +import com.amazonaws.util.CollectionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -66,6 +69,9 @@ class ShardConsumer { private Future future; private ShardSyncStrategy shardSyncStrategy; + @Getter + private List childShards; + @Getter private final GetRecordsCache getRecordsCache; @@ -321,6 +327,10 @@ class ShardConsumer { TaskResult result = future.get(); if (result.getException() == null) { if (result.isShardEndReached()) { + if (!CollectionUtils.isNullOrEmpty(result.getChildShards())) { + childShards = result.getChildShards(); + LOG.info("Shard " + shardInfo.getShardId() + ": Setting childShards in ShardConsumer: " + childShards); + } return TaskOutcome.END_OF_SHARD; } return TaskOutcome.SUCCESSFUL; @@ -420,6 +430,7 @@ class ShardConsumer { void updateState(TaskOutcome taskOutcome) { if (taskOutcome == TaskOutcome.END_OF_SHARD) { markForShutdown(ShutdownReason.TERMINATE); + LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason TERMINATE"); } if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) { currentState = currentState.shutdownTransition(shutdownReason); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index a9ff5080..71cf3b9d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -14,9 +14,11 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse; -import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse; -import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; +import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; +import com.amazonaws.services.kinesis.model.ChildShard; +import com.amazonaws.util.CollectionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,6 +32,9 @@ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.annotations.VisibleForTesting; import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; /** * Task for invoking the RecordProcessor shutdown() callback. @@ -54,6 +59,7 @@ class ShutdownTask implements ITask { private final GetRecordsCache getRecordsCache; private final ShardSyncer shardSyncer; private final ShardSyncStrategy shardSyncStrategy; + private final List childShards; /** * Constructor. @@ -69,7 +75,8 @@ class ShutdownTask implements ITask { boolean ignoreUnexpectedChildShards, KinesisClientLibLeaseCoordinator leaseCoordinator, long backoffTimeMillis, - GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { + GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, + ShardSyncStrategy shardSyncStrategy, List childShards) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; @@ -83,6 +90,7 @@ class ShutdownTask implements ITask { this.getRecordsCache = getRecordsCache; this.shardSyncer = shardSyncer; this.shardSyncStrategy = shardSyncStrategy; + this.childShards = childShards; } /* @@ -97,29 +105,39 @@ class ShutdownTask implements ITask { boolean applicationException = false; try { + LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: " + + shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards); ShutdownReason localReason = reason; - List latestShards = null; /* * Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END * If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows active * workers to contend for the lease of this shard. */ if(localReason == ShutdownReason.TERMINATE) { - ShardClosureVerificationResponse shardClosureVerificationResponse = kinesisProxy.verifyShardClosure(shardInfo.getShardId()); - if (shardClosureVerificationResponse instanceof ShardListWrappingShardClosureVerificationResponse) { - latestShards = ((ShardListWrappingShardClosureVerificationResponse)shardClosureVerificationResponse).getLatestShards(); - } - - // If shard in context is not closed yet we should shut down the ShardConsumer with Zombie state - // which avoids checkpoint-ing with SHARD_END sequence number. - if(!shardClosureVerificationResponse.isShardClosed()) { + // Create new lease for the child shards if they don't exist. + // We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards. + // This would happen when KinesisDataFetcher catches ResourceNotFound exception. + // In this case, KinesisDataFetcher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. + // This scenario could happen when customer deletes the stream while leaving the KCL application running. + try { + if (!CollectionUtils.isNullOrEmpty(childShards)) { + createLeasesForChildShardsIfNotExist(); + updateCurrentLeaseWithChildShards(); + } else { + LOG.warn("Shard " + shardInfo.getShardId() + + ": Shutting down consumer with SHARD_END reason without creating leases for child shards."); + } + } catch (InvalidStateException e) { + // If invalidStateException happens, it indicates we are missing childShard related information. + // In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry getting + // childShard information in the processTask. localReason = ShutdownReason.ZOMBIE; dropLease(); - LOG.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.getShardId()); + LOG.warn("Shard " + shardInfo.getShardId() + ": Exception happened while shutting down shardConsumer with TERMINATE reason. " + + "Dropping the lease and shutting down shardConsumer using ZOMBIE reason. Exception: ", e); } } - // If we reached end of the shard, set sequence number to SHARD_END. if (localReason == ShutdownReason.TERMINATE) { recordProcessorCheckpointer.setSequenceNumberAtShardEnd( @@ -127,8 +145,6 @@ class ShutdownTask implements ITask { recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); } - LOG.debug("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken " - + shardInfo.getConcurrencyToken() + ". Shutdown reason: " + localReason); final ShutdownInput shutdownInput = new ShutdownInput() .withShutdownReason(localReason) .withCheckpointer(recordProcessorCheckpointer); @@ -156,18 +172,6 @@ class ShutdownTask implements ITask { MetricsLevel.SUMMARY); } - if (localReason == ShutdownReason.TERMINATE) { - LOG.debug("Looking for child shards of shard " + shardInfo.getShardId()); - // create leases for the child shards - TaskResult result = shardSyncStrategy.onShardConsumerShutDown(latestShards); - if (result.getException() != null) { - LOG.debug("Exception while trying to sync shards on the shutdown of shard: " + shardInfo - .getShardId()); - throw result.getException(); - } - LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId()); - } - return new TaskResult(null); } catch (Exception e) { if (applicationException) { @@ -187,6 +191,33 @@ class ShutdownTask implements ITask { return new TaskResult(exception); } + private void createLeasesForChildShardsIfNotExist() throws InvalidStateException, DependencyException, ProvisionedThroughputException { + for (ChildShard childShard : childShards) { + final String leaseKey = childShard.getShardId(); + if (leaseCoordinator.getLeaseManager().getLease(leaseKey) == null) { + final KinesisClientLease leaseToCreate = KinesisShardSyncer.newKCLLeaseForChildShard(childShard); + leaseCoordinator.getLeaseManager().createLeaseIfNotExists(leaseToCreate); + LOG.info("Shard " + shardInfo.getShardId() + " : Created child shard lease: " + leaseToCreate.getLeaseKey()); + } + } + } + + private void updateCurrentLeaseWithChildShards() throws DependencyException, InvalidStateException, ProvisionedThroughputException { + final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); + if (currentLease == null) { + throw new InvalidStateException("Failed to retrieve current lease for shard " + shardInfo.getShardId()); + } + final Set childShardIds = childShards.stream().map(ChildShard::getShardId).collect(Collectors.toSet()); + + currentLease.setChildShardIds(childShardIds); + final boolean updateResult = leaseCoordinator.updateLease(currentLease, UUID.fromString(shardInfo.getConcurrencyToken())); + if (!updateResult) { + throw new InvalidStateException("Failed to update parent lease with child shard information for shard " + shardInfo.getShardId()); + } + LOG.info("Shard " + shardInfo.getShardId() + ": Updated current lease with child shard information: " + currentLease.getLeaseKey()); + } + + /* * (non-Javadoc) * @@ -204,6 +235,10 @@ class ShutdownTask implements ITask { private void dropLease() { KinesisClientLease lease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); + if (lease == null) { + LOG.warn("Shard " + shardInfo.getShardId() + ": Lease already dropped. Will shutdown the shardConsumer directly."); + return; + } leaseCoordinator.dropLease(lease); LOG.warn("Dropped lease for shutting down ShardConsumer: " + lease.getLeaseKey()); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskResult.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskResult.java index bc68d292..70109b86 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskResult.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskResult.java @@ -14,6 +14,10 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.model.ChildShard; + +import java.util.List; + /** * Used to capture information from a task that we want to communicate back to the higher layer. * E.g. exception thrown when executing the task, if we reach end of a shard. @@ -26,6 +30,9 @@ class TaskResult { // Any exception caught while executing the task. private Exception exception; + // List of childShards of the current shard. This field is only required for the task result when we reach end of a shard. + private List childShards; + /** * @return the shardEndReached */ @@ -33,6 +40,11 @@ class TaskResult { return shardEndReached; } + /** + * @return the list of childShards. + */ + protected List getChildShards() { return childShards; } + /** * @param shardEndReached the shardEndReached to set */ @@ -40,6 +52,11 @@ class TaskResult { this.shardEndReached = shardEndReached; } + /** + * @param childShards the list of childShards to set + */ + protected void setChildShards(List childShards) { this.childShards = childShards; } + /** * @return the exception */ @@ -70,4 +87,10 @@ class TaskResult { this.shardEndReached = isShardEndReached; } + TaskResult(Exception e, boolean isShardEndReached, List childShards) { + this.exception = e; + this.shardEndReached = isShardEndReached; + this.childShards = childShards; + } + } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java index ae58fb10..259b4c2f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java @@ -30,9 +30,10 @@ public class KinesisClientLease extends Lease { private ExtendedSequenceNumber pendingCheckpoint; private Long ownerSwitchesSinceCheckpoint = 0L; private Set parentShardIds = new HashSet(); - private Set childShardIds = new HashSet(); + private Set childShardIds = new HashSet<>(); private HashKeyRangeForLease hashKeyRangeForLease; + public KinesisClientLease() { } @@ -43,7 +44,7 @@ public class KinesisClientLease extends Lease { this.pendingCheckpoint = other.getPendingCheckpoint(); this.ownerSwitchesSinceCheckpoint = other.getOwnerSwitchesSinceCheckpoint(); this.parentShardIds.addAll(other.getParentShardIds()); - this.childShardIds = other.getChildShardIds(); + this.childShardIds.addAll(other.getChildShardIds()); this.hashKeyRangeForLease = other.getHashKeyRange(); } @@ -76,6 +77,7 @@ public class KinesisClientLease extends Lease { setCheckpoint(casted.checkpoint); setPendingCheckpoint(casted.pendingCheckpoint); setParentShardIds(casted.parentShardIds); + setChildShardIds(casted.childShardIds); } /** @@ -108,7 +110,7 @@ public class KinesisClientLease extends Lease { } /** - * @return shardIds that are the children of this lease. Used for resharding. + * @return shardIds for the child shards of the current shard. Used for resharding. */ public Set getChildShardIds() { return new HashSet(childShardIds); @@ -170,9 +172,6 @@ public class KinesisClientLease extends Lease { * @param childShardIds may not be null */ public void setChildShardIds(Collection childShardIds) { - verifyNotNull(childShardIds, "childShardIds should not be null"); - - this.childShardIds.clear(); this.childShardIds.addAll(childShardIds); } @@ -186,7 +185,7 @@ public class KinesisClientLease extends Lease { this.hashKeyRangeForLease = hashKeyRangeForLease; } - + private void verifyNotNull(Object object, String message) { if (object == null) { throw new IllegalArgumentException(message); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java index 6bf9bc58..0b9271be 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java @@ -43,7 +43,7 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer recordsA = new ArrayList(); outputA.setRecords(recordsA); + outputA.setNextShardIterator("nextShardIteratorA"); + outputA.setChildShards(Collections.emptyList()); GetRecordsResult outputB = new GetRecordsResult(); List recordsB = new ArrayList(); outputB.setRecords(recordsB); + outputB.setNextShardIterator("nextShardIteratorB"); + outputB.setChildShards(Collections.emptyList()); when(kinesis.getIterator(SHARD_ID, AT_SEQUENCE_NUMBER, seqA)).thenReturn(iteratorA); when(kinesis.getIterator(SHARD_ID, AT_SEQUENCE_NUMBER, seqB)).thenReturn(iteratorB); @@ -166,7 +171,7 @@ public class KinesisDataFetcherTest { } @Test - public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() { + public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() throws Exception{ IKinesisProxy kinesis = mock(IKinesisProxy.class); KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO); @@ -189,7 +194,7 @@ public class KinesisDataFetcherTest { } @Test - public void testGetRecordsWithResourceNotFoundException() { + public void testGetRecordsWithResourceNotFoundException() throws Exception { // Set up arguments used by proxy String nextIterator = "TestShardIterator"; int maxRecords = 100; @@ -211,11 +216,12 @@ public class KinesisDataFetcherTest { } @Test - public void testNonNullGetRecords() { + public void testNonNullGetRecords() throws Exception { String nextIterator = "TestIterator"; int maxRecords = 100; KinesisProxy mockProxy = mock(KinesisProxy.class); + when(mockProxy.getIterator(anyString(), anyString())).thenReturn("targetIterator"); doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords); KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO); @@ -232,17 +238,25 @@ public class KinesisDataFetcherTest { final String NEXT_ITERATOR_ONE = "NextIteratorOne"; final String NEXT_ITERATOR_TWO = "NextIteratorTwo"; when(kinesisProxy.getIterator(anyString(), anyString())).thenReturn(INITIAL_ITERATOR); - GetRecordsResult iteratorOneResults = mock(GetRecordsResult.class); - when(iteratorOneResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_ONE); + + GetRecordsResult iteratorOneResults = new GetRecordsResult(); + iteratorOneResults.setNextShardIterator(NEXT_ITERATOR_ONE); + iteratorOneResults.setChildShards(Collections.emptyList()); when(kinesisProxy.get(eq(INITIAL_ITERATOR), anyInt())).thenReturn(iteratorOneResults); - GetRecordsResult iteratorTwoResults = mock(GetRecordsResult.class); + GetRecordsResult iteratorTwoResults = new GetRecordsResult(); + iteratorTwoResults.setNextShardIterator(NEXT_ITERATOR_TWO); + iteratorTwoResults.setChildShards(Collections.emptyList()); when(kinesisProxy.get(eq(NEXT_ITERATOR_ONE), anyInt())).thenReturn(iteratorTwoResults); - when(iteratorTwoResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_TWO); - GetRecordsResult finalResult = mock(GetRecordsResult.class); + GetRecordsResult finalResult = new GetRecordsResult(); + finalResult.setNextShardIterator(null); + List childShards = new ArrayList<>(); + ChildShard childShard = new ChildShard(); + childShard.setParentShards(Collections.singletonList("parentShardId")); + childShards.add(childShard); + finalResult.setChildShards(childShards); when(kinesisProxy.get(eq(NEXT_ITERATOR_TWO), anyInt())).thenReturn(finalResult); - when(finalResult.getNextShardIterator()).thenReturn(null); KinesisDataFetcher dataFetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO); dataFetcher.initialize("TRIM_HORIZON", @@ -276,13 +290,14 @@ public class KinesisDataFetcherTest { } @Test - public void testRestartIterator() { + public void testRestartIterator() throws Exception{ GetRecordsResult getRecordsResult = mock(GetRecordsResult.class); - GetRecordsResult restartGetRecordsResult = new GetRecordsResult(); + GetRecordsResult restartGetRecordsResult = mock(GetRecordsResult.class); Record record = mock(Record.class); final String initialIterator = "InitialIterator"; final String nextShardIterator = "NextShardIterator"; final String restartShardIterator = "RestartIterator"; + final String restartNextShardIterator = "RestartNextIterator"; final String sequenceNumber = "SequenceNumber"; final String iteratorType = "AT_SEQUENCE_NUMBER"; KinesisProxy kinesisProxy = mock(KinesisProxy.class); @@ -292,6 +307,7 @@ public class KinesisDataFetcherTest { when(kinesisProxy.get(eq(initialIterator), eq(10))).thenReturn(getRecordsResult); when(getRecordsResult.getRecords()).thenReturn(Collections.singletonList(record)); when(getRecordsResult.getNextShardIterator()).thenReturn(nextShardIterator); + when(getRecordsResult.getChildShards()).thenReturn(Collections.emptyList()); when(record.getSequenceNumber()).thenReturn(sequenceNumber); fetcher.initialize(InitialPositionInStream.LATEST.toString(), INITIAL_POSITION_LATEST); @@ -300,6 +316,8 @@ public class KinesisDataFetcherTest { verify(kinesisProxy).get(eq(initialIterator), eq(10)); when(kinesisProxy.getIterator(eq(SHARD_ID), eq(iteratorType), eq(sequenceNumber))).thenReturn(restartShardIterator); + when(restartGetRecordsResult.getNextShardIterator()).thenReturn(restartNextShardIterator); + when(restartGetRecordsResult.getChildShards()).thenReturn(Collections.emptyList()); when(kinesisProxy.get(eq(restartShardIterator), eq(10))).thenReturn(restartGetRecordsResult); fetcher.restartIterator(); @@ -309,7 +327,7 @@ public class KinesisDataFetcherTest { } @Test (expected = IllegalStateException.class) - public void testRestartIteratorNotInitialized() { + public void testRestartIteratorNotInitialized() throws Exception { KinesisDataFetcher dataFetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO); dataFetcher.restartIterator(); } @@ -354,6 +372,8 @@ public class KinesisDataFetcherTest { List expectedRecords = new ArrayList(); GetRecordsResult response = new GetRecordsResult(); response.setRecords(expectedRecords); + response.setNextShardIterator("testNextShardIterator"); + response.setChildShards(Collections.emptyList()); when(kinesis.getIterator(SHARD_ID, initialPositionInStream.getTimestamp())).thenReturn(iterator); when(kinesis.getIterator(SHARD_ID, AT_SEQUENCE_NUMBER, seqNo)).thenReturn(iterator); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java index f77d3a9c..8043e0bf 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.when; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -74,6 +75,8 @@ public class PrefetchGetRecordsCacheIntegrationTest { private IKinesisProxy proxy; @Mock private ShardInfo shardInfo; + @Mock + private KinesisClientLibLeaseCoordinator leaseCoordinator; @Before public void setup() { @@ -171,7 +174,7 @@ public class PrefetchGetRecordsCacheIntegrationTest { } @Test - public void testExpiredIteratorException() { + public void testExpiredIteratorException() throws Exception { when(dataFetcher.getRecords(eq(MAX_RECORDS_PER_CALL))).thenAnswer(new Answer() { @Override public DataFetcherResult answer(final InvocationOnMock invocationOnMock) throws Throwable { @@ -215,6 +218,8 @@ public class PrefetchGetRecordsCacheIntegrationTest { GetRecordsResult getRecordsResult = new GetRecordsResult(); getRecordsResult.setRecords(new ArrayList<>(records)); getRecordsResult.setMillisBehindLatest(1000L); + getRecordsResult.setNextShardIterator("testNextShardIterator"); + getRecordsResult.setChildShards(Collections.emptyList()); return new AdvancingResult(getRecordsResult); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java index a4336aad..b6d7769e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java @@ -31,6 +31,7 @@ import static org.mockito.Mockito.when; import java.nio.ByteBuffer; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -98,6 +99,8 @@ public class PrefetchGetRecordsCacheTest { when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(getRecordsResult); when(getRecordsResult.getRecords()).thenReturn(records); + when(getRecordsResult.getNextShardIterator()).thenReturn("testNextShardIterator"); + when(getRecordsResult.getChildShards()).thenReturn(Collections.emptyList()); } @Test @@ -203,7 +206,7 @@ public class PrefetchGetRecordsCacheTest { } @Test - public void testExpiredIteratorException() { + public void testExpiredIteratorException() throws Exception{ getRecordsCache.start(); when(getRecordsRetrievalStrategy.getRecords(MAX_RECORDS_PER_CALL)).thenThrow(ExpiredIteratorException.class).thenReturn(getRecordsResult); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 1cf86c4f..f040c6a6 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -39,6 +39,7 @@ import java.io.File; import java.math.BigInteger; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Date; import java.util.List; import java.util.ListIterator; @@ -245,7 +246,7 @@ public class ShardConsumerTest { @SuppressWarnings("unchecked") @Test public final void testRecordProcessorThrowable() throws Exception { - ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); + ShardInfo shardInfo = new ShardInfo("s-0-0", UUID.randomUUID().toString(), null, ExtendedSequenceNumber.TRIM_HORIZON); StreamConfig streamConfig = new StreamConfig(streamProxy, 1, @@ -271,6 +272,7 @@ public class ShardConsumerTest { final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null; + when(streamProxy.getIterator(anyString(), anyString(), anyString())).thenReturn("startingIterator"); when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(checkpoint.getCheckpointObject(anyString())).thenReturn( @@ -538,7 +540,7 @@ public class ShardConsumerTest { int numRecs = 10; BigInteger startSeqNum = BigInteger.ONE; String streamShardId = "kinesis-0-0"; - String testConcurrencyToken = "testToken"; + String testConcurrencyToken = UUID.randomUUID().toString(); List shardList = KinesisLocalFileDataCreator.createShardList(1, "kinesis-0-", startSeqNum); // Close the shard so that shutdown is called with reason terminate shardList.get(0).getSequenceNumberRange().setEndingSequenceNumber( @@ -606,8 +608,7 @@ public class ShardConsumerTest { shardSyncer, shardSyncStrategy); - when(shardSyncStrategy.onShardConsumerShutDown(shardList)).thenReturn(new TaskResult(null)); - + when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards Thread.sleep(50L); @@ -657,7 +658,7 @@ public class ShardConsumerTest { } assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); - assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); + assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE))); verify(getRecordsCache).shutdown(); @@ -681,7 +682,7 @@ public class ShardConsumerTest { int numRecs = 10; BigInteger startSeqNum = BigInteger.ONE; String streamShardId = "kinesis-0-0"; - String testConcurrencyToken = "testToken"; + String testConcurrencyToken = UUID.randomUUID().toString(); List shardList = KinesisLocalFileDataCreator.createShardList(3, "kinesis-0-", startSeqNum); // Close the shard so that shutdown is called with reason terminate shardList.get(0).getSequenceNumberRange().setEndingSequenceNumber( @@ -749,7 +750,12 @@ public class ShardConsumerTest { shardSyncer, shardSyncStrategy); - when(shardSyncStrategy.onShardConsumerShutDown(shardList)).thenReturn(new TaskResult(null)); + List parentShardIds = new ArrayList<>(); + parentShardIds.add(shardInfo.getShardId()); + when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(createLease(shardInfo.getShardId(), + "leaseOwner", + parentShardIds)); + when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards @@ -939,7 +945,7 @@ public class ShardConsumerTest { @SuppressWarnings("unchecked") @Test public final void testConsumeShardInitializedWithPendingCheckpoint() throws Exception { - ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); + ShardInfo shardInfo = new ShardInfo("s-0-0", UUID.randomUUID().toString(), null, ExtendedSequenceNumber.TRIM_HORIZON); StreamConfig streamConfig = new StreamConfig(streamProxy, 1, @@ -967,6 +973,7 @@ public class ShardConsumerTest { final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999"); + when(streamProxy.getIterator(anyString(), anyString(), anyString())).thenReturn("startingIterator"); when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory()); @@ -1125,6 +1132,14 @@ public class ShardConsumerTest { return userRecords; } + private KinesisClientLease createLease(String leaseKey, String leaseOwner, Collection parentShardIds) { + KinesisClientLease lease = new KinesisClientLease(); + lease.setLeaseKey(leaseKey); + lease.setLeaseOwner(leaseOwner); + lease.setParentShardIds(parentShardIds); + return lease; + } + Matcher initializationInputMatcher(final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint) { return new TypeSafeMatcher() { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 04fadd88..cbfdf54a 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -24,11 +24,17 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.UUID; import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse; +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; +import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.Shard; @@ -60,7 +66,7 @@ public class ShutdownTaskTest { InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); Set defaultParentShardIds = new HashSet<>(); - String defaultConcurrencyToken = "testToken4398"; + String defaultConcurrencyToken = UUID.randomUUID().toString(); String defaultShardId = "shardId-0"; ShardInfo defaultShardInfo = new ShardInfo(defaultShardId, defaultConcurrencyToken, @@ -70,10 +76,16 @@ public class ShutdownTaskTest { ShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator()); + @Mock + private IKinesisProxy kinesisProxy; @Mock private GetRecordsCache getRecordsCache; @Mock private ShardSyncStrategy shardSyncStrategy; + @Mock + private ILeaseManager leaseManager; + @Mock + private KinesisClientLibLeaseCoordinator leaseCoordinator; /** * @throws java.lang.Exception @@ -95,6 +107,10 @@ public class ShutdownTaskTest { @Before public void setUp() throws Exception { doNothing().when(getRecordsCache).shutdown(); + final KinesisClientLease parentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList()); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); + when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn(parentLease); + when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true); } /** @@ -111,12 +127,6 @@ public class ShutdownTaskTest { public final void testCallWhenApplicationDoesNotCheckpoint() { RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class); when(checkpointer.getLastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298")); - IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); - List shards = constructShardListForGraphA(); - when(kinesisProxy.getShardList()).thenReturn(shards); - when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards)); - KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); - ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; @@ -132,31 +142,29 @@ public class ShutdownTaskTest { TASK_BACKOFF_TIME_MILLIS, getRecordsCache, shardSyncer, - shardSyncStrategy); + shardSyncStrategy, + constructChildShards()); TaskResult result = task.call(); Assert.assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof IllegalArgumentException); + final String expectedExceptionMessage = "Application didn't checkpoint at end of shard shardId-0. " + + "Application must checkpoint upon shutdown. See IRecordProcessor.shutdown javadocs for more information."; + Assert.assertEquals(expectedExceptionMessage, result.getException().getMessage()); } /** * Test method for {@link ShutdownTask#call()}. */ @Test - public final void testCallWhenSyncingShardsThrows() { + public final void testCallWhenCreatingLeaseThrows() throws Exception { RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class); when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); - List shards = constructShardListForGraphA(); - IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); - when(kinesisProxy.getShardList()).thenReturn(shards); - when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards)); - KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); - ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); - when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; - when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(new KinesisClientLibIOException(""))); + final String exceptionMessage = "InvalidStateException is thrown."; + when(leaseManager.createLeaseIfNotExists(any(KinesisClientLease.class))).thenThrow(new InvalidStateException(exceptionMessage)); ShutdownTask task = new ShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, @@ -169,30 +177,21 @@ public class ShutdownTaskTest { TASK_BACKOFF_TIME_MILLIS, getRecordsCache, shardSyncer, - shardSyncStrategy); + shardSyncStrategy, + constructChildShards()); TaskResult result = task.call(); - verify(shardSyncStrategy).onShardConsumerShutDown(shards); - Assert.assertNotNull(result.getException()); - Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException); verify(getRecordsCache).shutdown(); + verify(leaseCoordinator).dropLease(any(KinesisClientLease.class)); + Assert.assertNull(result.getException()); } @Test - public final void testCallWhenShardEnd() { + public final void testCallWhenShardEnd() throws Exception { RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class); when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); - List shards = constructShardListForGraphA(); - IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); - when(kinesisProxy.getShardList()).thenReturn(shards); - when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards)); - KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); - ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); - when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); - when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; - when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(null)); ShutdownTask task = new ShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, @@ -205,36 +204,27 @@ public class ShutdownTaskTest { TASK_BACKOFF_TIME_MILLIS, getRecordsCache, shardSyncer, - shardSyncStrategy); + shardSyncStrategy, + constructChildShards()); TaskResult result = task.call(); - verify(shardSyncStrategy).onShardConsumerShutDown(shards); - verify(kinesisProxy, times(1)).verifyShardClosure(anyString()); + verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class)); + verify(leaseCoordinator).updateLease(any(KinesisClientLease.class), any(UUID.class)); Assert.assertNull(result.getException()); verify(getRecordsCache).shutdown(); - verify(leaseCoordinator, never()).dropLease(any()); } @Test - public final void testCallWhenFalseShardEnd() { + public final void testCallWhenShardNotFound() throws Exception { ShardInfo shardInfo = new ShardInfo("shardId-4", defaultConcurrencyToken, defaultParentShardIds, ExtendedSequenceNumber.LATEST); RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class); when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); - List shards = constructShardListForGraphA(); - IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); - when(kinesisProxy.getShardList()).thenReturn(shards); - when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(false, shards)); - KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); - ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); - when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(new KinesisClientLease()); boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; - when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(null)); - ShutdownTask task = new ShutdownTask(shardInfo, defaultRecordProcessor, checkpointer, @@ -247,31 +237,23 @@ public class ShutdownTaskTest { TASK_BACKOFF_TIME_MILLIS, getRecordsCache, shardSyncer, - shardSyncStrategy); + shardSyncStrategy, + Collections.emptyList()); TaskResult result = task.call(); - verify(shardSyncStrategy, never()).onShardConsumerShutDown(shards); - verify(kinesisProxy, times(1)).verifyShardClosure(anyString()); + verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); + verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class)); Assert.assertNull(result.getException()); verify(getRecordsCache).shutdown(); - verify(leaseCoordinator).dropLease(any()); } @Test - public final void testCallWhenLeaseLost() { + public final void testCallWhenLeaseLost() throws Exception { RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class); when(checkpointer.getLastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298")); - List shards = constructShardListForGraphA(); - IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); - when(kinesisProxy.getShardList()).thenReturn(shards); - when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(false, shards)); - KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); - ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); - when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; - when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(null)); ShutdownTask task = new ShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, @@ -284,13 +266,13 @@ public class ShutdownTaskTest { TASK_BACKOFF_TIME_MILLIS, getRecordsCache, shardSyncer, - shardSyncStrategy); + shardSyncStrategy, + Collections.emptyList()); TaskResult result = task.call(); - verify(shardSyncStrategy, never()).onShardConsumerShutDown(shards); - verify(kinesisProxy, never()).getShardList(); + verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); + verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class)); Assert.assertNull(result.getException()); verify(getRecordsCache).shutdown(); - verify(leaseCoordinator, never()).dropLease(any()); } /** @@ -299,10 +281,39 @@ public class ShutdownTaskTest { @Test public final void testGetTaskType() { KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); - ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, leaseCoordinator, 0, getRecordsCache, shardSyncer, shardSyncStrategy); + ShutdownTask task = new ShutdownTask(null, null, null, null, + null, null, false, + false, leaseCoordinator, 0, + getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList()); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); } + private List constructChildShards() { + List childShards = new ArrayList<>(); + List parentShards = new ArrayList<>(); + parentShards.add(defaultShardId); + + ChildShard leftChild = new ChildShard(); + leftChild.setShardId("ShardId-1"); + leftChild.setParentShards(parentShards); + leftChild.setHashKeyRange(ShardObjectHelper.newHashKeyRange("0", "49")); + childShards.add(leftChild); + + ChildShard rightChild = new ChildShard(); + rightChild.setShardId("ShardId-2"); + rightChild.setParentShards(parentShards); + rightChild.setHashKeyRange(ShardObjectHelper.newHashKeyRange("50", "99")); + childShards.add(rightChild); + return childShards; + } + + private KinesisClientLease createLease(String leaseKey, String leaseOwner, Collection parentShardIds) { + KinesisClientLease lease = new KinesisClientLease(); + lease.setLeaseKey(leaseKey); + lease.setLeaseOwner(leaseOwner); + lease.setParentShardIds(parentShardIds); + return lease; + } /* * Helper method to construct a shard list for graph A. Graph A is defined below. diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java index ed57eedf..9ade05bd 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java @@ -25,6 +25,7 @@ import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -33,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.services.kinesis.model.ShardFilter; import com.amazonaws.util.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -388,14 +390,33 @@ public class KinesisLocalFileProxy implements IKinesisProxy { */ response.setNextShardIterator(serializeIterator(iterator.shardId, lastRecordsSeqNo.add(BigInteger.ONE) .toString())); + response.setChildShards(Collections.emptyList()); LOG.debug("Returning a non null iterator for shard " + iterator.shardId); } else { + response.setChildShards(constructChildShards(iterator)); LOG.info("Returning null iterator for shard " + iterator.shardId); } return response; } + private List constructChildShards(IteratorInfo iterator) { + List childShards = new ArrayList<>(); + List parentShards = new ArrayList<>(); + parentShards.add(iterator.shardId); + + ChildShard leftChild = new ChildShard(); + leftChild.setShardId("ShardId-1"); + leftChild.setParentShards(parentShards); + childShards.add(leftChild); + + ChildShard rightChild = new ChildShard(); + rightChild.setShardId("ShardId-2"); + rightChild.setParentShards(parentShards); + childShards.add(rightChild); + return childShards; + } + /** * {@inheritDoc} */ diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java index 4f4fdbca..d562a639 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java @@ -30,7 +30,7 @@ public class KinesisClientLeaseBuilder { private ExtendedSequenceNumber pendingCheckpoint; private Long ownerSwitchesSinceCheckpoint = 0L; private Set parentShardIds = new HashSet<>(); - private Set childShardIds = new HashSet<>(); + private Set childShardIds = new HashSet<>(); private HashKeyRangeForLease hashKeyRangeForLease; public KinesisClientLeaseBuilder withLeaseKey(String leaseKey) { @@ -90,7 +90,6 @@ public class KinesisClientLeaseBuilder { public KinesisClientLease build() { return new KinesisClientLease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, - checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds, - hashKeyRangeForLease); + checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds, hashKeyRangeForLease); } } \ No newline at end of file