Make periodic shard sync support multi streams

This commit is contained in:
Chunxue Yang 2020-03-18 11:56:20 -07:00
parent 6c4297f5b3
commit f57a332671
4 changed files with 109 additions and 81 deletions

View file

@ -18,16 +18,18 @@ import lombok.EqualsAndHashCode;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.ShardSyncTask; import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.metrics.MetricsFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/** /**
* The top level orchestrator for coordinating the periodic shard sync related * The top level orchestrator for coordinating the periodic shard sync related
@ -42,21 +44,21 @@ class PeriodicShardSyncManager {
private final String workerId; private final String workerId;
private final LeaderDecider leaderDecider; private final LeaderDecider leaderDecider;
private final ConsumerTask metricsEmittingShardSyncTask; private final Map<StreamIdentifier, ShardSyncTaskManager> streamToShardSyncTaskManagerMap;
private final ScheduledExecutorService shardSyncThreadPool; private final ScheduledExecutorService shardSyncThreadPool;
private boolean isRunning; private boolean isRunning;
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, MetricsFactory metricsFactory) { PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map<StreamIdentifier, ShardSyncTaskManager> streamToShardSyncTaskManagerMap) {
this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory); this(workerId, leaderDecider, streamToShardSyncTaskManagerMap, Executors.newSingleThreadScheduledExecutor());
} }
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, ScheduledExecutorService shardSyncThreadPool, MetricsFactory metricsFactory) { PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map<StreamIdentifier, ShardSyncTaskManager> streamToShardSyncTaskManagerMap,
ScheduledExecutorService shardSyncThreadPool) {
Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager."); Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager.");
Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager."); Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager.");
Validate.notNull(shardSyncTask, "ShardSyncTask is required to initialize PeriodicShardSyncManager.");
this.workerId = workerId; this.workerId = workerId;
this.leaderDecider = leaderDecider; this.leaderDecider = leaderDecider;
this.metricsEmittingShardSyncTask = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory); this.streamToShardSyncTaskManagerMap = streamToShardSyncTaskManagerMap;
this.shardSyncThreadPool = shardSyncThreadPool; this.shardSyncThreadPool = shardSyncThreadPool;
} }
@ -82,17 +84,14 @@ class PeriodicShardSyncManager {
* Does not schedule periodic shardSync * Does not schedule periodic shardSync
* @return the result of the task * @return the result of the task
*/ */
public synchronized TaskResult syncShardsOnce() { public synchronized void syncShardsOnce() throws Exception {
for (Map.Entry<StreamIdentifier, ShardSyncTaskManager> mapEntry : streamToShardSyncTaskManagerMap.entrySet()) {
Exception lastException = null; final ShardSyncTaskManager shardSyncTaskManager = mapEntry.getValue();
try { final TaskResult taskResult = shardSyncTaskManager.executeShardSyncTask();
if (!isRunning) { if (taskResult.getException() != null) {
runShardSync(); throw taskResult.getException();
} }
} catch (Exception e) {
lastException = e;
} }
return new TaskResult(lastException);
} }
public void stop() { public void stop() {
@ -107,10 +106,11 @@ class PeriodicShardSyncManager {
private void runShardSync() { private void runShardSync() {
if (leaderDecider.isLeader(workerId)) { if (leaderDecider.isLeader(workerId)) {
log.info(String.format("WorkerId %s is a leader, running the shard sync task", workerId)); for (Map.Entry<StreamIdentifier, ShardSyncTaskManager> mapEntry : streamToShardSyncTaskManagerMap.entrySet()) {
final TaskResult taskResult = metricsEmittingShardSyncTask.call(); final ShardSyncTaskManager shardSyncTaskManager = mapEntry.getValue();
if (taskResult != null && taskResult.getException() != null) { if (!shardSyncTaskManager.syncShardAndLeaseInfo()) {
throw new KinesisClientLibIOException("Failed to sync shards", taskResult.getException()); throw new KinesisClientLibIOException("Failed to submit shard sync task for stream " + shardSyncTaskManager.shardDetector().streamIdentifier().streamName());
}
} }
} else { } else {
log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId));

View file

@ -56,9 +56,7 @@ import software.amazon.kinesis.leases.LeaseSerializer;
import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardPrioritization; import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSyncTask;
import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer; import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer;
@ -72,9 +70,7 @@ import software.amazon.kinesis.lifecycle.ShardConsumerArgument;
import software.amazon.kinesis.lifecycle.ShardConsumerShutdownNotification; import software.amazon.kinesis.lifecycle.ShardConsumerShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownNotification; import software.amazon.kinesis.lifecycle.ShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownReason; import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.CloudWatchMetricsFactory; import software.amazon.kinesis.metrics.CloudWatchMetricsFactory;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.Checkpointer;
@ -85,19 +81,8 @@ import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.retrieval.RetrievalConfig;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/** /**
* *
@ -136,7 +121,6 @@ public class Scheduler implements Runnable {
private final LeaseCoordinator leaseCoordinator; private final LeaseCoordinator leaseCoordinator;
private final Function<StreamIdentifier, ShardSyncTaskManager> shardSyncTaskManagerProvider; private final Function<StreamIdentifier, ShardSyncTaskManager> shardSyncTaskManagerProvider;
private final Map<StreamIdentifier, ShardSyncTaskManager> streamToShardSyncTaskManagerMap = new HashMap<>(); private final Map<StreamIdentifier, ShardSyncTaskManager> streamToShardSyncTaskManagerMap = new HashMap<>();
private final ShardSyncTaskManager shardSyncTaskManager;
private final PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager; private final PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager;
private final ShardPrioritization shardPrioritization; private final ShardPrioritization shardPrioritization;
private final boolean cleanupLeasesUponShardCompletion; private final boolean cleanupLeasesUponShardCompletion;
@ -278,7 +262,8 @@ public class Scheduler implements Runnable {
// TODO : Halo : Check if this needs to be per stream. // TODO : Halo : Check if this needs to be per stream.
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode); this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode);
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
this.leaderElectedPeriodicShardSyncManager = buildPeriodicShardSyncManager(); this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(),
leaderDecider, streamToShardSyncTaskManagerMap);
} }
/** /**
@ -320,25 +305,16 @@ public class Scheduler implements Runnable {
log.info("Initializing LeaseCoordinator"); log.info("Initializing LeaseCoordinator");
leaseCoordinator.initialize(); leaseCoordinator.initialize();
TaskResult result;
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
// TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing // TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing
// TODO: for already synced streams // TODO: for already synced streams
waitUntilLeaseTableIsReady(); if (!waitAndCheckIfLeaseTableIsReady()) {
for(Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) { for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
final StreamIdentifier streamIdentifier = streamConfigEntry.getKey(); final StreamIdentifier streamIdentifier = streamConfigEntry.getKey();
log.info("Syncing Kinesis shard info for " + streamIdentifier); createOrGetShardSyncTaskManager(streamIdentifier);
final StreamConfig streamConfig = streamConfigEntry.getValue(); log.info("Creating shard sync task for " + streamIdentifier);
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamIdentifier),
leaseRefresher, streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L,
hierarchicalShardSyncer, metricsFactory);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
// Throwing the exception, to prevent further syncs for other stream.
if (result.getException() != null) {
log.error("Caught exception when sync'ing info for " + streamIdentifier, result.getException());
throw result.getException();
} }
leaderElectedPeriodicShardSyncManager.syncShardsOnce();
} }
} else { } else {
log.info("Skipping shard sync per configuration setting (and lease table is not empty)"); log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
@ -353,9 +329,10 @@ public class Scheduler implements Runnable {
log.info("LeaseCoordinator is already running. No need to start it."); log.info("LeaseCoordinator is already running. No need to start it.");
} }
log.info("Scheduling periodicShardSync)"); log.info("Scheduling periodicShardSync)");
// leaderElectedPeriodicShardSyncManager.start(); // leaderElectedPeriodicShardSyncManager.start(shardSyncTasks);
// TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged // TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged
waitUntilHashRangeCovered(); // TODO: Determine if waitUntilHashRangeCovered() is needed.
//waitUntilHashRangeCovered();
isDone = true; isDone = true;
} catch (LeasingException e) { } catch (LeasingException e) {
log.error("Caught exception when initializing LeaseCoordinator", e); log.error("Caught exception when initializing LeaseCoordinator", e);
@ -382,7 +359,7 @@ public class Scheduler implements Runnable {
} }
@VisibleForTesting @VisibleForTesting
void waitUntilLeaseTableIsReady() throws InterruptedException, boolean waitAndCheckIfLeaseTableIsReady() throws InterruptedException,
DependencyException, ProvisionedThroughputException, InvalidStateException { DependencyException, ProvisionedThroughputException, InvalidStateException {
long waitTime = ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); long waitTime = ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
long waitUntil = System.currentTimeMillis() + waitTime; long waitUntil = System.currentTimeMillis() + waitTime;
@ -393,6 +370,7 @@ public class Scheduler implements Runnable {
log.info("Lease table is still empty. Checking again in {} ms", LEASE_TABLE_CHECK_FREQUENCY_MILLIS); log.info("Lease table is still empty. Checking again in {} ms", LEASE_TABLE_CHECK_FREQUENCY_MILLIS);
Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS); Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS);
} }
return !leaseRefresher.isLeaseTableEmpty();
} }
private void waitUntilHashRangeCovered() throws InterruptedException { private void waitUntilHashRangeCovered() throws InterruptedException {
@ -725,14 +703,6 @@ public class Scheduler implements Runnable {
argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning()); argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
} }
private PeriodicShardSyncManager buildPeriodicShardSyncManager() {
final ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(getStreamIdentifier), leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer,
metricsFactory);
return new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(),
leaderDecider, shardSyncTask, metricsFactory);
}
/** /**
* NOTE: This method is internal/private to the Worker class. It has package access solely for testing. * NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
* <p> * <p>

View file

@ -14,9 +14,12 @@
*/ */
package software.amazon.kinesis.leases; package software.amazon.kinesis.leases;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -53,6 +56,10 @@ public class ShardSyncTaskManager {
private final HierarchicalShardSyncer hierarchicalShardSyncer; private final HierarchicalShardSyncer hierarchicalShardSyncer;
@NonNull @NonNull
private final MetricsFactory metricsFactory; private final MetricsFactory metricsFactory;
private ConsumerTask currentTask;
private CompletableFuture<TaskResult> future;
private AtomicBoolean shardSyncRequestPending;
private final ReentrantLock lock;
/** /**
* Constructor. * Constructor.
@ -82,6 +89,8 @@ public class ShardSyncTaskManager {
this.executorService = executorService; this.executorService = executorService;
this.hierarchicalShardSyncer = new HierarchicalShardSyncer(); this.hierarchicalShardSyncer = new HierarchicalShardSyncer();
this.metricsFactory = metricsFactory; this.metricsFactory = metricsFactory;
this.shardSyncRequestPending = new AtomicBoolean(false);
this.lock = new ReentrantLock();
} }
/** /**
@ -110,16 +119,33 @@ public class ShardSyncTaskManager {
this.executorService = executorService; this.executorService = executorService;
this.hierarchicalShardSyncer = hierarchicalShardSyncer; this.hierarchicalShardSyncer = hierarchicalShardSyncer;
this.metricsFactory = metricsFactory; this.metricsFactory = metricsFactory;
this.shardSyncRequestPending = new AtomicBoolean(false);
this.lock = new ReentrantLock();
} }
private ConsumerTask currentTask; public TaskResult executeShardSyncTask() {
private Future<TaskResult> future; final ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector,
leaseRefresher,
initialPositionInStream,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis,
hierarchicalShardSyncer,
metricsFactory);
final ConsumerTask metricCollectingTask = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory);
return metricCollectingTask.call();
}
public synchronized boolean syncShardAndLeaseInfo() { public boolean syncShardAndLeaseInfo() {
try {
lock.lock();
return checkAndSubmitNextTask(); return checkAndSubmitNextTask();
} finally {
lock.unlock();
}
} }
private synchronized boolean checkAndSubmitNextTask() { private boolean checkAndSubmitNextTask() {
boolean submittedNewTask = false; boolean submittedNewTask = false;
if ((future == null) || future.isCancelled() || future.isDone()) { if ((future == null) || future.isCancelled() || future.isDone()) {
if ((future != null) && future.isDone()) { if ((future != null) && future.isDone()) {
@ -145,18 +171,45 @@ public class ShardSyncTaskManager {
hierarchicalShardSyncer, hierarchicalShardSyncer,
metricsFactory), metricsFactory),
metricsFactory); metricsFactory);
future = executorService.submit(currentTask); future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService)
.whenComplete((taskResult, exception) -> handlePendingShardSyncs(exception, taskResult));
submittedNewTask = true; submittedNewTask = true;
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Submitted new {} task.", currentTask.taskType()); log.debug("Submitted new {} task.", currentTask.taskType());
} }
} else { } else {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Previous {} task still pending. Not submitting new task.", currentTask.taskType()); log.debug("Previous {} task still pending. Not submitting new task. "
+ "Enqueued a request that will be executed when the current request completes.", currentTask.taskType());
} }
shardSyncRequestPending.compareAndSet(false /*expected*/, true /*update*/);
} }
return submittedNewTask; return submittedNewTask;
} }
private void handlePendingShardSyncs(Throwable exception, TaskResult taskResult) {
if (exception != null || taskResult.getException() != null) {
log.error("Caught exception running {} task: ", currentTask.taskType(), exception != null ? exception : taskResult.getException());
}
// Acquire lock here. If shardSyncRequestPending is false in this completionStage and
// syncShardAndLeaseInfo is invoked, before completion stage exits (future completes)
// but right after the value of shardSyncRequestPending is checked, it will result in
// shardSyncRequestPending being set to true, but no pending futures to trigger the next
// ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the
// previous task is in this completion stage, checkAndSubmitNextTask is not invoked
// until this completionStage exits.
try {
lock.lock();
if (shardSyncRequestPending.get()) {
shardSyncRequestPending.set(false);
// reset future to null, so next call creates a new one
// without trying to get results from the old future.
future = null;
checkAndSubmitNextTask();
}
} finally {
lock.unlock();
}
}
} }

View file

@ -78,6 +78,7 @@ import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.ShardConsumer; import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
@ -175,6 +176,7 @@ public class SchedulerTest {
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList); when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList);
when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher); when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null));
when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher); when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher);
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
@ -278,14 +280,14 @@ public class SchedulerTest {
@Test @Test
public final void testInitializationFailureWithRetries() throws Exception { public final void testInitializationFailureWithRetries() throws Exception {
doNothing().when(leaseCoordinator).initialize(); doNothing().when(leaseCoordinator).initialize();
when(shardDetector.listShards()).thenThrow(new RuntimeException()); when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenThrow(new RuntimeException());
leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName, leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName,
workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(false, true)); workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(false, true));
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig); metricsConfig, processorConfig, retrievalConfig);
scheduler.run(); scheduler.run();
verify(shardDetector, times(coordinatorConfig.maxInitializationAttempts())).listShards(); verify(dynamoDBLeaseRefresher, times(coordinatorConfig.maxInitializationAttempts())).isLeaseTableEmpty();
} }
@Test @Test
@ -298,18 +300,20 @@ public class SchedulerTest {
metricsConfig, processorConfig, retrievalConfig); metricsConfig, processorConfig, retrievalConfig);
doNothing().when(leaseCoordinator).initialize(); doNothing().when(leaseCoordinator).initialize();
when(shardDetector.listShards()).thenThrow(new RuntimeException()); when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenThrow(new RuntimeException());
scheduler.run(); scheduler.run();
// verify initialization was retried for maxInitializationAttempts times // verify initialization was retried for maxInitializationAttempts times
verify(shardDetector, times(maxInitializationAttempts)).listShards(); verify(dynamoDBLeaseRefresher, times(maxInitializationAttempts)).isLeaseTableEmpty();
} }
@Test @Test
public final void testMultiStreamInitialization() throws ProvisionedThroughputException, DependencyException { public final void testMultiStreamInitialization() throws ProvisionedThroughputException, DependencyException {
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory); .retrievalFactory(retrievalFactory);
leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient,
workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(true, true));
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig); metricsConfig, processorConfig, retrievalConfig);
scheduler.initialize(); scheduler.initialize();
@ -322,7 +326,7 @@ public class SchedulerTest {
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory); .retrievalFactory(retrievalFactory);
leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient,
workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(true, false)); workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(true, true));
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig); metricsConfig, processorConfig, retrievalConfig);
scheduler.initialize(); scheduler.initialize();
@ -388,7 +392,7 @@ public class SchedulerTest {
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
scheduler.waitUntilLeaseTableIsReady(); scheduler.waitAndCheckIfLeaseTableIsReady();
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
assertTrue(endTime - startTime > MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); assertTrue(endTime - startTime > MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
@ -407,7 +411,7 @@ public class SchedulerTest {
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false); when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false);
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
scheduler.waitUntilLeaseTableIsReady(); scheduler.waitAndCheckIfLeaseTableIsReady();
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
assertTrue(endTime - startTime < MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); assertTrue(endTime - startTime < MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
@ -681,6 +685,7 @@ public class SchedulerTest {
shardSyncTaskManagerMap.put(streamConfig.streamIdentifier(), shardSyncTaskManager); shardSyncTaskManagerMap.put(streamConfig.streamIdentifier(), shardSyncTaskManager);
shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector); shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null));
if(shardSyncFirstAttemptFailure) { if(shardSyncFirstAttemptFailure) {
when(shardDetector.listShards()) when(shardDetector.listShards())
.thenThrow(new RuntimeException("Service Exception")) .thenThrow(new RuntimeException("Service Exception"))