Add leader-elected shard sync on application bootstrap (#28)

* Add leader-elected shard sync on application bootstrap

* Add additional testing and address PR feedback

* Remove runShardSync leader election from bootstrap case

* Remove random UUID workerId and update unit tests
This commit is contained in:
Micah Jaffe 2020-05-07 17:08:53 -07:00 committed by GitHub
parent 550d7af5b1
commit 45e76b0fd5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 234 additions and 64 deletions

View file

@ -177,7 +177,7 @@ class KinesisShardSyncer implements ShardSyncer {
List<Shard> shards; List<Shard> shards;
if(CollectionUtils.isNullOrEmpty(latestShards)) { if(CollectionUtils.isNullOrEmpty(latestShards)) {
shards = getCompleteShardList(kinesisProxy); shards = getShardListAtInitialPosition(kinesisProxy, initialPosition);
} else { } else {
shards = latestShards; shards = latestShards;
} }

View file

@ -58,14 +58,31 @@ class PeriodicShardSyncManager {
public synchronized TaskResult start() { public synchronized TaskResult start() {
if (!isRunning) { if (!isRunning) {
final Runnable periodicShardSyncer = () -> {
try {
runShardSync();
} catch (Throwable t) {
LOG.error("Error running shard sync.", t);
}
};
shardSyncThreadPool shardSyncThreadPool
.scheduleWithFixedDelay(this::runShardSync, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS, .scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS,
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
isRunning = true; isRunning = true;
} }
return new TaskResult(null); return new TaskResult(null);
} }
/**
 * Performs a single shard sync right away; this call does not schedule any recurring sync.
 * The leader decider is not consulted here, so the sync runs on whichever worker invokes it.
 *
 * @return the TaskResult produced by the metrics-emitting shard sync task
 */
public synchronized TaskResult syncShardsOnce() {
    final String announcement = "Syncing shards once from worker " + workerId;
    LOG.info(announcement);
    final TaskResult syncOutcome = metricsEmittingShardSyncTask.call();
    return syncOutcome;
}
public void stop() { public void stop() {
if (isRunning) { if (isRunning) {
LOG.info(String.format("Shutting down leader decider on worker %s", workerId)); LOG.info(String.format("Shutting down leader decider on worker %s", workerId));
@ -77,15 +94,12 @@ class PeriodicShardSyncManager {
} }
private void runShardSync() { private void runShardSync() {
try {
if (leaderDecider.isLeader(workerId)) { if (leaderDecider.isLeader(workerId)) {
LOG.debug(String.format("WorkerId %s is a leader, running the shard sync task", workerId)); LOG.debug(String.format("WorkerId %s is a leader, running the shard sync task", workerId));
metricsEmittingShardSyncTask.call(); metricsEmittingShardSyncTask.call();
} else { } else {
LOG.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); LOG.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId));
} }
} catch (Throwable t) {
LOG.error("Error during runShardSync.", t);
}
} }
} }

View file

@ -42,6 +42,7 @@ class ShardEndShardSyncStrategy implements ShardSyncStrategy {
@Override @Override
public TaskResult onWorkerInitialization() { public TaskResult onWorkerInitialization() {
// TODO: Start leaderElectedPeriodicShardSyncManager in background
LOG.debug(String.format("onWorkerInitialization is NoOp for ShardSyncStrategyType %s", getStrategyType().toString())); LOG.debug(String.format("onWorkerInitialization is NoOp for ShardSyncStrategyType %s", getStrategyType().toString()));
return new TaskResult(null); return new TaskResult(null);
} }
@ -65,6 +66,7 @@ class ShardEndShardSyncStrategy implements ShardSyncStrategy {
@Override @Override
public void onWorkerShutDown() { public void onWorkerShutDown() {
// TODO: Shut down leaderElectedPeriodicShardSyncManager
LOG.debug(String.format("Stop is NoOp for ShardSyncStrategyType %s", getStrategyType().toString())); LOG.debug(String.format("Stop is NoOp for ShardSyncStrategyType %s", getStrategyType().toString()));
} }
} }

View file

@ -29,11 +29,15 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.function.Consumer; import java.util.function.Consumer;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector; import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator; import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator;
import com.amazonaws.services.kinesis.leases.impl.LeaseRenewer; import com.amazonaws.services.kinesis.leases.impl.LeaseRenewer;
@ -88,9 +92,14 @@ public class Worker implements Runnable {
private static final Log LOG = LogFactory.getLog(Worker.class); private static final Log LOG = LogFactory.getLog(Worker.class);
// Default configs for periodic shard sync
private static final int SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC = 0; private static final int SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC = 0;
private static final int MAX_INITIALIZATION_ATTEMPTS = 20; private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; //Default for KCL. private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; //Default for KCL.
static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L;
static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
private static final LeaseCleanupValidator DEFAULT_LEASE_CLEANUP_VALIDATOR = new KinesisLeaseCleanupValidator(); private static final LeaseCleanupValidator DEFAULT_LEASE_CLEANUP_VALIDATOR = new KinesisLeaseCleanupValidator();
private static final LeaseSelector<KinesisClientLease> DEFAULT_LEASE_SELECTOR = new GenericLeaseSelector<KinesisClientLease>(); private static final LeaseSelector<KinesisClientLease> DEFAULT_LEASE_SELECTOR = new GenericLeaseSelector<KinesisClientLease>();
@ -147,6 +156,7 @@ public class Worker implements Runnable {
// Periodic Shard Sync related fields // Periodic Shard Sync related fields
private LeaderDecider leaderDecider; private LeaderDecider leaderDecider;
private ShardSyncStrategy shardSyncStrategy; private ShardSyncStrategy shardSyncStrategy;
private PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager;
/** /**
* Constructor. * Constructor.
@ -406,7 +416,7 @@ public class Worker implements Runnable {
config.getShardPrioritizationStrategy(), config.getShardPrioritizationStrategy(),
config.getRetryGetRecordsInSeconds(), config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool(), config.getMaxGetRecordsThreadPool(),
DEFAULT_WORKER_STATE_CHANGE_LISTENER, DEFAULT_LEASE_CLEANUP_VALIDATOR, null /* leaderDecider */); DEFAULT_WORKER_STATE_CHANGE_LISTENER, DEFAULT_LEASE_CLEANUP_VALIDATOR, null, null);
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) { if (config.getRegionName() != null) {
@ -467,7 +477,7 @@ public class Worker implements Runnable {
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER, shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER,
DEFAULT_LEASE_CLEANUP_VALIDATOR, null); DEFAULT_LEASE_CLEANUP_VALIDATOR, null, null);
} }
/** /**
@ -507,6 +517,10 @@ public class Worker implements Runnable {
* Max number of threads in the getRecords thread pool. * Max number of threads in the getRecords thread pool.
* @param leaseCleanupValidator * @param leaseCleanupValidator
* leaseCleanupValidator instance used to validate leases * leaseCleanupValidator instance used to validate leases
* @param leaderDecider
* leaderDecider instance used to elect shard sync leaders
* @param periodicShardSyncManager
* manages periodic shard sync tasks
*/ */
// NOTE: This has package level access solely for testing // NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
@ -517,13 +531,13 @@ public class Worker implements Runnable {
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener,
LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider) { LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider, PeriodicShardSyncManager periodicShardSyncManager) {
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream,
parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint,
leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, retryGetRecordsInSeconds, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, retryGetRecordsInSeconds,
maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(leaseCleanupValidator), maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(leaseCleanupValidator),
leaderDecider); leaderDecider, periodicShardSyncManager);
} }
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
@ -533,7 +547,8 @@ public class Worker implements Runnable {
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool,
WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider) { WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider,
PeriodicShardSyncManager periodicShardSyncManager) {
this.applicationName = applicationName; this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory; this.recordProcessorFactory = recordProcessorFactory;
this.config = config; this.config = config;
@ -558,15 +573,17 @@ public class Worker implements Runnable {
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
this.workerStateChangeListener = workerStateChangeListener; this.workerStateChangeListener = workerStateChangeListener;
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED); workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
this.leaderDecider = leaderDecider;
this.shardSyncStrategy = createShardSyncStrategy(config.getShardSyncStrategyType()); this.shardSyncStrategy = createShardSyncStrategy(config.getShardSyncStrategyType());
LOG.info(String.format("Shard sync strategy determined as %s.", shardSyncStrategy.getStrategyType().toString())); LOG.info(String.format("Shard sync strategy determined as %s.", shardSyncStrategy.getStrategyType().toString()));
this.leaderDecider = leaderDecider != null ? leaderDecider : createLeaderDecider();
this.leaderElectedPeriodicShardSyncManager = periodicShardSyncManager != null ? periodicShardSyncManager
: createPeriodicShardSyncManager();
} }
private ShardSyncStrategy createShardSyncStrategy(ShardSyncStrategyType strategyType) { private ShardSyncStrategy createShardSyncStrategy(ShardSyncStrategyType strategyType) {
switch (strategyType) { switch (strategyType) {
case PERIODIC: case PERIODIC:
return createPeriodicShardSyncStrategy(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager()); return createPeriodicShardSyncStrategy();
case SHARD_END: case SHARD_END:
default: default:
return createShardEndShardSyncStrategy(controlServer); return createShardEndShardSyncStrategy(controlServer);
@ -673,30 +690,30 @@ public class Worker implements Runnable {
LOG.info("Initializing LeaseCoordinator"); LOG.info("Initializing LeaseCoordinator");
leaseCoordinator.initialize(); leaseCoordinator.initialize();
TaskResult result = null; // Perform initial lease sync if configs allow it, with jitter.
if (!skipShardSyncAtWorkerInitializationIfLeasesExist if (shouldInitiateLeaseSync()) {
|| leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) { LOG.info(config.getWorkerIdentifier() + " worker is beginning initial lease sync.");
LOG.info("Syncing Kinesis shard info"); TaskResult result = leaderElectedPeriodicShardSyncManager.syncShardsOnce();
ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(), if (result.getException() != null) {
leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, throw result.getException();
config.shouldIgnoreUnexpectedChildShards(), 0L, shardSyncer, null); }
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else {
LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
} }
if (result == null || result.getException() == null) { // If we reach this point, then we either skipped the lease sync or did not have any exception for the
// shard sync in the previous attempt.
if (!leaseCoordinator.isRunning()) { if (!leaseCoordinator.isRunning()) {
LOG.info("Starting LeaseCoordinator"); LOG.info("Starting LeaseCoordinator");
leaseCoordinator.start(); leaseCoordinator.start();
} else { } else {
LOG.info("LeaseCoordinator is already running. No need to start it."); LOG.info("LeaseCoordinator is already running. No need to start it.");
} }
// All shard sync strategies' initialization handlers should begin a periodic shard sync. For
// PeriodicShardSync strategy, this is the main shard sync loop. For ShardEndShardSync and other
// shard sync strategies, this serves as an auditor background process.
shardSyncStrategy.onWorkerInitialization(); shardSyncStrategy.onWorkerInitialization();
isDone = true; isDone = true;
} else {
lastException = result.getException();
}
} catch (LeasingException e) { } catch (LeasingException e) {
LOG.error("Caught exception when initializing LeaseCoordinator", e); LOG.error("Caught exception when initializing LeaseCoordinator", e);
lastException = e; lastException = e;
@ -706,6 +723,7 @@ public class Worker implements Runnable {
try { try {
Thread.sleep(parentShardPollIntervalMillis); Thread.sleep(parentShardPollIntervalMillis);
leaderElectedPeriodicShardSyncManager.stop();
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.debug("Sleep interrupted while initializing worker."); LOG.debug("Sleep interrupted while initializing worker.");
} }
@ -717,6 +735,32 @@ public class Worker implements Runnable {
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED); workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED);
} }
/**
 * Decides whether this worker should run the initial shard sync during initialization.
 *
 * <p>Returns {@code false} immediately when the skip-if-leases-exist setting is enabled and the lease
 * table already has entries. Otherwise a random wait window between
 * {@code MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS} and {@code MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS}
 * is chosen, and the lease table is polled every {@code LEASE_TABLE_CHECK_FREQUENCY_MILLIS} until the
 * window elapses or the table is found to be non-empty.
 *
 * @return {@code true} if the lease table was still empty at the last check (or was never checked
 *         inside the window); {@code false} if the sync is skipped or the table was found non-empty
 * @throws InterruptedException if the thread is interrupted while sleeping between checks
 */
@VisibleForTesting
boolean shouldInitiateLeaseSync() throws InterruptedException, DependencyException, InvalidStateException,
        ProvisionedThroughputException {
    final ILeaseManager leaseManager = leaseCoordinator.getLeaseManager();

    // The table is only queried here when the skip flag is set (short-circuit evaluation).
    final boolean skipBecauseLeasesExist =
            skipShardSyncAtWorkerInitializationIfLeasesExist && !leaseManager.isLeaseTableEmpty();
    if (skipBecauseLeasesExist) {
        LOG.info("Skipping shard sync because getSkipShardSyncAtWorkerInitializationIfLeasesExist config is set " +
                "to TRUE and lease table is not empty.");
        return false;
    }

    // Random per-worker wait window, so workers bootstrapping together do not all sync at once.
    final long jitterMillis = ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS,
            MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
    final long deadline = System.currentTimeMillis() + jitterMillis;

    boolean leaseTableStillEmpty = true;
    while (System.currentTimeMillis() < deadline) {
        leaseTableStillEmpty = leaseManager.isLeaseTableEmpty();
        if (!leaseTableStillEmpty) {
            // Leases have appeared in the table, so stop waiting.
            break;
        }
        // Check every 3 seconds if lease table is still empty, to minimize contention between all workers
        // bootstrapping from empty lease table at the same time.
        LOG.info("Lease table is still empty. Checking again in " + LEASE_TABLE_CHECK_FREQUENCY_MILLIS + " ms.");
        Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS);
    }
    return leaseTableStillEmpty;
}
/** /**
* NOTE: This method is internal/private to the Worker class. It has package access solely for testing. * NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
* *
@ -1163,20 +1207,33 @@ public class Worker implements Runnable {
} }
} }
private PeriodicShardSyncStrategy createPeriodicShardSyncStrategy(IKinesisProxy kinesisProxy, private PeriodicShardSyncStrategy createPeriodicShardSyncStrategy() {
ILeaseManager<KinesisClientLease> leaseManager) { return new PeriodicShardSyncStrategy(createPeriodicShardSyncManager());
return new PeriodicShardSyncStrategy(
new PeriodicShardSyncManager(config.getWorkerIdentifier(), leaderDecider,
new ShardSyncTask(kinesisProxy, leaseManager, config.getInitialPositionInStreamExtended(),
config.shouldCleanupLeasesUponShardCompletion(),
config.shouldIgnoreUnexpectedChildShards(), SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC,
shardSyncer, null), metricsFactory));
} }
private ShardEndShardSyncStrategy createShardEndShardSyncStrategy(ShardSyncTaskManager shardSyncTaskManager) { private ShardEndShardSyncStrategy createShardEndShardSyncStrategy(ShardSyncTaskManager shardSyncTaskManager) {
return new ShardEndShardSyncStrategy(shardSyncTaskManager); return new ShardEndShardSyncStrategy(shardSyncTaskManager);
} }
/**
 * Builds the leader decider used when none is supplied: a DeterministicShuffleShardSyncLeaderDecider
 * constructed from the lease coordinator's lease manager, a new single-thread scheduled executor and
 * PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT.
 *
 * @return a newly constructed leader decider
 */
private LeaderDecider createLeaderDecider() {
    final ILeaseManager<KinesisClientLease> leaseManager = leaseCoordinator.getLeaseManager();
    return new DeterministicShuffleShardSyncLeaderDecider(
            leaseManager,
            Executors.newSingleThreadScheduledExecutor(),
            PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
}
/**
 * Builds a PeriodicShardSyncManager for this worker from the configured worker identifier, the current
 * value of the leaderDecider field, a new ShardSyncTask and the metrics factory.
 *
 * <p>The leaderDecider field is read at call time, so it must be assigned before this method runs.
 * NOTE(review): the constructor appears to call createShardSyncStrategy (which can reach this method)
 * before leaderDecider is assigned - confirm the manager is not built with a null decider.
 *
 * @return a newly constructed periodic shard sync manager
 */
private PeriodicShardSyncManager createPeriodicShardSyncManager() {
    final String workerIdentifier = config.getWorkerIdentifier();
    final ShardSyncTask shardSyncTask = new ShardSyncTask(
            streamConfig.getStreamProxy(),
            leaseCoordinator.getLeaseManager(),
            config.getInitialPositionInStreamExtended(),
            config.shouldCleanupLeasesUponShardCompletion(),
            config.shouldIgnoreUnexpectedChildShards(),
            SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC,
            shardSyncer,
            null);
    return new PeriodicShardSyncManager(workerIdentifier, leaderDecider, shardSyncTask, metricsFactory);
}
/** /**
* Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance or not. * Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance or not.
* Visible and non-final only for testing. * Visible and non-final only for testing.
@ -1241,6 +1298,8 @@ public class Worker implements Runnable {
private ILeaseRenewer<KinesisClientLease> leaseRenewer; private ILeaseRenewer<KinesisClientLease> leaseRenewer;
@Setter @Accessors(fluent = true) @Setter @Accessors(fluent = true)
private ShardSyncer shardSyncer; private ShardSyncer shardSyncer;
@Setter @Accessors(fluent = true)
private PeriodicShardSyncManager periodicShardSyncManager;
@VisibleForTesting @VisibleForTesting
@ -1389,6 +1448,20 @@ public class Worker implements Runnable {
Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
} }
if (periodicShardSyncManager == null) {
periodicShardSyncManager = new PeriodicShardSyncManager(config.getWorkerIdentifier(),
leaderDecider,
new ShardSyncTask(kinesisProxy,
leaseManager,
config.getInitialPositionInStreamExtended(),
config.shouldCleanupLeasesUponShardCompletion(),
config.shouldIgnoreUnexpectedChildShards(),
SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC,
shardSyncer,
null),
metricsFactory);
}
return new Worker(config.getApplicationName(), return new Worker(config.getApplicationName(),
recordProcessorFactory, recordProcessorFactory,
config, config,
@ -1419,7 +1492,10 @@ public class Worker implements Runnable {
shardPrioritization, shardPrioritization,
config.getRetryGetRecordsInSeconds(), config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool(), config.getMaxGetRecordsThreadPool(),
workerStateChangeListener, shardSyncer, leaderDecider); workerStateChangeListener,
shardSyncer,
leaderDecider,
periodicShardSyncManager);
} }
<R, T extends AwsClientBuilder<T, R>> R createClient(final T builder, <R, T extends AwsClientBuilder<T, R>> R createClient(final T builder,

View file

@ -18,7 +18,6 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -30,7 +29,6 @@ import java.util.stream.Stream;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
import com.amazonaws.services.kinesis.leases.impl.Lease;
import com.amazonaws.services.kinesis.model.ShardFilter; import com.amazonaws.services.kinesis.model.ShardFilter;
import com.amazonaws.services.kinesis.model.ShardFilterType; import com.amazonaws.services.kinesis.model.ShardFilterType;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -60,7 +58,7 @@ import com.amazonaws.services.kinesis.model.Shard;
import junit.framework.Assert; import junit.framework.Assert;
import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -1816,10 +1814,13 @@ public class ShardSyncerTest {
dataFile.deleteOnExit(); dataFile.deleteOnExit();
final IKinesisProxy kinesisProxy = spy(new KinesisLocalFileProxy(dataFile.getAbsolutePath())); final IKinesisProxy kinesisProxy = spy(new KinesisLocalFileProxy(dataFile.getAbsolutePath()));
// Make sure ListShardsWithFilter is called in all public shard sync methods
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPosition, shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPosition,
cleanupLeasesOfCompletedShards, false); cleanupLeasesOfCompletedShards, false);
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPosition,
cleanupLeasesOfCompletedShards, false, null);
verify(kinesisProxy, atLeastOnce()).getShardListWithFilter(shardFilter); verify(kinesisProxy, atLeast(2)).getShardListWithFilter(shardFilter);
verify(kinesisProxy, never()).getShardList(); verify(kinesisProxy, never()).getShardList();
} }

View file

@ -72,6 +72,7 @@ import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseBuilder;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.leases.impl.LeaseManager; import com.amazonaws.services.kinesis.leases.impl.LeaseManager;
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector; import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.hamcrest.Condition; import org.hamcrest.Condition;
@ -158,6 +159,7 @@ public class WorkerTest {
private static final String KINESIS_SHARD_ID_FORMAT = "kinesis-0-0-%d"; private static final String KINESIS_SHARD_ID_FORMAT = "kinesis-0-0-%d";
private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d"; private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d";
private static final String WORKER_ID = "workerId";
private RecordsFetcherFactory recordsFetcherFactory; private RecordsFetcherFactory recordsFetcherFactory;
private KinesisClientLibConfiguration config; private KinesisClientLibConfiguration config;
@ -194,7 +196,7 @@ public class WorkerTest {
@Before @Before
public void setup() { public void setup() {
config = spy(new KinesisClientLibConfiguration("app", null, null, null)); config = spy(new KinesisClientLibConfiguration("app", null, null, WORKER_ID));
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory()); recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory());
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
} }
@ -244,7 +246,7 @@ public class WorkerTest {
@Test @Test
public final void testGetStageName() { public final void testGetStageName() {
final String stageName = "testStageName"; final String stageName = "testStageName";
config = new KinesisClientLibConfiguration(stageName, null, null, null); config = new KinesisClientLibConfiguration(stageName, null, null, WORKER_ID);
Worker worker = new Worker(v1RecordProcessorFactory, config); Worker worker = new Worker(v1RecordProcessorFactory, config);
Assert.assertEquals(stageName, worker.getApplicationName()); Assert.assertEquals(stageName, worker.getApplicationName());
} }
@ -253,7 +255,7 @@ public class WorkerTest {
public final void testCreateOrGetShardConsumer() { public final void testCreateOrGetShardConsumer() {
final String stageName = "testStageName"; final String stageName = "testStageName";
IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2;
config = new KinesisClientLibConfiguration(stageName, null, null, null); config = new KinesisClientLibConfiguration(stageName, null, null, WORKER_ID);
IKinesisProxy proxy = null; IKinesisProxy proxy = null;
ICheckpoint checkpoint = null; ICheckpoint checkpoint = null;
int maxRecords = 1; int maxRecords = 1;
@ -372,7 +374,7 @@ public class WorkerTest {
public final void testCleanupShardConsumers() { public final void testCleanupShardConsumers() {
final String stageName = "testStageName"; final String stageName = "testStageName";
IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2;
config = new KinesisClientLibConfiguration(stageName, null, null, null); config = new KinesisClientLibConfiguration(stageName, null, null, WORKER_ID);
IKinesisProxy proxy = null; IKinesisProxy proxy = null;
ICheckpoint checkpoint = null; ICheckpoint checkpoint = null;
int maxRecords = 1; int maxRecords = 1;
@ -429,12 +431,12 @@ public class WorkerTest {
} }
@Test @Test
public final void testInitializationFailureWithRetries() { public final void testInitializationFailureWithRetries() throws Exception {
String stageName = "testInitializationWorker"; String stageName = "testInitializationWorker";
IRecordProcessorFactory recordProcessorFactory = new TestStreamletFactory(null, null); IRecordProcessorFactory recordProcessorFactory = new TestStreamletFactory(null, null);
config = new KinesisClientLibConfiguration(stageName, null, null, null); config = new KinesisClientLibConfiguration(stageName, null, null, WORKER_ID);
int count = 0; int count = 0;
when(proxy.getShardList()).thenThrow(new RuntimeException(Integer.toString(count++))); when(proxy.getShardListWithFilter(any())).thenThrow(new RuntimeException(Integer.toString(count++)));
int maxRecords = 2; int maxRecords = 2;
long idleTimeInMilliseconds = 1L; long idleTimeInMilliseconds = 1L;
StreamConfig streamConfig = StreamConfig streamConfig =
@ -443,6 +445,7 @@ public class WorkerTest {
idleTimeInMilliseconds, idleTimeInMilliseconds,
callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(leaseManager.isLeaseTableEmpty()).thenReturn(true);
ExecutorService execService = Executors.newSingleThreadExecutor(); ExecutorService execService = Executors.newSingleThreadExecutor();
long shardPollInterval = 0L; long shardPollInterval = 0L;
Worker worker = Worker worker =
@ -465,6 +468,79 @@ public class WorkerTest {
Assert.assertTrue(count > 0); Assert.assertTrue(count > 0);
} }
/**
 * Checks that {@code shouldInitiateLeaseSync()} waits for its jittered window while the lease table
 * keeps reporting empty: the elapsed time must exceed the minimum wait and stay under the maximum wait
 * plus one polling interval. This test really sleeps for the randomly chosen wait.
 */
@Test
public final void testInitializationWaitsWhenLeaseTableIsEmpty() throws Exception {
    when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
    when(leaseManager.isLeaseTableEmpty()).thenReturn(true);

    final String workerStageName = "testInitializationWorker";
    final int recordsPerFetch = 2;
    final long idleMillis = 1L;
    final long parentShardPollInterval = 0L;
    final StreamConfig streamCfg = new StreamConfig(proxy, recordsPerFetch, idleMillis,
            callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST);

    final Worker workerUnderTest = new Worker(
            workerStageName,
            v2RecordProcessorFactory,
            config,
            streamCfg,
            INITIAL_POSITION_TRIM_HORIZON,
            parentShardPollInterval,
            shardSyncIntervalMillis,
            cleanupLeasesUponShardCompletion,
            leaseCoordinator,
            leaseCoordinator,
            Executors.newSingleThreadExecutor(),
            nullMetricsFactory,
            taskBackoffTimeMillis,
            failoverTimeMillis,
            KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
            shardPrioritization);

    final long startedAt = System.currentTimeMillis();
    workerUnderTest.shouldInitiateLeaseSync();
    final long elapsedMillis = System.currentTimeMillis() - startedAt;

    assertTrue(elapsedMillis > Worker.MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
    assertTrue(elapsedMillis
            < Worker.MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS + Worker.LEASE_TABLE_CHECK_FREQUENCY_MILLIS);
}
/**
 * Checks that {@link Worker#shouldInitiateLeaseSync()} returns quickly when the mocked lease
 * manager reports a non-empty lease table.
 *
 * <p>The call is timed with the wall clock and must complete in strictly less than
 * {@code Worker.MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS}.
 *
 * @throws Exception if worker construction or the lease-table check fails
 */
@Test
public final void testInitializationDoesntWaitWhenLeaseTableIsNotEmpty() throws Exception {
    final String stageName = "testInitializationWorker";
    when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
    // Lease table already has entries, so no waiting should be needed.
    when(leaseManager.isLeaseTableEmpty()).thenReturn(false);

    final int maxRecords = 2;
    final long idleTimeInMilliseconds = 1L;
    final StreamConfig streamConfig = new StreamConfig(proxy, maxRecords, idleTimeInMilliseconds,
            callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
    final long shardPollInterval = 0L;

    // Keep a handle on the executor so it can be released even when an assertion fails.
    final ExecutorService execService = Executors.newSingleThreadExecutor();
    try {
        final Worker worker =
                new Worker(stageName,
                        v2RecordProcessorFactory,
                        config,
                        streamConfig, INITIAL_POSITION_TRIM_HORIZON,
                        shardPollInterval,
                        shardSyncIntervalMillis,
                        cleanupLeasesUponShardCompletion,
                        leaseCoordinator,
                        leaseCoordinator,
                        execService,
                        nullMetricsFactory,
                        taskBackoffTimeMillis,
                        failoverTimeMillis,
                        KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
                        shardPrioritization);

        final long startTime = System.currentTimeMillis();
        worker.shouldInitiateLeaseSync();
        final long elapsedMillis = System.currentTimeMillis() - startTime;

        // Report the measured duration so a timing-related failure can be diagnosed from the log.
        assertTrue("Expected the lease table check to return in under "
                        + Worker.MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS + " ms but it took "
                        + elapsedMillis + " ms",
                elapsedMillis < Worker.MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
    } finally {
        execService.shutdownNow();
    }
}
/** /**
* Runs worker with threadPoolSize == numShards * Runs worker with threadPoolSize == numShards
* Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker#run()}. * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker#run()}.
@ -576,6 +652,7 @@ public class WorkerTest {
final ExecutorService executorService = mock(ThreadPoolExecutor.class); final ExecutorService executorService = mock(ThreadPoolExecutor.class);
final CWMetricsFactory cwMetricsFactory = mock(CWMetricsFactory.class); final CWMetricsFactory cwMetricsFactory = mock(CWMetricsFactory.class);
when(cwMetricsFactory.createMetrics()).thenReturn(mock(IMetricsScope.class));
// Make sure that worker thread is run before invoking shutdown. // Make sure that worker thread is run before invoking shutdown.
final CountDownLatch workerStarted = new CountDownLatch(1); final CountDownLatch workerStarted = new CountDownLatch(1);
doAnswer(new Answer<Boolean>() { doAnswer(new Answer<Boolean>() {
@ -1708,7 +1785,7 @@ public class WorkerTest {
public void testBuilderSetRegionAndEndpointToClient() { public void testBuilderSetRegionAndEndpointToClient() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final String endpoint = "TestEndpoint"; final String endpoint = "TestEndpoint";
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null) KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, WORKER_ID)
.withRegionName(Regions.US_WEST_2.getName()) .withRegionName(Regions.US_WEST_2.getName())
.withKinesisEndpoint(endpoint) .withKinesisEndpoint(endpoint)
.withDynamoDBEndpoint(endpoint); .withDynamoDBEndpoint(endpoint);
@ -1736,7 +1813,7 @@ public class WorkerTest {
public void testBuilderSetRegionToClient() { public void testBuilderSetRegionToClient() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
String region = Regions.US_WEST_2.getName(); String region = Regions.US_WEST_2.getName();
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null) KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, WORKER_ID)
.withRegionName(region); .withRegionName(region);
Worker.Builder builder = new Worker.Builder(); Worker.Builder builder = new Worker.Builder();
@ -1763,7 +1840,7 @@ public class WorkerTest {
@Test @Test
public void testBuilderGenerateClients() { public void testBuilderGenerateClients() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null); KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, WORKER_ID);
Worker.Builder builder = spy(new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(config)); Worker.Builder builder = spy(new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(config));
ArgumentCaptor<AwsClientBuilder> builderCaptor = ArgumentCaptor.forClass(AwsClientBuilder.class); ArgumentCaptor<AwsClientBuilder> builderCaptor = ArgumentCaptor.forClass(AwsClientBuilder.class);
@ -1789,7 +1866,7 @@ public class WorkerTest {
public void testBuilderGenerateClientsWithRegion() { public void testBuilderGenerateClientsWithRegion() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
String region = Regions.US_WEST_2.getName(); String region = Regions.US_WEST_2.getName();
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null) KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, WORKER_ID)
.withRegionName(region); .withRegionName(region);
ArgumentCaptor<AwsClientBuilder> builderCaptor = ArgumentCaptor.forClass(AwsClientBuilder.class); ArgumentCaptor<AwsClientBuilder> builderCaptor = ArgumentCaptor.forClass(AwsClientBuilder.class);
@ -1809,7 +1886,7 @@ public class WorkerTest {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
String region = Regions.US_WEST_2.getName(); String region = Regions.US_WEST_2.getName();
String endpointUrl = "TestEndpoint"; String endpointUrl = "TestEndpoint";
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null) KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, WORKER_ID)
.withRegionName(region).withKinesisEndpoint(endpointUrl).withDynamoDBEndpoint(endpointUrl); .withRegionName(region).withKinesisEndpoint(endpointUrl).withDynamoDBEndpoint(endpointUrl);
Worker.Builder builder = spy(new Worker.Builder()); Worker.Builder builder = spy(new Worker.Builder());