Introducing HierarchicalShardSyncer inorder to run multiple Schedulers in a JVM (#395)

* Run multiple instance of scheduler on one JVM

* handling creation of shardSyncer in DynamoDBLeaseManagementFactory and LeaseManagementConfig

* remove multi-threading unit test and do some small refactorings

* refectoring

* deprecate ShardSyncer and use HierarchichalShardSyncer instead; change the order for metricsFactory and HierarchichalShardSyncer in ShardConsumerArgument

* fix typos and use mock object of shardSyncer

* delete improper comments

* fix comments

* remove duplicated comments
This commit is contained in:
xiaoyu meng 2018-10-09 17:29:59 -07:00 committed by Sahil Palvia
parent 854e316b83
commit 14c68296f0
14 changed files with 237 additions and 65 deletions

View file

@ -47,6 +47,7 @@ import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardPrioritization; import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSyncTask; import software.amazon.kinesis.leases.ShardSyncTask;
import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
import software.amazon.kinesis.leases.exceptions.LeasingException; import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.lifecycle.LifecycleConfig;
@ -113,6 +114,7 @@ public class Scheduler implements Runnable {
private final ShardDetector shardDetector; private final ShardDetector shardDetector;
private final boolean ignoreUnexpetedChildShards; private final boolean ignoreUnexpetedChildShards;
private final AggregatorUtil aggregatorUtil; private final AggregatorUtil aggregatorUtil;
private final HierarchicalShardSyncer hierarchicalShardSyncer;
// Holds consumers for shards the worker is currently tracking. Key is shard // Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer. // info, value is ShardConsumer.
@ -195,6 +197,7 @@ public class Scheduler implements Runnable {
this.shardDetector = this.shardSyncTaskManager.shardDetector(); this.shardDetector = this.shardSyncTaskManager.shardDetector();
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards(); this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer();
} }
/** /**
@ -239,7 +242,8 @@ public class Scheduler implements Runnable {
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
log.info("Syncing Kinesis shard info"); log.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition, ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, metricsFactory); cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer,
metricsFactory);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else { } else {
log.info("Skipping shard sync per configuration setting (and lease table is not empty)"); log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
@ -575,8 +579,9 @@ public class Scheduler implements Runnable {
cleanupLeasesUponShardCompletion, cleanupLeasesUponShardCompletion,
ignoreUnexpetedChildShards, ignoreUnexpetedChildShards,
shardDetector, shardDetector,
metricsFactory, aggregatorUtil,
aggregatorUtil); hierarchicalShardSyncer,
metricsFactory);
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(), argument); return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(), argument);
} }

View file

@ -30,13 +30,12 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
@ -54,24 +53,27 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
* It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it * It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it
* and begun processing it's child shards. * and begun processing it's child shards.
*/ */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j @Slf4j
public class ShardSyncer { @KinesisClientInternalApi
public class HierarchicalShardSyncer {
/** /**
* Check and create leases for any new shards (e.g. following a reshard operation). Sync leases with Kinesis shards * Check and create leases for any new shards (e.g. following a reshard operation). Sync leases with Kinesis shards
* (e.g. at startup, or when we reach end of a shard). * (e.g. at startup, or when we reach end of a shard).
* *
* @param shardDetector
* @param leaseRefresher * @param leaseRefresher
* @param initialPosition * @param initialPosition
* @param cleanupLeasesOfCompletedShards * @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards * @param ignoreUnexpectedChildShards
* @param scope
* @throws DependencyException * @throws DependencyException
* @throws InvalidStateException * @throws InvalidStateException
* @throws ProvisionedThroughputException * @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException * @throws KinesisClientLibIOException
*/ */
// CHECKSTYLE:OFF CyclomaticComplexity // CHECKSTYLE:OFF CyclomaticComplexity
public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector, public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final MetricsScope scope) throws DependencyException, InvalidStateException, final MetricsScope scope) throws DependencyException, InvalidStateException,
@ -152,7 +154,7 @@ public class ShardSyncer {
* @return ShardIds of child shards (children of the expectedClosedShard) * @return ShardIds of child shards (children of the expectedClosedShard)
* @throws KinesisClientLibIOException * @throws KinesisClientLibIOException
*/ */
static synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String, Shard> shardIdToShardMap, synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String, Shard> shardIdToShardMap,
final Map<String, Set<String>> shardIdToChildShardIdsMap, final Set<String> shardIdsOfClosedShards) final Map<String, Set<String>> shardIdToChildShardIdsMap, final Set<String> shardIdsOfClosedShards)
throws KinesisClientLibIOException { throws KinesisClientLibIOException {
final String exceptionMessageSuffix = "This can happen if we constructed the list of shards " final String exceptionMessageSuffix = "This can happen if we constructed the list of shards "
@ -181,7 +183,7 @@ public class ShardSyncer {
} }
} }
private static synchronized void assertHashRangeOfClosedShardIsCovered(final Shard closedShard, private synchronized void assertHashRangeOfClosedShardIsCovered(final Shard closedShard,
final Map<String, Shard> shardIdToShardMap, final Set<String> childShardIds) final Map<String, Shard> shardIdToShardMap, final Set<String> childShardIds)
throws KinesisClientLibIOException { throws KinesisClientLibIOException {
BigInteger minStartingHashKeyOfChildren = null; BigInteger minStartingHashKeyOfChildren = null;
@ -583,7 +585,7 @@ public class ShardSyncer {
* @throws ProvisionedThroughputException * @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException * @throws KinesisClientLibIOException
*/ */
private static synchronized void cleanupLeasesOfFinishedShards(final Collection<Lease> currentLeases, private synchronized void cleanupLeasesOfFinishedShards(final Collection<Lease> currentLeases,
final Map<String, Shard> shardIdToShardMap, final Map<String, Set<String>> shardIdToChildShardIdsMap, final Map<String, Shard> shardIdToShardMap, final Map<String, Set<String>> shardIdToChildShardIdsMap,
final List<Lease> trackedLeases, final LeaseRefresher leaseRefresher) throws DependencyException, final List<Lease> trackedLeases, final LeaseRefresher leaseRefresher) throws DependencyException,
InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
@ -625,7 +627,7 @@ public class ShardSyncer {
* @throws InvalidStateException * @throws InvalidStateException
* @throws DependencyException * @throws DependencyException
*/ */
static synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set<String> childShardIds, synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set<String> childShardIds,
final Map<String, Lease> trackedLeases, final LeaseRefresher leaseRefresher) final Map<String, Lease> trackedLeases, final LeaseRefresher leaseRefresher)
throws DependencyException, InvalidStateException, ProvisionedThroughputException { throws DependencyException, InvalidStateException, ProvisionedThroughputException {
final Lease leaseForClosedShard = trackedLeases.get(closedShardId); final Lease leaseForClosedShard = trackedLeases.get(closedShardId);

View file

@ -231,6 +231,8 @@ public class LeaseManagementConfig {
*/ */
private TableCreatorCallback tableCreatorCallback = TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK; private TableCreatorCallback tableCreatorCallback = TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK;
private HierarchicalShardSyncer hierarchicalShardSyncer = new HierarchicalShardSyncer();
private LeaseManagementFactory leaseManagementFactory; private LeaseManagementFactory leaseManagementFactory;
public LeaseManagementFactory leaseManagementFactory() { public LeaseManagementFactory leaseManagementFactory() {
@ -258,6 +260,7 @@ public class LeaseManagementConfig {
cacheMissWarningModulus(), cacheMissWarningModulus(),
initialLeaseTableReadCapacity(), initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(), initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(),
tableCreatorCallback()); tableCreatorCallback());
} }
return leaseManagementFactory; return leaseManagementFactory;

View file

@ -48,6 +48,8 @@ public class ShardSyncTask implements ConsumerTask {
private final boolean ignoreUnexpectedChildShards; private final boolean ignoreUnexpectedChildShards;
private final long shardSyncTaskIdleTimeMillis; private final long shardSyncTaskIdleTimeMillis;
@NonNull @NonNull
private final HierarchicalShardSyncer hierarchicalShardSyncer;
@NonNull
private final MetricsFactory metricsFactory; private final MetricsFactory metricsFactory;
private final TaskType taskType = TaskType.SHARDSYNC; private final TaskType taskType = TaskType.SHARDSYNC;
@ -62,7 +64,7 @@ public class ShardSyncTask implements ConsumerTask {
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHARD_SYNC_TASK_OPERATION); final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHARD_SYNC_TASK_OPERATION);
try { try {
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, leaseRefresher, initialPosition, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, scope); cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, scope);
if (shardSyncTaskIdleTimeMillis > 0) { if (shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis); Thread.sleep(shardSyncTaskIdleTimeMillis);

View file

@ -50,8 +50,68 @@ public class ShardSyncTaskManager {
@NonNull @NonNull
private final ExecutorService executorService; private final ExecutorService executorService;
@NonNull @NonNull
private final HierarchicalShardSyncer hierarchicalShardSyncer;
@NonNull
private final MetricsFactory metricsFactory; private final MetricsFactory metricsFactory;
/**
* Constructor.
*
* <p>NOTE: This constructor is deprecated and will be removed in a future release.</p>
*
* @param shardDetector
* @param leaseRefresher
* @param initialPositionInStream
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIdleTimeMillis
* @param executorService
* @param metricsFactory
*/
@Deprecated
public ShardSyncTaskManager(ShardDetector shardDetector, LeaseRefresher leaseRefresher,
InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesUponShardCompletion,
boolean ignoreUnexpectedChildShards, long shardSyncIdleTimeMillis, ExecutorService executorService,
MetricsFactory metricsFactory) {
this.shardDetector = shardDetector;
this.leaseRefresher = leaseRefresher;
this.initialPositionInStream = initialPositionInStream;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
this.executorService = executorService;
this.hierarchicalShardSyncer = new HierarchicalShardSyncer();
this.metricsFactory = metricsFactory;
}
/**
* Constructor.
*
* @param shardDetector
* @param leaseRefresher
* @param initialPositionInStream
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIdleTimeMillis
* @param executorService
* @param hierarchicalShardSyncer
* @param metricsFactory
*/
public ShardSyncTaskManager(ShardDetector shardDetector, LeaseRefresher leaseRefresher,
InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesUponShardCompletion,
boolean ignoreUnexpectedChildShards, long shardSyncIdleTimeMillis, ExecutorService executorService,
HierarchicalShardSyncer hierarchicalShardSyncer, MetricsFactory metricsFactory) {
this.shardDetector = shardDetector;
this.leaseRefresher = leaseRefresher;
this.initialPositionInStream = initialPositionInStream;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
this.executorService = executorService;
this.hierarchicalShardSyncer = hierarchicalShardSyncer;
this.metricsFactory = metricsFactory;
}
private ConsumerTask currentTask; private ConsumerTask currentTask;
private Future<TaskResult> future; private Future<TaskResult> future;
@ -82,6 +142,7 @@ public class ShardSyncTaskManager {
cleanupLeasesUponShardCompletion, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis, shardSyncIdleTimeMillis,
hierarchicalShardSyncer,
metricsFactory), metricsFactory),
metricsFactory); metricsFactory);
future = executorService.submit(currentTask); future = executorService.submit(currentTask);

View file

@ -23,6 +23,7 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.KinesisShardDetector; import software.amazon.kinesis.leases.KinesisShardDetector;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementFactory; import software.amazon.kinesis.leases.LeaseManagementFactory;
@ -50,6 +51,9 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final ExecutorService executorService; private final ExecutorService executorService;
@NonNull @NonNull
private final InitialPositionInStreamExtended initialPositionInStream; private final InitialPositionInStreamExtended initialPositionInStream;
@NonNull
private final HierarchicalShardSyncer hierarchicalShardSyncer;
private final long failoverTimeMillis; private final long failoverTimeMillis;
private final long epsilonMillis; private final long epsilonMillis;
private final int maxLeasesForWorker; private final int maxLeasesForWorker;
@ -162,7 +166,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK); new HierarchicalShardSyncer(), TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK);
} }
/** /**
@ -191,6 +195,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param cacheMissWarningModulus * @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity * @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity * @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param tableCreatorCallback * @param tableCreatorCallback
*/ */
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
@ -203,6 +208,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer,
final TableCreatorCallback tableCreatorCallback) { final TableCreatorCallback tableCreatorCallback) {
this.kinesisClient = kinesisClient; this.kinesisClient = kinesisClient;
this.streamName = streamName; this.streamName = streamName;
@ -227,6 +233,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.cacheMissWarningModulus = cacheMissWarningModulus; this.cacheMissWarningModulus = cacheMissWarningModulus;
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity; this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
this.hierarchicalShardSyncer = hierarchicalShardSyncer;
this.tableCreatorCallback = tableCreatorCallback; this.tableCreatorCallback = tableCreatorCallback;
} }
@ -253,6 +260,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
ignoreUnexpectedChildShards, ignoreUnexpectedChildShards,
shardSyncIntervalMillis, shardSyncIntervalMillis,
executorService, executorService,
hierarchicalShardSyncer,
metricsFactory); metricsFactory);
} }

View file

@ -0,0 +1,46 @@
package software.amazon.kinesis.leases.exceptions;
import lombok.NonNull;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.metrics.MetricsScope;
/**
* Helper class to sync leases with shards of the Kinesis stream.
* It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding).
* It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it
* and begun processing it's child shards.
*
* <p>NOTE: This class is deprecated and will be removed in a future release.</p>
*/
@Deprecated
public class ShardSyncer {
private static final HierarchicalShardSyncer HIERARCHICAL_SHARD_SYNCER = new HierarchicalShardSyncer();
/**
* <p>NOTE: This method is deprecated and will be removed in a future release.</p>
*
* @param shardDetector
* @param leaseRefresher
* @param initialPosition
* @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards
* @param scope
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
@Deprecated
public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope);
}
}

View file

@ -495,6 +495,7 @@ class ConsumerStates {
argument.leaseRefresher(), argument.leaseRefresher(),
argument.taskBackoffTimeMillis(), argument.taskBackoffTimeMillis(),
argument.recordsPublisher(), argument.recordsPublisher(),
argument.hierarchicalShardSyncer(),
argument.metricsFactory()); argument.metricsFactory());
} }

View file

@ -24,6 +24,7 @@ import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor;
@ -65,7 +66,8 @@ public class ShardConsumerArgument {
private final boolean ignoreUnexpectedChildShards; private final boolean ignoreUnexpectedChildShards;
@NonNull @NonNull
private final ShardDetector shardDetector; private final ShardDetector shardDetector;
private final AggregatorUtil aggregatorUtil;
private final HierarchicalShardSyncer hierarchicalShardSyncer;
@NonNull @NonNull
private final MetricsFactory metricsFactory; private final MetricsFactory metricsFactory;
private final AggregatorUtil aggregatorUtil;
} }

View file

@ -25,7 +25,7 @@ import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardSyncer; import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
@ -66,6 +66,8 @@ public class ShutdownTask implements ConsumerTask {
@NonNull @NonNull
private final RecordsPublisher recordsPublisher; private final RecordsPublisher recordsPublisher;
@NonNull @NonNull
private final HierarchicalShardSyncer hierarchicalShardSyncer;
@NonNull
private final MetricsFactory metricsFactory; private final MetricsFactory metricsFactory;
private final TaskType taskType = TaskType.SHUTDOWN; private final TaskType taskType = TaskType.SHUTDOWN;
@ -123,8 +125,8 @@ public class ShutdownTask implements ConsumerTask {
if (reason == ShutdownReason.SHARD_END) { if (reason == ShutdownReason.SHARD_END) {
log.debug("Looking for child shards of shard {}", shardInfo.shardId()); log.debug("Looking for child shards of shard {}", shardInfo.shardId());
// create leases for the child shards // create leases for the child shards
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, leaseRefresher, initialPositionInStream, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope); initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope);
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId()); log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
} }

View file

@ -43,6 +43,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
@ -63,7 +64,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
// CHECKSTYLE:IGNORE JavaNCSS FOR NEXT 800 LINES // CHECKSTYLE:IGNORE JavaNCSS FOR NEXT 800 LINES
public class ShardSyncerTest { public class HierarchicalShardSyncerTest {
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = InitialPositionInStreamExtended private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = InitialPositionInStreamExtended
.newInitialPosition(InitialPositionInStream.LATEST); .newInitialPosition(InitialPositionInStream.LATEST);
private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = InitialPositionInStreamExtended private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = InitialPositionInStreamExtended
@ -76,6 +77,9 @@ public class ShardSyncerTest {
private final boolean cleanupLeasesOfCompletedShards = true; private final boolean cleanupLeasesOfCompletedShards = true;
private final boolean ignoreUnexpectedChildShards = false; private final boolean ignoreUnexpectedChildShards = false;
private HierarchicalShardSyncer hierarchicalShardSyncer;
/** /**
* Old/Obsolete max value of a sequence number (2^128 -1). * Old/Obsolete max value of a sequence number (2^128 -1).
*/ */
@ -86,6 +90,11 @@ public class ShardSyncerTest {
@Mock @Mock
private DynamoDBLeaseRefresher dynamoDBLeaseRefresher; private DynamoDBLeaseRefresher dynamoDBLeaseRefresher;
@Before
public void setup() {
hierarchicalShardSyncer = new HierarchicalShardSyncer();
}
/** /**
* Test determineNewLeasesToCreate() where there are no shards * Test determineNewLeasesToCreate() where there are no shards
*/ */
@ -94,7 +103,7 @@ public class ShardSyncerTest {
final List<Shard> shards = Collections.emptyList(); final List<Shard> shards = Collections.emptyList();
final List<Lease> leases = Collections.emptyList(); final List<Lease> leases = Collections.emptyList();
assertThat(ShardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty(), assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty(),
equalTo(true)); equalTo(true));
} }
@ -111,7 +120,7 @@ public class ShardSyncerTest {
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
final List<Lease> currentLeases = Collections.emptyList(); final List<Lease> currentLeases = Collections.emptyList();
final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST); INITIAL_POSITION_LATEST);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
@ -138,7 +147,7 @@ public class ShardSyncerTest {
final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2)); final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2));
final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST, inconsistentShardIds); INITIAL_POSITION_LATEST, inconsistentShardIds);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
@ -151,7 +160,7 @@ public class ShardSyncerTest {
*/ */
@Test @Test
public void testBootstrapShardLeasesAtTrimHorizon() throws Exception { public void testBootstrapShardLeasesAtTrimHorizon() throws Exception {
testCheckAndCreateLeasesForNewShards(INITIAL_POSITION_TRIM_HORIZON); testCheckAndCreateLeasesForShardsIfMissing(INITIAL_POSITION_TRIM_HORIZON);
} }
/** /**
@ -159,11 +168,11 @@ public class ShardSyncerTest {
*/ */
@Test @Test
public void testBootstrapShardLeasesAtLatest() throws Exception { public void testBootstrapShardLeasesAtLatest() throws Exception {
testCheckAndCreateLeasesForNewShards(INITIAL_POSITION_LATEST); testCheckAndCreateLeasesForShardsIfMissing(INITIAL_POSITION_LATEST);
} }
@Test @Test
public void testCheckAndCreateLeasesForNewShardsAtLatest() throws Exception { public void testCheckAndCreateLeasesForShardsIfMissingAtLatest() throws Exception {
final List<Shard> shards = constructShardListForGraphA(); final List<Shard> shards = constructShardListForGraphA();
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class); final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
@ -172,7 +181,8 @@ public class ShardSyncerTest {
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE); cleanupLeasesOfCompletedShards, false, SCOPE);
final Set<String> expectedShardIds = new HashSet<>( final Set<String> expectedShardIds = new HashSet<>(
@ -197,12 +207,12 @@ public class ShardSyncerTest {
@Test @Test
public void testCheckAndCreateLeasesForNewShardsAtTrimHorizon() throws Exception { public void testCheckAndCreateLeasesForNewShardsAtTrimHorizon() throws Exception {
testCheckAndCreateLeaseForNewShards(constructShardListForGraphA(), INITIAL_POSITION_TRIM_HORIZON); testCheckAndCreateLeaseForShardsIfMissing(constructShardListForGraphA(), INITIAL_POSITION_TRIM_HORIZON);
} }
@Test @Test
public void testCheckAndCreateLeasesForNewShardsAtTimestamp() throws Exception { public void testCheckAndCreateLeasesForNewShardsAtTimestamp() throws Exception {
testCheckAndCreateLeaseForNewShards(constructShardListForGraphA(), INITIAL_POSITION_AT_TIMESTAMP); testCheckAndCreateLeaseForShardsIfMissing(constructShardListForGraphA(), INITIAL_POSITION_AT_TIMESTAMP);
} }
@Test(expected = KinesisClientLibIOException.class) @Test(expected = KinesisClientLibIOException.class)
@ -217,7 +227,7 @@ public class ShardSyncerTest {
when(shardDetector.listShards()).thenReturn(shards); when(shardDetector.listShards()).thenReturn(shards);
try { try {
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, false, SCOPE); INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, false, SCOPE);
} finally { } finally {
verify(shardDetector).listShards(); verify(shardDetector).listShards();
@ -251,7 +261,8 @@ public class ShardSyncerTest {
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, true, SCOPE); cleanupLeasesOfCompletedShards, true, SCOPE);
final List<Lease> leases = leaseCaptor.getAllValues(); final List<Lease> leases = leaseCaptor.getAllValues();
@ -307,7 +318,7 @@ public class ShardSyncerTest {
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseDeleteCaptor.capture()); doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseDeleteCaptor.capture());
// Initial call: No leases present, create leases. // Initial call: No leases present, create leases.
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@ -322,7 +333,7 @@ public class ShardSyncerTest {
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
// Second call: Leases present, with shardId-0 being at ShardEnd causing cleanup. // Second call: Leases present, with shardId-0 being at ShardEnd causing cleanup.
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues(); final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -382,7 +393,7 @@ public class ShardSyncerTest {
.when(dynamoDBLeaseRefresher).deleteLease(leaseDeleteCaptor.capture()); .when(dynamoDBLeaseRefresher).deleteLease(leaseDeleteCaptor.capture());
// Initial call: Call to create leases. // Initial call: Call to create leases.
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@ -398,7 +409,8 @@ public class ShardSyncerTest {
try { try {
// Second call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete fails. // Second call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete fails.
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
} finally { } finally {
List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues(); List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
@ -421,7 +433,8 @@ public class ShardSyncerTest {
verify(dynamoDBLeaseRefresher, times(1)).deleteLease(any(Lease.class)); verify(dynamoDBLeaseRefresher, times(1)).deleteLease(any(Lease.class));
// Final call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete passes. // Final call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete passes.
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
deleteLeases = leaseDeleteCaptor.getAllValues(); deleteLeases = leaseDeleteCaptor.getAllValues();
@ -481,7 +494,8 @@ public class ShardSyncerTest {
try { try {
// Initial call: Call to create leases. Fails on ListLeases // Initial call: Call to create leases. Fails on ListLeases
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
} finally { } finally {
verify(shardDetector, times(1)).listShards(); verify(shardDetector, times(1)).listShards();
@ -490,7 +504,8 @@ public class ShardSyncerTest {
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
// Second call: Leases not present, leases will be created. // Second call: Leases not present, leases will be created.
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@ -504,7 +519,8 @@ public class ShardSyncerTest {
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
// Final call: Leases present, belongs to TestOwner, shardId-0 is at ShardEnd should be cleaned up. // Final call: Leases present, belongs to TestOwner, shardId-0 is at ShardEnd should be cleaned up.
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues(); final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
@ -569,7 +585,8 @@ public class ShardSyncerTest {
try { try {
// Initial call: No leases present, create leases. Create lease Fails // Initial call: No leases present, create leases. Create lease Fails
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
} finally { } finally {
verify(shardDetector, times(1)).listShards(); verify(shardDetector, times(1)).listShards();
@ -577,7 +594,8 @@ public class ShardSyncerTest {
verify(dynamoDBLeaseRefresher, times(1)).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, times(1)).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@ -591,7 +609,8 @@ public class ShardSyncerTest {
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
// Final call: Leases are present, shardId-0 is at ShardEnd needs to be cleaned up. // Final call: Leases are present, shardId-0 is at ShardEnd needs to be cleaned up.
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues(); final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
@ -653,7 +672,7 @@ public class ShardSyncerTest {
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(leases); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(leases);
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture()); doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture());
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
assertThat(leaseCaptor.getAllValues().size(), equalTo(1)); assertThat(leaseCaptor.getAllValues().size(), equalTo(1));
@ -665,7 +684,7 @@ public class ShardSyncerTest {
verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
} }
private void testCheckAndCreateLeasesForNewShards(InitialPositionInStreamExtended initialPosition) private void testCheckAndCreateLeasesForShardsIfMissing(InitialPositionInStreamExtended initialPosition)
throws Exception { throws Exception {
final String shardId0 = "shardId-0"; final String shardId0 = "shardId-0";
final String shardId1 = "shardId-1"; final String shardId1 = "shardId-1";
@ -673,10 +692,10 @@ public class ShardSyncerTest {
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
testCheckAndCreateLeaseForNewShards(shards, initialPosition); testCheckAndCreateLeaseForShardsIfMissing(shards, initialPosition);
} }
private void testCheckAndCreateLeaseForNewShards(final List<Shard> shards, private void testCheckAndCreateLeaseForShardsIfMissing(final List<Shard> shards,
final InitialPositionInStreamExtended initialPosition) throws Exception { final InitialPositionInStreamExtended initialPosition) throws Exception {
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class); final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
@ -684,7 +703,8 @@ public class ShardSyncerTest {
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition, hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition,
cleanupLeasesOfCompletedShards, false, SCOPE); cleanupLeasesOfCompletedShards, false, SCOPE);
final List<Lease> leases = leaseCaptor.getAllValues(); final List<Lease> leases = leaseCaptor.getAllValues();
@ -720,7 +740,7 @@ public class ShardSyncerTest {
final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
for (InitialPositionInStreamExtended initialPosition : initialPositions) { for (InitialPositionInStreamExtended initialPosition : initialPositions) {
final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
initialPosition); initialPosition);
assertThat(newLeases.size(), equalTo(2)); assertThat(newLeases.size(), equalTo(2));
@ -743,7 +763,7 @@ public class ShardSyncerTest {
ShardObjectHelper.newShard(lastShardId, null, null, ShardObjectHelper.newShard(lastShardId, null, null,
ShardObjectHelper.newSequenceNumberRange("405", null))); ShardObjectHelper.newSequenceNumberRange("405", null)));
final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST); INITIAL_POSITION_LATEST);
assertThat(newLeases.size(), equalTo(1)); assertThat(newLeases.size(), equalTo(1));
@ -766,7 +786,7 @@ public class ShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"), final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"),
newLease("shardId-5")); newLease("shardId-5"));
final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST); INITIAL_POSITION_LATEST);
final Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<>(); final Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<>();
@ -801,7 +821,7 @@ public class ShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"),
newLease("shardId-7")); newLease("shardId-7"));
final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST); INITIAL_POSITION_LATEST);
final Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<>(); final Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<>();
@ -834,7 +854,7 @@ public class ShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"), final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"),
newLease("shardId-5")); newLease("shardId-5"));
final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_TRIM_HORIZON); INITIAL_POSITION_TRIM_HORIZON);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -869,7 +889,7 @@ public class ShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"),
newLease("shardId-7")); newLease("shardId-7"));
final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_TRIM_HORIZON); INITIAL_POSITION_TRIM_HORIZON);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -899,7 +919,7 @@ public class ShardSyncerTest {
final List<Shard> shards = constructShardListForGraphB(); final List<Shard> shards = constructShardListForGraphB();
final List<Lease> currentLeases = new ArrayList<>(); final List<Lease> currentLeases = new ArrayList<>();
final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_TRIM_HORIZON); INITIAL_POSITION_TRIM_HORIZON);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -934,7 +954,7 @@ public class ShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"), final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"),
newLease("shardId-5")); newLease("shardId-5"));
final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_AT_TIMESTAMP); INITIAL_POSITION_AT_TIMESTAMP);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint) final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
@ -968,7 +988,7 @@ public class ShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"),
newLease("shardId-7")); newLease("shardId-7"));
final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_AT_TIMESTAMP); INITIAL_POSITION_AT_TIMESTAMP);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint) final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
@ -995,7 +1015,7 @@ public class ShardSyncerTest {
final List<Shard> shards = constructShardListForGraphB(); final List<Shard> shards = constructShardListForGraphB();
final List<Lease> currentLeases = new ArrayList<>(); final List<Lease> currentLeases = new ArrayList<>();
final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_AT_TIMESTAMP); INITIAL_POSITION_AT_TIMESTAMP);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint) final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
@ -1092,7 +1112,8 @@ public class ShardSyncerTest {
public void testCheckIfDescendantAndAddNewLeasesForAncestorsNullShardId() { public void testCheckIfDescendantAndAddNewLeasesForAncestorsNullShardId() {
final Map<String, Boolean> memoizationContext = new HashMap<>(); final Map<String, Boolean> memoizationContext = new HashMap<>();
assertThat(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(null, INITIAL_POSITION_LATEST, null, null, assertThat(HierarchicalShardSyncer
.checkIfDescendantAndAddNewLeasesForAncestors(null, INITIAL_POSITION_LATEST, null, null,
null, memoizationContext), equalTo(false)); null, memoizationContext), equalTo(false));
} }
@ -1104,7 +1125,8 @@ public class ShardSyncerTest {
final String shardId = "shardId-trimmed"; final String shardId = "shardId-trimmed";
final Map<String, Boolean> memoizationContext = new HashMap<>(); final Map<String, Boolean> memoizationContext = new HashMap<>();
assertThat(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, null, assertThat(HierarchicalShardSyncer
.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, null,
new HashMap<>(), null, memoizationContext), equalTo(false)); new HashMap<>(), null, memoizationContext), equalTo(false));
} }
@ -1120,7 +1142,8 @@ public class ShardSyncerTest {
final Map<String, Shard> kinesisShards = new HashMap<>(); final Map<String, Shard> kinesisShards = new HashMap<>();
kinesisShards.put(shardId, ShardObjectHelper.newShard(shardId, null, null, null)); kinesisShards.put(shardId, ShardObjectHelper.newShard(shardId, null, null, null));
assertThat(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, assertThat(
HierarchicalShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
shardIdsOfCurrentLeases, kinesisShards, newLeaseMap, memoizationContext), equalTo(true)); shardIdsOfCurrentLeases, kinesisShards, newLeaseMap, memoizationContext), equalTo(true));
assertThat(newLeaseMap.isEmpty(), equalTo(true)); assertThat(newLeaseMap.isEmpty(), equalTo(true));
} }
@ -1142,7 +1165,8 @@ public class ShardSyncerTest {
kinesisShards.put(adjacentParentShardId, ShardObjectHelper.newShard(adjacentParentShardId, null, null, null)); kinesisShards.put(adjacentParentShardId, ShardObjectHelper.newShard(adjacentParentShardId, null, null, null));
kinesisShards.put(shardId, ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null)); kinesisShards.put(shardId, ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null));
assertThat(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, assertThat(
HierarchicalShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
shardIdsOfCurrentLeases, kinesisShards, newLeaseMap, memoizationContext), equalTo(false)); shardIdsOfCurrentLeases, kinesisShards, newLeaseMap, memoizationContext), equalTo(false));
assertThat(newLeaseMap.isEmpty(), equalTo(true)); assertThat(newLeaseMap.isEmpty(), equalTo(true));
} }

View file

@ -63,6 +63,7 @@ public class ShardSyncTaskIntegrationTest {
private LeaseRefresher leaseRefresher; private LeaseRefresher leaseRefresher;
private ShardDetector shardDetector; private ShardDetector shardDetector;
private HierarchicalShardSyncer hierarchicalShardSyncer;
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
@ -97,6 +98,7 @@ public class ShardSyncTaskIntegrationTest {
shardDetector = new KinesisShardDetector(kinesisClient, STREAM_NAME, 500L, 50, shardDetector = new KinesisShardDetector(kinesisClient, STREAM_NAME, 500L, 50,
LIST_SHARDS_CACHE_ALLOWED_AGE_IN_SECONDS, MAX_CACHE_MISSES_BEFORE_RELOAD, CACHE_MISS_WARNING_MODULUS); LIST_SHARDS_CACHE_ALLOWED_AGE_IN_SECONDS, MAX_CACHE_MISSES_BEFORE_RELOAD, CACHE_MISS_WARNING_MODULUS);
hierarchicalShardSyncer = new HierarchicalShardSyncer();
} }
/** /**
@ -117,7 +119,7 @@ public class ShardSyncTaskIntegrationTest {
Set<String> shardIds = shardDetector.listShards().stream().map(Shard::shardId).collect(Collectors.toSet()); Set<String> shardIds = shardDetector.listShards().stream().map(Shard::shardId).collect(Collectors.toSet());
ShardSyncTask syncTask = new ShardSyncTask(shardDetector, leaseRefresher, ShardSyncTask syncTask = new ShardSyncTask(shardDetector, leaseRefresher,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST), false, false, 0L, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST), false, false, 0L,
NULL_METRICS_FACTORY); hierarchicalShardSyncer, NULL_METRICS_FACTORY);
syncTask.call(); syncTask.call();
List<Lease> leases = leaseRefresher.listLeases(); List<Lease> leases = leaseRefresher.listLeases();
Set<String> leaseKeys = new HashSet<>(); Set<String> leaseKeys = new HashSet<>();

View file

@ -46,6 +46,7 @@ import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.Checkpointer;
@ -86,6 +87,8 @@ public class ConsumerStatesTest {
@Mock @Mock
private ShardDetector shardDetector; private ShardDetector shardDetector;
@Mock @Mock
private HierarchicalShardSyncer hierarchicalShardSyncer;
@Mock
private MetricsFactory metricsFactory; private MetricsFactory metricsFactory;
@Mock @Mock
private ProcessRecordsInput processRecordsInput; private ProcessRecordsInput processRecordsInput;
@ -109,7 +112,8 @@ public class ConsumerStatesTest {
taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts,
shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, INITIAL_POSITION_IN_STREAM, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, INITIAL_POSITION_IN_STREAM,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector, metricsFactory, new AggregatorUtil()); cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector, new AggregatorUtil(),
hierarchicalShardSyncer, metricsFactory);
consumer = spy( consumer = spy(
new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, argument)); new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, argument));

View file

@ -17,6 +17,7 @@ package software.amazon.kinesis.lifecycle;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -33,6 +34,7 @@ import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
@ -73,6 +75,8 @@ public class ShutdownTaskTest {
private LeaseRefresher leaseRefresher; private LeaseRefresher leaseRefresher;
@Mock @Mock
private ShardDetector shardDetector; private ShardDetector shardDetector;
@Mock
private HierarchicalShardSyncer hierarchicalShardSyncer;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
@ -86,7 +90,7 @@ public class ShutdownTaskTest {
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
TERMINATE_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, TERMINATE_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
NULL_METRICS_FACTORY); hierarchicalShardSyncer, NULL_METRICS_FACTORY);
} }
/** /**
@ -104,9 +108,15 @@ public class ShutdownTaskTest {
* Test method for {@link ShutdownTask#call()}. * Test method for {@link ShutdownTask#call()}.
*/ */
@Test @Test
public final void testCallWhenSyncingShardsThrows() { public final void testCallWhenSyncingShardsThrows() throws Exception {
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
when(shardDetector.listShards()).thenReturn(null); when(shardDetector.listShards()).thenReturn(null);
doAnswer((invocation) -> {
throw new KinesisClientLibIOException("KinesisClientLibIOException");
}).when(hierarchicalShardSyncer)
.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
NULL_METRICS_FACTORY.createMetrics());
TaskResult result = task.call(); TaskResult result = task.call();
assertNotNull(result.getException()); assertNotNull(result.getException());