Add leader-elected periodic shard sync manager as auditor to ShardEndShardSync strategy (#35)

* Add leader-elected periodic shard sync manager as auditor to ShardEndShardSync strategy

* Address PR feedback and add Builder unit tests
This commit is contained in:
Micah Jaffe 2020-05-14 09:04:44 -07:00 committed by GitHub
parent 45e76b0fd5
commit fdad20aff4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 126 additions and 40 deletions

View file

@ -16,8 +16,13 @@ class ShardEndShardSyncStrategy implements ShardSyncStrategy {
private static final Log LOG = LogFactory.getLog(Worker.class);
private ShardSyncTaskManager shardSyncTaskManager;
ShardEndShardSyncStrategy(ShardSyncTaskManager shardSyncTaskManager) {
/** Runs periodic shard sync jobs in the background as an auditor process for shard-end syncs. */
private PeriodicShardSyncManager periodicShardSyncManager;
ShardEndShardSyncStrategy(ShardSyncTaskManager shardSyncTaskManager,
PeriodicShardSyncManager periodicShardSyncManager) {
this.shardSyncTaskManager = shardSyncTaskManager;
this.periodicShardSyncManager = periodicShardSyncManager;
}
@Override
@ -42,9 +47,8 @@ class ShardEndShardSyncStrategy implements ShardSyncStrategy {
@Override
public TaskResult onWorkerInitialization() {
// TODO: Start leaderElectedPeriodicShardSyncManager in background
LOG.debug(String.format("onWorkerInitialization is NoOp for ShardSyncStrategyType %s", getStrategyType().toString()));
return new TaskResult(null);
LOG.info("Starting periodic shard sync background process for SHARD_END shard sync strategy.");
return periodicShardSyncManager.start();
}
@Override
@ -66,7 +70,7 @@ class ShardEndShardSyncStrategy implements ShardSyncStrategy {
@Override
public void onWorkerShutDown() {
// TODO: Shut down leaderElectedPeriodicShardSyncManager
LOG.debug(String.format("Stop is NoOp for ShardSyncStrategyType %s", getStrategyType().toString()));
LOG.info("Stopping periodic shard sync background process for SHARD_END shard sync strategy.");
periodicShardSyncManager.stop();
}
}

View file

@ -126,7 +126,7 @@ public class Worker implements Runnable {
private final Optional<Integer> maxGetRecordsThreadPool;
private final KinesisClientLibLeaseCoordinator leaseCoordinator;
private final ShardSyncTaskManager controlServer;
private final ShardSyncTaskManager shardSyncTaskManager;
private final ShardPrioritization shardPrioritization;
@ -562,7 +562,7 @@ public class Worker implements Runnable {
this.leaseCoordinator = leaseCoordinator;
this.metricsFactory = metricsFactory;
this.shardSyncer = shardSyncer;
this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
this.shardSyncTaskManager = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
initialPositionInStream, cleanupLeasesUponShardCompletion, config.shouldIgnoreUnexpectedChildShards(),
shardSyncIdleTimeMillis, metricsFactory, executorService, shardSyncer);
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
@ -573,21 +573,37 @@ public class Worker implements Runnable {
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
this.workerStateChangeListener = workerStateChangeListener;
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
this.shardSyncStrategy = createShardSyncStrategy(config.getShardSyncStrategyType());
LOG.info(String.format("Shard sync strategy determined as %s.", shardSyncStrategy.getStrategyType().toString()));
this.leaderDecider = leaderDecider != null ? leaderDecider : createLeaderDecider();
this.leaderElectedPeriodicShardSyncManager = periodicShardSyncManager != null ? periodicShardSyncManager
: createPeriodicShardSyncManager();
createShardSyncStrategy(config.getShardSyncStrategyType(), leaderDecider, periodicShardSyncManager);
}
private ShardSyncStrategy createShardSyncStrategy(ShardSyncStrategyType strategyType) {
/**
* Create shard sync strategy and corresponding {@link LeaderDecider} based on provided configs. PERIODIC
* {@link ShardSyncStrategyType} honors custom leaderDeciders for leader election strategy. All other
* {@link ShardSyncStrategyType}s permit only a default single-leader strategy.
*/
private void createShardSyncStrategy(ShardSyncStrategyType strategyType,
LeaderDecider leaderDecider,
PeriodicShardSyncManager periodicShardSyncManager) {
switch (strategyType) {
case PERIODIC:
return createPeriodicShardSyncStrategy();
this.leaderDecider = getOrCreateLeaderDecider(leaderDecider);
this.leaderElectedPeriodicShardSyncManager =
getOrCreatePeriodicShardSyncManager(periodicShardSyncManager);
this.shardSyncStrategy = createPeriodicShardSyncStrategy();
break;
case SHARD_END:
default:
return createShardEndShardSyncStrategy(controlServer);
if (leaderDecider != null) {
LOG.warn("LeaderDecider cannot be customized with non-PERIODIC shard sync strategy type. Using " +
"default LeaderDecider.");
}
this.leaderDecider = getOrCreateLeaderDecider(null);
this.leaderElectedPeriodicShardSyncManager =
getOrCreatePeriodicShardSyncManager(periodicShardSyncManager);
this.shardSyncStrategy = createShardEndShardSyncStrategy();
}
LOG.info("Shard sync strategy determined as " + shardSyncStrategy.getStrategyType().toString());
}
private static KinesisClientLibLeaseCoordinator getLeaseCoordinator(KinesisClientLibConfiguration config,
@ -619,6 +635,20 @@ public class Worker implements Runnable {
return leaseCoordinator;
}
/**
* @return the leaderDecider
*/
LeaderDecider getLeaderDecider() {
return leaderDecider;
}
/**
* @return the leaderElectedPeriodicShardSyncManager
*/
PeriodicShardSyncManager getPeriodicShardSyncManager() {
return leaderElectedPeriodicShardSyncManager;
}
/**
* Start consuming data from the stream, and pass it to the application record processors.
*/
@ -723,14 +753,15 @@ public class Worker implements Runnable {
try {
Thread.sleep(parentShardPollIntervalMillis);
leaderElectedPeriodicShardSyncManager.stop();
} catch (InterruptedException e) {
LOG.debug("Sleep interrupted while initializing worker.");
}
}
if (!isDone) {
leaderElectedPeriodicShardSyncManager.stop();
throw new RuntimeException(lastException);
}
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED);
}
@ -1208,19 +1239,29 @@ public class Worker implements Runnable {
}
private PeriodicShardSyncStrategy createPeriodicShardSyncStrategy() {
return new PeriodicShardSyncStrategy(createPeriodicShardSyncManager());
return new PeriodicShardSyncStrategy(leaderElectedPeriodicShardSyncManager);
}
private ShardEndShardSyncStrategy createShardEndShardSyncStrategy(ShardSyncTaskManager shardSyncTaskManager) {
return new ShardEndShardSyncStrategy(shardSyncTaskManager);
private ShardEndShardSyncStrategy createShardEndShardSyncStrategy() {
return new ShardEndShardSyncStrategy(shardSyncTaskManager, leaderElectedPeriodicShardSyncManager);
}
private LeaderDecider createLeaderDecider() {
private LeaderDecider getOrCreateLeaderDecider(LeaderDecider leaderDecider) {
if (leaderDecider != null) {
return leaderDecider;
}
return new DeterministicShuffleShardSyncLeaderDecider(leaseCoordinator.getLeaseManager(),
Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
}
private PeriodicShardSyncManager createPeriodicShardSyncManager() {
private PeriodicShardSyncManager getOrCreatePeriodicShardSyncManager(PeriodicShardSyncManager periodicShardSyncManager) {
// TODO: Configure periodicShardSyncManager with either mandatory shard sync (PERIODIC) or hash range
// validation based shard sync (SHARD_END) based on configured shard sync strategy
if (periodicShardSyncManager != null) {
return periodicShardSyncManager;
}
return new PeriodicShardSyncManager(config.getWorkerIdentifier(),
leaderDecider,
new ShardSyncTask(streamConfig.getStreamProxy(),
@ -1298,9 +1339,6 @@ public class Worker implements Runnable {
private ILeaseRenewer<KinesisClientLease> leaseRenewer;
@Setter @Accessors(fluent = true)
private ShardSyncer shardSyncer;
@Setter @Accessors(fluent = true)
private PeriodicShardSyncManager periodicShardSyncManager;
@VisibleForTesting
AmazonKinesis getKinesisClient() {
@ -1448,20 +1486,6 @@ public class Worker implements Runnable {
Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
}
if (periodicShardSyncManager == null) {
periodicShardSyncManager = new PeriodicShardSyncManager(config.getWorkerIdentifier(),
leaderDecider,
new ShardSyncTask(kinesisProxy,
leaseManager,
config.getInitialPositionInStreamExtended(),
config.shouldCleanupLeasesUponShardCompletion(),
config.shouldIgnoreUnexpectedChildShards(),
SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC,
shardSyncer,
null),
metricsFactory);
}
return new Worker(config.getApplicationName(),
recordProcessorFactory,
config,
@ -1495,7 +1519,7 @@ public class Worker implements Runnable {
workerStateChangeListener,
shardSyncer,
leaderDecider,
periodicShardSyncManager);
null /* PeriodicShardSyncManager */);
}
<R, T extends AwsClientBuilder<T, R>> R createClient(final T builder,

View file

@ -1781,6 +1781,64 @@ public class WorkerTest {
Assert.assertSame(leaseManager, worker.getLeaseCoordinator().getLeaseManager());
}
@Test
public void testBuilderWithDefaultShardSyncStrategy() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
Worker worker = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(config)
.build();
Assert.assertNotNull(worker.getLeaderDecider());
Assert.assertNotNull(worker.getPeriodicShardSyncManager());
}
@Test
public void testBuilderWithPeriodicShardSyncStrategyAndDefaultLeaderDecider() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
when(config.getShardSyncStrategyType()).thenReturn(ShardSyncStrategyType.PERIODIC);
Worker worker = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(config)
.build();
Assert.assertNotNull(worker.getLeaderDecider());
Assert.assertNotNull(worker.getPeriodicShardSyncManager());
}
@Test
public void testBuilderWithPeriodicShardSyncStrategyAndCustomLeaderDecider() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
when(config.getShardSyncStrategyType()).thenReturn(ShardSyncStrategyType.PERIODIC);
LeaderDecider leaderDecider = mock(LeaderDecider.class);
Worker worker = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(config)
.leaderDecider(leaderDecider)
.build();
Assert.assertSame(leaderDecider, worker.getLeaderDecider());
Assert.assertNotNull(worker.getPeriodicShardSyncManager());
}
@Test
public void testCustomLeaderDeciderNotAllowedForShardEndShardSync() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
when(config.getShardSyncStrategyType()).thenReturn(ShardSyncStrategyType.SHARD_END);
LeaderDecider leaderDecider = mock(LeaderDecider.class);
Worker worker = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(config)
.leaderDecider(leaderDecider)
.build();
// Worker should override custom leaderDecider and use default instead
Assert.assertNotSame(leaderDecider, worker.getLeaderDecider());
Assert.assertNotNull(worker.getPeriodicShardSyncManager());
}
@Test
public void testBuilderSetRegionAndEndpointToClient() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);