Shard End fix for KCL v1.x (#623)

* KCL shardend fix for V1

* Address Comments

* Address more comments

* Force lease lost before shutting down ShardConsumer with Zombie state

* Updating version

* Addressing comments

* Addressing comments

* Fixing unit test

* Addressing comments

* Adding default implementation for onShardConsumerShutDown in ShardSyncStrategy interface

* Method name changes

* Addressing comments

* Addressing comments

* Addressing comments

* Revert the access change for getShardList method
ychunxue 2019-10-17 15:14:29 -07:00 committed by Joshua Kim
parent cc8aa5ef74
commit ffb34b6d96
16 changed files with 556 additions and 72 deletions

View file

@ -528,7 +528,7 @@ class ConsumerStates {
consumer.getStreamConfig().getInitialPositionInStream(),
consumer.isCleanupLeasesOfCompletedShards(),
consumer.isIgnoreUnexpectedChildShards(),
consumer.getLeaseManager(),
consumer.getLeaseCoordinator(),
consumer.getTaskBackoffTimeMillis(),
consumer.getGetRecordsCache(), consumer.getShardSyncer(), consumer.getShardSyncStrategy());
}

View file

@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import com.amazonaws.util.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.lang3.StringUtils;
@ -79,14 +80,38 @@ class KinesisShardSyncer implements ShardSyncer {
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
public synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager, InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards)
@Override
public synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards);
}
/**
* Check and create leases for any new shards (e.g. following a reshard operation).
*
* @param kinesisProxy
* @param leaseManager
* @param initialPositionInStream
* @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards
* @param latestShards latest snapshot of shards to reuse
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
@Override
public synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager, InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, List<Shard> latestShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, latestShards);
}
/**
* Sync leases with Kinesis shards (e.g. at startup, or when we reach end of a shard).
*
@ -100,14 +125,42 @@ class KinesisShardSyncer implements ShardSyncer {
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
// CHECKSTYLE:OFF CyclomaticComplexity
private synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager, InitialPositionInStreamExtended initialPosition,
boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
List<Shard> shards = getShardList(kinesisProxy);
LOG.debug("Num shards: " + shards.size());
List<Shard> latestShards = getShardList(kinesisProxy);
syncShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, latestShards);
}
/**
* Sync leases with Kinesis shards (e.g. at startup, or when we reach end of a shard).
*
* @param kinesisProxy
* @param leaseManager
* @param initialPosition
* @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards
* @param latestShards latest snapshot of shards to reuse
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
// CHECKSTYLE:OFF CyclomaticComplexity
private synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager, InitialPositionInStreamExtended initialPosition,
boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, List<Shard> latestShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
List<Shard> shards;
if (CollectionUtils.isNullOrEmpty(latestShards)) {
shards = getShardList(kinesisProxy);
} else {
shards = latestShards;
}
LOG.debug("Num Shards: " + shards.size());
Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(shards);
Map<String, Set<String>> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(shardIdToShardMap);
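
The hunk above makes the snapshot reuse explicit: the new overload only calls getShardList() when no snapshot was handed in, so a ShutdownTask that already listed the stream does not trigger a second DescribeStream. A minimal, self-contained sketch of that fallback pattern, with a Supplier standing in for IKinesisProxy.getShardList() (class and method names here are illustrative, not KCL API):

    import java.util.List;
    import java.util.function.Supplier;

    final class ShardSnapshotExample {
        /** Returns the supplied snapshot if usable, otherwise fetches a fresh shard list. */
        static <T> List<T> resolveShards(List<T> latestShards, Supplier<List<T>> freshLookup) {
            // Reuse the caller's snapshot to avoid a redundant DescribeStream round trip.
            if (latestShards == null || latestShards.isEmpty()) {
                return freshLookup.get();
            }
            return latestShards;
        }

        public static void main(String[] args) {
            List<String> snapshot = List.of("shardId-0", "shardId-1");
            // Snapshot present: no lookup performed.
            System.out.println(resolveShards(snapshot, () -> List.of("fresh")));
            // Snapshot absent: falls back to the fresh lookup.
            System.out.println(resolveShards(null, () -> List.of("fresh")));
        }
    }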

View file

@ -1,5 +1,6 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
/**
* An implementation of ShardSyncStrategy.
*/

View file

@ -50,7 +50,7 @@ class ShardConsumer {
private final ShardInfo shardInfo;
private final KinesisDataFetcher dataFetcher;
private final IMetricsFactory metricsFactory;
private final ILeaseManager<KinesisClientLease> leaseManager;
private final KinesisClientLibLeaseCoordinator leaseCoordinator;
private ICheckpoint checkpoint;
// Backoff time when polling to check if application has finished processing parent shards
private final long parentShardPollIntervalMillis;
@ -98,7 +98,7 @@ class ShardConsumer {
* @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard
* @param config Kinesis library configuration
* @param leaseManager Used to create leases for new shards
* @param leaseCoordinator Used to manage leases for current worker
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param executorService ExecutorService used to execute process tasks for this shard
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
@ -110,7 +110,7 @@ class ShardConsumer {
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
ILeaseManager<KinesisClientLease> leaseManager,
KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
@ -122,7 +122,7 @@ class ShardConsumer {
streamConfig,
checkpoint,
recordProcessor,
leaseManager,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@ -139,7 +139,7 @@ class ShardConsumer {
* @param streamConfig Stream configuration to use
* @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard
* @param leaseManager Used to create leases for new shards
* @param leaseCoordinator Used to manage leases for current worker
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param executorService ExecutorService used to execute process tasks for this shard
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
@ -154,7 +154,7 @@ class ShardConsumer {
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
ILeaseManager<KinesisClientLease> leaseManager,
KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
@ -177,7 +177,7 @@ class ShardConsumer {
shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()),
metricsFactory),
leaseManager,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@ -197,7 +197,7 @@ class ShardConsumer {
* @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard
* @param recordProcessorCheckpointer RecordProcessorCheckpointer to use to checkpoint progress
* @param leaseManager Used to create leases for new shards
* @param leaseCoordinator Used to manage leases for current worker
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param cleanupLeasesOfCompletedShards clean up the leases of completed shards
* @param executorService ExecutorService used to execute process tasks for this shard
@ -215,7 +215,7 @@ class ShardConsumer {
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
ILeaseManager<KinesisClientLease> leaseManager,
KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
@ -231,7 +231,7 @@ class ShardConsumer {
this.checkpoint = checkpoint;
this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.leaseManager = leaseManager;
this.leaseCoordinator = leaseCoordinator;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.executorService = executorService;
@ -478,7 +478,11 @@ class ShardConsumer {
}
ILeaseManager<KinesisClientLease> getLeaseManager() {
return leaseManager;
return leaseCoordinator.getLeaseManager();
}
KinesisClientLibLeaseCoordinator getLeaseCoordinator() {
return leaseCoordinator;
}
ICheckpoint getCheckpoint() {

View file

@ -1,8 +1,10 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import com.amazonaws.services.kinesis.model.Shard;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -55,6 +57,12 @@ class ShardEndShardSyncStrategy implements ShardSyncStrategy {
return onFoundCompletedShard();
}
@Override
public TaskResult onShardConsumerShutDown(List<Shard> latestShards) {
shardSyncTaskManager.syncShardAndLeaseInfo(latestShards);
return new TaskResult(null);
}
@Override
public void onWorkerShutDown() {
LOG.debug(String.format("Stop is NoOp for ShardSyncStrategyType %s", getStrategyType().toString()));

View file

@ -1,5 +1,9 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.model.Shard;
import java.util.List;
/**
* Facade of methods that can be invoked at different points
* in KCL application execution to perform certain actions related to shard-sync.
@ -41,6 +45,16 @@ public interface ShardSyncStrategy {
*/
TaskResult onShardConsumerShutDown();
/**
* Invoked when the ShardConsumer is shut down, with the latest snapshot of shards provided.
*
* @param latestShards latest snapshot of shards to reuse
* @return result of the shard-sync action taken on shutdown
*/
default TaskResult onShardConsumerShutDown(List<Shard> latestShards) {
return onShardConsumerShutDown();
}
/**
* Invoked when the worker is shut down.
*/
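
Adding the overload as a default method that delegates to the existing onShardConsumerShutDown() keeps this interface change backward compatible: strategies written against the old interface compile and behave exactly as before. A hedged sketch of that evolution pattern under illustrative names (not the actual KCL types):

    import java.util.List;

    interface SyncStrategy {
        String onShutDown(); // original method

        // New overload added later; delegates so old implementations still work.
        default String onShutDown(List<String> latestShards) {
            return onShutDown();
        }
    }

    // An implementation written before the overload existed: it compiles untouched
    // and transparently receives the delegating default behavior.
    class LegacyStrategy implements SyncStrategy {
        @Override
        public String onShutDown() {
            return "synced without snapshot";
        }
    }

    class DefaultMethodExample {
        public static void main(String[] args) {
            SyncStrategy strategy = new LegacyStrategy();
            // Callers may pass the snapshot; legacy implementations ignore it safely.
            System.out.println(strategy.onShutDown(List.of("shardId-0")));
        }
    }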

View file

@ -14,6 +14,7 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.model.Shard;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -21,6 +22,8 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import java.util.List;
/**
* This task syncs leases/activities with shards of the stream.
* It will create new leases/activities when it discovers new shards (e.g. setup/resharding).
@ -39,6 +42,7 @@ class ShardSyncTask implements ITask {
private final long shardSyncTaskIdleTimeMillis;
private final TaskType taskType = TaskType.SHARDSYNC;
private final ShardSyncer shardSyncer;
private final List<Shard> latestShards;
/**
* @param kinesisProxy Used to fetch information about the stream (e.g. shard list)
@ -50,6 +54,7 @@ class ShardSyncTask implements ITask {
* in Kinesis)
* @param shardSyncTaskIdleTimeMillis shardSync task idle time in millis
* @param shardSyncer shardSyncer instance used to check and create new leases
* @param latestShards latest snapshot of shards to reuse
*/
ShardSyncTask(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
@ -57,7 +62,8 @@ class ShardSyncTask implements ITask {
boolean cleanupLeasesUponShardCompletion,
boolean ignoreUnexpectedChildShards,
long shardSyncTaskIdleTimeMillis,
ShardSyncer shardSyncer) {
ShardSyncer shardSyncer, List<Shard> latestShards) {
this.latestShards = latestShards;
this.kinesisProxy = kinesisProxy;
this.leaseManager = leaseManager;
this.initialPosition = initialPositionInStream;
@ -79,7 +85,8 @@ class ShardSyncTask implements ITask {
leaseManager,
initialPosition,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards);
ignoreUnexpectedChildShards,
latestShards);
if (shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis);
}

View file

@ -14,11 +14,13 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import com.amazonaws.services.kinesis.model.Shard;
import lombok.Getter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -85,11 +87,11 @@ class ShardSyncTaskManager {
this.shardSyncer = shardSyncer;
}
synchronized Future<TaskResult> syncShardAndLeaseInfo(Set<String> closedShardIds) {
return checkAndSubmitNextTask(closedShardIds);
synchronized Future<TaskResult> syncShardAndLeaseInfo(List<Shard> latestShards) {
return checkAndSubmitNextTask(latestShards);
}
private synchronized Future<TaskResult> checkAndSubmitNextTask(Set<String> closedShardIds) {
private synchronized Future<TaskResult> checkAndSubmitNextTask(List<Shard> latestShards) {
Future<TaskResult> submittedTaskFuture = null;
if ((future == null) || future.isCancelled() || future.isDone()) {
if ((future != null) && future.isDone()) {
@ -111,7 +113,7 @@ class ShardSyncTaskManager {
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis,
shardSyncer), metricsFactory);
shardSyncer, latestShards), metricsFactory);
future = executorService.submit(currentTask);
if (LOG.isDebugEnabled()) {
LOG.debug("Submitted new " + currentTask.getTaskType() + " task.");

View file

@ -21,12 +21,23 @@ import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.model.Shard;
import java.util.List;
public interface ShardSyncer {
void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards)
InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException;
default void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards, List<Shard> latestShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards);
}
}

View file

@ -14,6 +14,9 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.util.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -22,11 +25,14 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* Task for invoking the RecordProcessor shutdown() callback.
*/
@ -41,7 +47,7 @@ class ShutdownTask implements ITask {
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
private final ShutdownReason reason;
private final IKinesisProxy kinesisProxy;
private final ILeaseManager<KinesisClientLease> leaseManager;
private final KinesisClientLibLeaseCoordinator leaseCoordinator;
private final InitialPositionInStreamExtended initialPositionInStream;
private final boolean cleanupLeasesOfCompletedShards;
private final boolean ignoreUnexpectedChildShards;
@ -63,7 +69,7 @@ class ShutdownTask implements ITask {
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards,
ILeaseManager<KinesisClientLease> leaseManager,
KinesisClientLibLeaseCoordinator leaseCoordinator,
long backoffTimeMillis,
GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
this.shardInfo = shardInfo;
@ -74,7 +80,7 @@ class ShutdownTask implements ITask {
this.initialPositionInStream = initialPositionInStream;
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.leaseManager = leaseManager;
this.leaseCoordinator = leaseCoordinator;
this.backoffTimeMillis = backoffTimeMillis;
this.getRecordsCache = getRecordsCache;
this.shardSyncer = shardSyncer;
@ -93,24 +99,44 @@ class ShutdownTask implements ITask {
boolean applicationException = false;
try {
ShutdownReason localReason = reason;
List<Shard> latestShards = null;
/*
* Revalidate whether the current shard is closed before shutting down the shard consumer with
* reason TERMINATE (shard end). If the shard is not actually closed, shut the consumer down with
* reason ZOMBIE (lease lost) instead, so that active workers can contend for this shard's lease.
*/
if (localReason == ShutdownReason.TERMINATE) {
latestShards = kinesisProxy.getShardList();
// If latestShards is null or empty, we should still shut down the ShardConsumer in the ZOMBIE
// state, which avoids checkpointing with the SHARD_END sequence number.
if (CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) {
localReason = ShutdownReason.ZOMBIE;
dropLease();
LOG.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.getShardId());
}
}
// If we reached end of the shard, set sequence number to SHARD_END.
if (reason == ShutdownReason.TERMINATE) {
if (localReason == ShutdownReason.TERMINATE) {
recordProcessorCheckpointer.setSequenceNumberAtShardEnd(
recordProcessorCheckpointer.getLargestPermittedCheckpointValue());
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
}
LOG.debug("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken "
+ shardInfo.getConcurrencyToken() + ". Shutdown reason: " + reason);
+ shardInfo.getConcurrencyToken() + ". Shutdown reason: " + localReason);
final ShutdownInput shutdownInput = new ShutdownInput()
.withShutdownReason(reason)
.withShutdownReason(localReason)
.withCheckpointer(recordProcessorCheckpointer);
final long recordProcessorStartTimeMillis = System.currentTimeMillis();
try {
recordProcessor.shutdown(shutdownInput);
ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
if (reason == ShutdownReason.TERMINATE) {
if (localReason == ShutdownReason.TERMINATE) {
if ((lastCheckpointValue == null)
|| (!lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END))) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
@ -129,10 +155,10 @@ class ShutdownTask implements ITask {
MetricsLevel.SUMMARY);
}
if (reason == ShutdownReason.TERMINATE) {
if (localReason == ShutdownReason.TERMINATE) {
LOG.debug("Looking for child shards of shard " + shardInfo.getShardId());
// create leases for the child shards
TaskResult result = shardSyncStrategy.onShardConsumerShutDown();
TaskResult result = shardSyncStrategy.onShardConsumerShutDown(latestShards);
if (result.getException() != null) {
LOG.debug("Exception while trying to sync shards on the shutdown of shard: " + shardInfo
.getShardId());
@ -175,4 +201,23 @@ class ShutdownTask implements ITask {
return reason;
}
private boolean isShardInContextParentOfAny(List<Shard> shards) {
for (Shard shard : shards) {
if (isChildShardOfShardInContext(shard)) {
return true;
}
}
return false;
}
private boolean isChildShardOfShardInContext(Shard shard) {
return (StringUtils.equals(shard.getParentShardId(), shardInfo.getShardId())
|| StringUtils.equals(shard.getAdjacentParentShardId(), shardInfo.getShardId()));
}
private void dropLease() {
KinesisClientLease lease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId());
leaseCoordinator.dropLease(lease);
LOG.warn("Dropped lease for shutting down ShardConsumer: " + lease.getLeaseKey());
}
}
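
The core of the fix is the revalidation above: a TERMINATE shutdown is honored only if the freshly listed shards prove the current shard has children (some shard names it as parent or adjacent parent); otherwise the reason is downgraded to ZOMBIE and the lease is dropped, so another worker can pick the shard back up rather than checkpointing SHARD_END prematurely. A condensed, standalone sketch of that decision (Java 16+ for the record; Shard here is a plain holder, not the AWS SDK model class):

    import java.util.List;

    final class ShardEndRevalidationExample {
        enum Reason { TERMINATE, ZOMBIE }

        record Shard(String shardId, String parentShardId, String adjacentParentShardId) {}

        /** Downgrades TERMINATE to ZOMBIE unless the latest listing proves the shard has children. */
        static Reason revalidate(Reason reason, String shardId, List<Shard> latestShards) {
            if (reason != Reason.TERMINATE) {
                return reason;
            }
            boolean hasChild = latestShards != null && latestShards.stream().anyMatch(s ->
                    shardId.equals(s.parentShardId()) || shardId.equals(s.adjacentParentShardId()));
            // No child found: the shard may not really be closed, so force a lease-lost
            // (ZOMBIE) shutdown instead of checkpointing SHARD_END prematurely.
            return hasChild ? Reason.TERMINATE : Reason.ZOMBIE;
        }

        public static void main(String[] args) {
            List<Shard> shards = List.of(new Shard("shardId-6", "shardId-0", "shardId-1"));
            System.out.println(revalidate(Reason.TERMINATE, "shardId-0", shards)); // TERMINATE
            System.out.println(revalidate(Reason.TERMINATE, "shardId-4", shards)); // ZOMBIE
        }
    }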

View file

@ -679,7 +679,7 @@ public class Worker implements Runnable {
LOG.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(),
leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion,
config.shouldIgnoreUnexpectedChildShards(), 0L, shardSyncer);
config.shouldIgnoreUnexpectedChildShards(), 0L, shardSyncer, null);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else {
LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
@ -1042,7 +1042,7 @@ public class Worker implements Runnable {
streamConfig,
checkpointTracker,
recordProcessor,
leaseCoordinator.getLeaseManager(),
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesUponShardCompletion,
executorService,
@ -1167,7 +1167,7 @@ public class Worker implements Runnable {
new ShardSyncTask(kinesisProxy, leaseManager, config.getInitialPositionInStreamExtended(),
config.shouldCleanupLeasesUponShardCompletion(),
config.shouldIgnoreUnexpectedChildShards(), SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC,
shardSyncer), metricsFactory));
shardSyncer, null), metricsFactory));
}
private ShardEndShardSyncStrategy createShardEndShardSyncStrategy(ShardSyncTaskManager shardSyncTaskManager) {

View file

@ -35,6 +35,7 @@ import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
@ -66,6 +67,8 @@ public class ConsumerStatesTest {
private KinesisDataFetcher dataFetcher;
@Mock
private ILeaseManager<KinesisClientLease> leaseManager;
@InjectMocks
private KinesisClientLibLeaseCoordinator leaseCoordinator = new KinesisClientLibLeaseCoordinator(leaseManager, "testCoordinator", 1000, 1000);
@Mock
private ICheckpoint checkpoint;
@Mock
@ -93,6 +96,7 @@ public class ConsumerStatesTest {
when(consumer.getShardInfo()).thenReturn(shardInfo);
when(consumer.getDataFetcher()).thenReturn(dataFetcher);
when(consumer.getLeaseManager()).thenReturn(leaseManager);
when(consumer.getLeaseCoordinator()).thenReturn(leaseCoordinator);
when(consumer.getCheckpoint()).thenReturn(checkpoint);
when(consumer.getFuture()).thenReturn(future);
when(consumer.getShutdownNotification()).thenReturn(shutdownNotification);
@ -294,7 +298,7 @@ public class ConsumerStatesTest {
equalTo(recordProcessorCheckpointer)));
assertThat(task, shutdownTask(ShutdownReason.class, "reason", equalTo(reason)));
assertThat(task, shutdownTask(IKinesisProxy.class, "kinesisProxy", equalTo(kinesisProxy)));
assertThat(task, shutdownTask(LEASE_MANAGER_CLASS, "leaseManager", equalTo(leaseManager)));
assertThat(task, shutdownTask(KinesisClientLibLeaseCoordinator.class, "leaseCoordinator", equalTo(leaseCoordinator)));
assertThat(task, shutdownTask(InitialPositionInStreamExtended.class, "initialPositionInStream",
equalTo(initialPositionInStream)));
assertThat(task,

View file

@ -58,6 +58,7 @@ import org.hamcrest.TypeSafeMatcher;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
@ -117,6 +118,8 @@ public class ShardConsumerTest {
@Mock
private ILeaseManager<KinesisClientLease> leaseManager;
@Mock
private KinesisClientLibLeaseCoordinator leaseCoordinator;
@Mock
private ICheckpoint checkpoint;
@Mock
private ShutdownNotification shutdownNotification;
@ -145,6 +148,7 @@ public class ShardConsumerTest {
when(checkpoint.getCheckpointObject(anyString())).thenThrow(NullPointerException.class);
when(leaseManager.getLease(anyString())).thenReturn(null);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
StreamConfig streamConfig =
new StreamConfig(streamProxy,
1,
@ -157,7 +161,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
null,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@ -194,6 +198,7 @@ public class ShardConsumerTest {
when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class);
when(checkpoint.getCheckpointObject(anyString())).thenThrow(NullPointerException.class);
when(leaseManager.getLease(anyString())).thenReturn(null);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
StreamConfig streamConfig =
new StreamConfig(streamProxy,
1,
@ -206,7 +211,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
null,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
spyExecutorService,
@ -250,7 +255,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
null,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@ -264,6 +269,7 @@ public class ShardConsumerTest {
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null;
when(leaseManager.getLease(anyString())).thenReturn(null);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(checkpoint.getCheckpointObject(anyString())).thenReturn(
new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber));
@ -332,6 +338,7 @@ public class ShardConsumerTest {
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken);
when(leaseManager.getLease(anyString())).thenReturn(null);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
TestStreamlet processor = new TestStreamlet();
StreamConfig streamConfig =
@ -368,7 +375,7 @@ public class ShardConsumerTest {
checkpoint,
processor,
recordProcessorCheckpointer,
leaseManager,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@ -445,7 +452,7 @@ public class ShardConsumerTest {
@Override
public void shutdown(ShutdownInput input) {
ShutdownReason reason = input.getShutdownReason();
if (reason.equals(ShutdownReason.TERMINATE) && errorShutdownLatch.getCount() > 0) {
if ((reason.equals(ShutdownReason.ZOMBIE) || reason.equals(ShutdownReason.TERMINATE)) && errorShutdownLatch.getCount() > 0) {
errorShutdownLatch.countDown();
throw new RuntimeException("test");
} else {
@ -456,7 +463,7 @@ public class ShardConsumerTest {
/**
* Test method for {@link ShardConsumer#consumeShard()} that ensures a transient error thrown from the record
* processor's shutdown method with reason terminate will be retried.
* processor's shutdown method with reason zombie will be retried.
*/
@Test
public final void testConsumeShardWithTransientTerminateError() throws Exception {
@ -476,7 +483,7 @@ public class ShardConsumerTest {
final int idleTimeMS = 0; // keep unit tests fast
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken);
when(leaseManager.getLease(anyString())).thenReturn(null);
TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet();
@ -496,6 +503,9 @@ public class ShardConsumerTest {
when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(),
any(IMetricsFactory.class), anyInt()))
.thenReturn(getRecordsCache);
when(leaseManager.getLease(anyString())).thenReturn(null);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(new KinesisClientLease());
RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
shardInfo,
@ -514,7 +524,7 @@ public class ShardConsumerTest {
checkpoint,
processor,
recordProcessorCheckpointer,
leaseManager,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@ -528,7 +538,150 @@ public class ShardConsumerTest {
shardSyncer,
shardSyncStrategy);
when(shardSyncStrategy.onShardConsumerShutDown()).thenReturn(new TaskResult(null));
when(shardSyncStrategy.onShardConsumerShutDown(shardList)).thenReturn(new TaskResult(null));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
Thread.sleep(50L);
consumer.consumeShard(); // start initialization
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
processor.getInitializeLatch().await(5, TimeUnit.SECONDS);
verify(getRecordsCache).start();
// We expect to process all records in numRecs calls
for (int i = 0; i < numRecs;) {
boolean newTaskSubmitted = consumer.consumeShard();
if (newTaskSubmitted) {
LOG.debug("New processing task was submitted, call # " + i);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
// CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES
i += maxRecords;
}
Thread.sleep(50L);
}
// Consume shards until shutdown terminate is called and it has thrown an exception
for (int i = 0; i < 100; i++) {
consumer.consumeShard();
if (processor.errorShutdownLatch.await(50, TimeUnit.MILLISECONDS)) {
break;
}
}
assertEquals(0, processor.errorShutdownLatch.getCount());
// Wait for a retry of shutdown terminate that should succeed
for (int i = 0; i < 100; i++) {
consumer.consumeShard();
if (processor.getShutdownLatch().await(50, TimeUnit.MILLISECONDS)) {
break;
}
}
assertEquals(0, processor.getShutdownLatch().getCount());
// Wait for shutdown complete now that terminate shutdown is successful
for (int i = 0; i < 100; i++) {
consumer.consumeShard();
if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) {
break;
}
Thread.sleep(50L);
}
assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE));
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE)));
verify(getRecordsCache).shutdown();
executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);
String iterator = fileBasedProxy.getIterator(streamShardId, ShardIteratorType.TRIM_HORIZON.toString());
List<Record> expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords());
verifyConsumedRecords(expectedRecords, processor.getProcessedRecords());
file.delete();
}
/**
* Test method for {@link ShardConsumer#consumeShard()} that ensures the ShardConsumer gets shut down with
* shutdown reason TERMINATE when the shard end is reached.
*/
@Test
public final void testConsumeShardWithShardEnd() throws Exception {
int numRecs = 10;
BigInteger startSeqNum = BigInteger.ONE;
String streamShardId = "kinesis-0-0";
String testConcurrencyToken = "testToken";
List<Shard> shardList = KinesisLocalFileDataCreator.createShardList(3, "kinesis-0-", startSeqNum);
// Close the shard so that shutdown is called with reason terminate
shardList.get(0).getSequenceNumberRange().setEndingSequenceNumber(
KinesisLocalFileProxy.MAX_SEQUENCE_NUMBER.subtract(BigInteger.ONE).toString());
shardList.get(1).setParentShardId("kinesis-0-0");
shardList.get(2).setAdjacentParentShardId("kinesis-0-0");
File file = KinesisLocalFileDataCreator.generateTempDataFile(shardList, numRecs, "unitTestSCT002");
IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath());
final int maxRecords = 2;
final int idleTimeMS = 0; // keep unit tests fast
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken);
when(leaseManager.getLease(anyString())).thenReturn(null);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet();
StreamConfig streamConfig =
new StreamConfig(fileBasedProxy,
maxRecords,
idleTimeMS,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null);
dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(),
any(IMetricsFactory.class), anyInt()))
.thenReturn(getRecordsCache);
RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
shardInfo,
checkpoint,
new SequenceNumberValidator(
streamConfig.getStreamProxy(),
shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()
),
metricsFactory
);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
recordProcessorCheckpointer,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
dataFetcher,
Optional.empty(),
Optional.empty(),
config,
shardSyncer,
shardSyncStrategy);
when(shardSyncStrategy.onShardConsumerShutDown(shardList)).thenReturn(new TaskResult(null));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
@ -618,6 +771,7 @@ public class ShardConsumerTest {
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken);
when(leaseManager.getLease(anyString())).thenReturn(null);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
TestStreamlet processor = new TestStreamlet();
StreamConfig streamConfig =
@ -655,7 +809,7 @@ public class ShardConsumerTest {
checkpoint,
processor,
recordProcessorCheckpointer,
leaseManager,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@ -730,7 +884,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
null,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@ -746,6 +900,7 @@ public class ShardConsumerTest {
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999");
when(leaseManager.getLease(anyString())).thenReturn(null);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory());
when(checkpoint.getCheckpointObject(anyString())).thenReturn(
new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber));
@ -783,7 +938,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
null,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@ -815,7 +970,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
null,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
@ -858,7 +1013,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
null,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
mockExecutorService,

View file

@ -127,7 +127,8 @@ public class ShardSyncTaskIntegrationTest {
false,
false,
0L,
shardSyncer);
shardSyncer,
null);
syncTask.call();
List<KinesisClientLease> leases = leaseManager.listLeases();
Set<String> leaseKeys = new HashSet<String>();

View file

@ -25,6 +25,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -34,7 +37,6 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ExceptionThrowingLeaseManager.ExceptionThrowingLeaseManagerMethods;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
@ -230,7 +232,7 @@ public class ShardSyncerTest {
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false);
cleanupLeasesOfCompletedShards, false, shards);
List<KinesisClientLease> newLeases = leaseManager.listLeases();
Set<String> expectedLeaseShardIds = new HashSet<String>();
expectedLeaseShardIds.add("shardId-4");
@ -262,7 +264,7 @@ public class ShardSyncerTest {
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards, false);
cleanupLeasesOfCompletedShards, false, shards);
List<KinesisClientLease> newLeases = leaseManager.listLeases();
Set<String> expectedLeaseShardIds = new HashSet<String>();
for (int i = 0; i < 11; i++) {
@ -293,7 +295,7 @@ public class ShardSyncerTest {
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_AT_TIMESTAMP,
cleanupLeasesOfCompletedShards, false);
cleanupLeasesOfCompletedShards, false, shards);
List<KinesisClientLease> newLeases = leaseManager.listLeases();
Set<String> expectedLeaseShardIds = new HashSet<String>();
for (int i = 0; i < 11; i++) {
@ -327,7 +329,7 @@ public class ShardSyncerTest {
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards, false);
cleanupLeasesOfCompletedShards, false, shards);
dataFile.delete();
}
@ -352,7 +354,7 @@ public class ShardSyncerTest {
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, true);
cleanupLeasesOfCompletedShards, true, shards);
List<KinesisClientLease> newLeases = leaseManager.listLeases();
Set<String> expectedLeaseShardIds = new HashSet<String>();
expectedLeaseShardIds.add("shardId-4");
@ -467,7 +469,7 @@ public class ShardSyncerTest {
exceptionThrowingLeaseManager,
position,
cleanupLeasesOfCompletedShards,
false);
false, null);
return;
} catch (LeasingException e) {
LOG.debug("Catch leasing exception", e);
@ -480,7 +482,7 @@ public class ShardSyncerTest {
leaseManager,
position,
cleanupLeasesOfCompletedShards,
false);
false, null);
}
}

View file

@ -14,16 +14,22 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@ -53,7 +59,7 @@ public class ShutdownTaskTest {
Set<String> defaultParentShardIds = new HashSet<>();
String defaultConcurrencyToken = "testToken4398";
String defaultShardId = "shardId-0000397840";
String defaultShardId = "shardId-0";
ShardInfo defaultShardInfo = new ShardInfo(defaultShardId,
defaultConcurrencyToken,
defaultParentShardIds,
@ -104,7 +110,11 @@ public class ShutdownTaskTest {
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
when(checkpointer.getLastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298"));
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
List<Shard> shards = constructShardListForGraphA();
when(kinesisProxy.getShardList()).thenReturn(shards);
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
ShutdownTask task = new ShutdownTask(defaultShardInfo,
@ -115,7 +125,7 @@ public class ShutdownTaskTest {
INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
leaseManager,
leaseCoordinator,
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer,
@ -132,12 +142,17 @@ public class ShutdownTaskTest {
public final void testCallWhenSyncingShardsThrows() {
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
List<Shard> shards = constructShardListForGraphA();
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
when(kinesisProxy.getShardList()).thenReturn(null);
when(kinesisProxy.getShardList()).thenReturn(shards);
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
when(shardSyncStrategy.onShardConsumerShutDown()).thenReturn(new TaskResult(new KinesisClientLibIOException("")));
when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(new KinesisClientLibIOException("")));
ShutdownTask task = new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
@ -146,25 +161,187 @@ public class ShutdownTaskTest {
INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
leaseManager,
leaseCoordinator,
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer,
shardSyncStrategy);
TaskResult result = task.call();
verify(shardSyncStrategy).onShardConsumerShutDown();
verify(shardSyncStrategy).onShardConsumerShutDown(shards);
Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException);
verify(getRecordsCache).shutdown();
}
@Test
public final void testCallWhenShardEnd() {
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
List<Shard> shards = constructShardListForGraphA();
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
when(kinesisProxy.getShardList()).thenReturn(shards);
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(null));
ShutdownTask task = new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
kinesisProxy,
INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
leaseCoordinator,
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer,
shardSyncStrategy);
TaskResult result = task.call();
verify(shardSyncStrategy).onShardConsumerShutDown(shards);
verify(kinesisProxy, times(1)).getShardList();
Assert.assertNull(result.getException());
verify(getRecordsCache).shutdown();
verify(leaseCoordinator, never()).dropLease(any());
}
@Test
public final void testCallWhenFalseShardEnd() {
ShardInfo shardInfo = new ShardInfo("shardId-4",
defaultConcurrencyToken,
defaultParentShardIds,
ExtendedSequenceNumber.LATEST);
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
List<Shard> shards = constructShardListForGraphA();
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
when(kinesisProxy.getShardList()).thenReturn(shards);
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(new KinesisClientLease());
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(null));
ShutdownTask task = new ShutdownTask(shardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
kinesisProxy,
INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
leaseCoordinator,
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer,
shardSyncStrategy);
TaskResult result = task.call();
verify(shardSyncStrategy, never()).onShardConsumerShutDown(shards);
verify(kinesisProxy, times(1)).getShardList();
Assert.assertNull(result.getException());
verify(getRecordsCache).shutdown();
verify(leaseCoordinator).dropLease(any());
}
@Test
public final void testCallWhenLeaseLost() {
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
when(checkpointer.getLastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298"));
List<Shard> shards = constructShardListForGraphA();
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
when(kinesisProxy.getShardList()).thenReturn(shards);
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
when(shardSyncStrategy.onShardConsumerShutDown(shards)).thenReturn(new TaskResult(null));
ShutdownTask task = new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.ZOMBIE,
kinesisProxy,
INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
leaseCoordinator,
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer,
shardSyncStrategy);
TaskResult result = task.call();
verify(shardSyncStrategy, never()).onShardConsumerShutDown(shards);
verify(kinesisProxy, never()).getShardList();
Assert.assertNull(result.getException());
verify(getRecordsCache).shutdown();
verify(leaseCoordinator, never()).dropLease(any());
}
/**
* Test method for {@link ShutdownTask#getTaskType()}.
*/
@Test
public final void testGetTaskType() {
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, null, 0, getRecordsCache, shardSyncer, shardSyncStrategy);
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, leaseCoordinator, 0, getRecordsCache, shardSyncer, shardSyncStrategy);
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
}
/*
* Helper method to construct a shard list for graph A. Graph A is defined below.
* Shard structure (y-axis is epochs):
* 0 1 2 3 4   5  - shards till epoch 102
* \ / \ / |   |
*  6   7  4   5  - shards from epoch 103 - 205
*   \ /   |  / \
*    8    4 9  10 - shards from epoch 206 (open - no ending sequenceNumber)
*/
private List<Shard> constructShardListForGraphA() {
List<Shard> shards = new ArrayList<Shard>();
SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("11", "102");
SequenceNumberRange range1 = ShardObjectHelper.newSequenceNumberRange("11", null);
SequenceNumberRange range2 = ShardObjectHelper.newSequenceNumberRange("11", "210");
SequenceNumberRange range3 = ShardObjectHelper.newSequenceNumberRange("103", "210");
SequenceNumberRange range4 = ShardObjectHelper.newSequenceNumberRange("211", null);
HashKeyRange hashRange0 = ShardObjectHelper.newHashKeyRange("0", "99");
HashKeyRange hashRange1 = ShardObjectHelper.newHashKeyRange("100", "199");
HashKeyRange hashRange2 = ShardObjectHelper.newHashKeyRange("200", "299");
HashKeyRange hashRange3 = ShardObjectHelper.newHashKeyRange("300", "399");
HashKeyRange hashRange4 = ShardObjectHelper.newHashKeyRange("400", "499");
HashKeyRange hashRange5 = ShardObjectHelper.newHashKeyRange("500", ShardObjectHelper.MAX_HASH_KEY);
HashKeyRange hashRange6 = ShardObjectHelper.newHashKeyRange("0", "199");
HashKeyRange hashRange7 = ShardObjectHelper.newHashKeyRange("200", "399");
HashKeyRange hashRange8 = ShardObjectHelper.newHashKeyRange("0", "399");
HashKeyRange hashRange9 = ShardObjectHelper.newHashKeyRange("500", "799");
HashKeyRange hashRange10 = ShardObjectHelper.newHashKeyRange("800", ShardObjectHelper.MAX_HASH_KEY);
shards.add(ShardObjectHelper.newShard("shardId-0", null, null, range0, hashRange0));
shards.add(ShardObjectHelper.newShard("shardId-1", null, null, range0, hashRange1));
shards.add(ShardObjectHelper.newShard("shardId-2", null, null, range0, hashRange2));
shards.add(ShardObjectHelper.newShard("shardId-3", null, null, range0, hashRange3));
shards.add(ShardObjectHelper.newShard("shardId-4", null, null, range1, hashRange4));
shards.add(ShardObjectHelper.newShard("shardId-5", null, null, range2, hashRange5));
shards.add(ShardObjectHelper.newShard("shardId-6", "shardId-0", "shardId-1", range3, hashRange6));
shards.add(ShardObjectHelper.newShard("shardId-7", "shardId-2", "shardId-3", range3, hashRange7));
shards.add(ShardObjectHelper.newShard("shardId-8", "shardId-6", "shardId-7", range4, hashRange8));
shards.add(ShardObjectHelper.newShard("shardId-9", "shardId-5", null, range4, hashRange9));
shards.add(ShardObjectHelper.newShard("shardId-10", null, "shardId-5", range4, hashRange10));
return shards;
}
}