From fdad20aff469e8cef9a42f81836a793ad88d1cbc Mon Sep 17 00:00:00 2001 From: Micah Jaffe <31011877+micah-jaffe@users.noreply.github.com> Date: Thu, 14 May 2020 09:04:44 -0700 Subject: [PATCH] Add leader-elected periodic shard sync manager as auditor to ShardEndShardSync strategy (#35) * Add leader-elected periodic shard sync manager as auditor to ShardEndShardSync strategy * Address PR feedback and add Builder unit tests --- .../lib/worker/ShardEndShardSyncStrategy.java | 16 ++-- .../clientlibrary/lib/worker/Worker.java | 92 ++++++++++++------- .../clientlibrary/lib/worker/WorkerTest.java | 58 ++++++++++++ 3 files changed, 126 insertions(+), 40 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java index 18fc4a08..9efe2f51 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java @@ -16,8 +16,13 @@ class ShardEndShardSyncStrategy implements ShardSyncStrategy { private static final Log LOG = LogFactory.getLog(Worker.class); private ShardSyncTaskManager shardSyncTaskManager; - ShardEndShardSyncStrategy(ShardSyncTaskManager shardSyncTaskManager) { + /** Runs periodic shard sync jobs in the background as an auditor process for shard-end syncs. 
*/ + private PeriodicShardSyncManager periodicShardSyncManager; + + ShardEndShardSyncStrategy(ShardSyncTaskManager shardSyncTaskManager, + PeriodicShardSyncManager periodicShardSyncManager) { this.shardSyncTaskManager = shardSyncTaskManager; + this.periodicShardSyncManager = periodicShardSyncManager; } @Override @@ -42,9 +47,8 @@ class ShardEndShardSyncStrategy implements ShardSyncStrategy { @Override public TaskResult onWorkerInitialization() { - // TODO: Start leaderElectedPeriodicShardSyncManager in background - LOG.debug(String.format("onWorkerInitialization is NoOp for ShardSyncStrategyType %s", getStrategyType().toString())); - return new TaskResult(null); + LOG.info("Starting periodic shard sync background process for SHARD_END shard sync strategy."); + return periodicShardSyncManager.start(); } @Override @@ -66,7 +70,7 @@ class ShardEndShardSyncStrategy implements ShardSyncStrategy { @Override public void onWorkerShutDown() { - // TODO: Shut down leaderElectedPeriodicShardSyncManager - LOG.debug(String.format("Stop is NoOp for ShardSyncStrategyType %s", getStrategyType().toString())); + LOG.info("Stopping periodic shard sync background process for SHARD_END shard sync strategy."); + periodicShardSyncManager.stop(); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 5f1b4fac..eec3910d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -126,7 +126,7 @@ public class Worker implements Runnable { private final Optional<Integer> maxGetRecordsThreadPool; private final KinesisClientLibLeaseCoordinator leaseCoordinator; - private final ShardSyncTaskManager controlServer; + private final ShardSyncTaskManager shardSyncTaskManager; private final ShardPrioritization shardPrioritization; @@ -562,7 +562,7 @@ 
public class Worker implements Runnable { this.leaseCoordinator = leaseCoordinator; this.metricsFactory = metricsFactory; this.shardSyncer = shardSyncer; - this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), + this.shardSyncTaskManager = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), initialPositionInStream, cleanupLeasesUponShardCompletion, config.shouldIgnoreUnexpectedChildShards(), shardSyncIdleTimeMillis, metricsFactory, executorService, shardSyncer); this.taskBackoffTimeMillis = taskBackoffTimeMillis; @@ -573,21 +573,37 @@ public class Worker implements Runnable { this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; this.workerStateChangeListener = workerStateChangeListener; workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED); - this.shardSyncStrategy = createShardSyncStrategy(config.getShardSyncStrategyType()); - LOG.info(String.format("Shard sync strategy determined as %s.", shardSyncStrategy.getStrategyType().toString())); - this.leaderDecider = leaderDecider != null ? leaderDecider : createLeaderDecider(); - this.leaderElectedPeriodicShardSyncManager = periodicShardSyncManager != null ? periodicShardSyncManager - : createPeriodicShardSyncManager(); + createShardSyncStrategy(config.getShardSyncStrategyType(), leaderDecider, periodicShardSyncManager); } - private ShardSyncStrategy createShardSyncStrategy(ShardSyncStrategyType strategyType) { + /** + * Create shard sync strategy and corresponding {@link LeaderDecider} based on provided configs. PERIODIC + * {@link ShardSyncStrategyType} honors custom leaderDeciders for leader election strategy. All other + * {@link ShardSyncStrategyType}s permit only a default single-leader strategy. 
+ */ + private void createShardSyncStrategy(ShardSyncStrategyType strategyType, + LeaderDecider leaderDecider, + PeriodicShardSyncManager periodicShardSyncManager) { switch (strategyType) { case PERIODIC: - return createPeriodicShardSyncStrategy(); + this.leaderDecider = getOrCreateLeaderDecider(leaderDecider); + this.leaderElectedPeriodicShardSyncManager = + getOrCreatePeriodicShardSyncManager(periodicShardSyncManager); + this.shardSyncStrategy = createPeriodicShardSyncStrategy(); + break; case SHARD_END: default: - return createShardEndShardSyncStrategy(controlServer); + if (leaderDecider != null) { + LOG.warn("LeaderDecider cannot be customized with non-PERIODIC shard sync strategy type. Using " + + "default LeaderDecider."); + } + this.leaderDecider = getOrCreateLeaderDecider(null); + this.leaderElectedPeriodicShardSyncManager = + getOrCreatePeriodicShardSyncManager(periodicShardSyncManager); + this.shardSyncStrategy = createShardEndShardSyncStrategy(); } + + LOG.info("Shard sync strategy determined as " + shardSyncStrategy.getStrategyType().toString()); } private static KinesisClientLibLeaseCoordinator getLeaseCoordinator(KinesisClientLibConfiguration config, @@ -619,6 +635,20 @@ public class Worker implements Runnable { return leaseCoordinator; } + /** + * @return the leaderDecider + */ + LeaderDecider getLeaderDecider() { + return leaderDecider; + } + + /** + * @return the leaderElectedPeriodicShardSyncManager + */ + PeriodicShardSyncManager getPeriodicShardSyncManager() { + return leaderElectedPeriodicShardSyncManager; + } + /** * Start consuming data from the stream, and pass it to the application record processors. 
*/ @@ -723,14 +753,15 @@ public class Worker implements Runnable { try { Thread.sleep(parentShardPollIntervalMillis); - leaderElectedPeriodicShardSyncManager.stop(); } catch (InterruptedException e) { LOG.debug("Sleep interrupted while initializing worker."); } } if (!isDone) { + leaderElectedPeriodicShardSyncManager.stop(); throw new RuntimeException(lastException); + } workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED); } @@ -1208,19 +1239,29 @@ public class Worker implements Runnable { } private PeriodicShardSyncStrategy createPeriodicShardSyncStrategy() { - return new PeriodicShardSyncStrategy(createPeriodicShardSyncManager()); + return new PeriodicShardSyncStrategy(leaderElectedPeriodicShardSyncManager); } - private ShardEndShardSyncStrategy createShardEndShardSyncStrategy(ShardSyncTaskManager shardSyncTaskManager) { - return new ShardEndShardSyncStrategy(shardSyncTaskManager); + private ShardEndShardSyncStrategy createShardEndShardSyncStrategy() { + return new ShardEndShardSyncStrategy(shardSyncTaskManager, leaderElectedPeriodicShardSyncManager); } - private LeaderDecider createLeaderDecider() { + private LeaderDecider getOrCreateLeaderDecider(LeaderDecider leaderDecider) { + if (leaderDecider != null) { + return leaderDecider; + } + return new DeterministicShuffleShardSyncLeaderDecider(leaseCoordinator.getLeaseManager(), Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); } - private PeriodicShardSyncManager createPeriodicShardSyncManager() { + private PeriodicShardSyncManager getOrCreatePeriodicShardSyncManager(PeriodicShardSyncManager periodicShardSyncManager) { + // TODO: Configure periodicShardSyncManager with either mandatory shard sync (PERIODIC) or hash range + // validation based shard sync (SHARD_END) based on configured shard sync strategy + if (periodicShardSyncManager != null) { + return periodicShardSyncManager; + } + return new 
PeriodicShardSyncManager(config.getWorkerIdentifier(), leaderDecider, new ShardSyncTask(streamConfig.getStreamProxy(), @@ -1298,9 +1339,6 @@ public class Worker implements Runnable { private ILeaseRenewer leaseRenewer; @Setter @Accessors(fluent = true) private ShardSyncer shardSyncer; - @Setter @Accessors(fluent = true) - private PeriodicShardSyncManager periodicShardSyncManager; - @VisibleForTesting AmazonKinesis getKinesisClient() { @@ -1448,20 +1486,6 @@ public class Worker implements Runnable { Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); } - if (periodicShardSyncManager == null) { - periodicShardSyncManager = new PeriodicShardSyncManager(config.getWorkerIdentifier(), - leaderDecider, - new ShardSyncTask(kinesisProxy, - leaseManager, - config.getInitialPositionInStreamExtended(), - config.shouldCleanupLeasesUponShardCompletion(), - config.shouldIgnoreUnexpectedChildShards(), - SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC, - shardSyncer, - null), - metricsFactory); - } - return new Worker(config.getApplicationName(), recordProcessorFactory, config, @@ -1495,7 +1519,7 @@ public class Worker implements Runnable { workerStateChangeListener, shardSyncer, leaderDecider, - periodicShardSyncManager); + null /* PeriodicShardSyncManager */); } <R, T extends AwsClientBuilder<T, R>> R createClient(final T builder, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index b4b2a597..08ad6efd 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -1781,6 +1781,64 @@ public class WorkerTest { Assert.assertSame(leaseManager, worker.getLeaseCoordinator().getLeaseManager()); } + @Test + public void testBuilderWithDefaultShardSyncStrategy() { + IRecordProcessorFactory recordProcessorFactory = 
mock(IRecordProcessorFactory.class); + Worker worker = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .config(config) + .build(); + + Assert.assertNotNull(worker.getLeaderDecider()); + Assert.assertNotNull(worker.getPeriodicShardSyncManager()); + } + + @Test + public void testBuilderWithPeriodicShardSyncStrategyAndDefaultLeaderDecider() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + when(config.getShardSyncStrategyType()).thenReturn(ShardSyncStrategyType.PERIODIC); + Worker worker = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .config(config) + .build(); + + Assert.assertNotNull(worker.getLeaderDecider()); + Assert.assertNotNull(worker.getPeriodicShardSyncManager()); + } + + @Test + public void testBuilderWithPeriodicShardSyncStrategyAndCustomLeaderDecider() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + when(config.getShardSyncStrategyType()).thenReturn(ShardSyncStrategyType.PERIODIC); + + LeaderDecider leaderDecider = mock(LeaderDecider.class); + Worker worker = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .config(config) + .leaderDecider(leaderDecider) + .build(); + + Assert.assertSame(leaderDecider, worker.getLeaderDecider()); + Assert.assertNotNull(worker.getPeriodicShardSyncManager()); + } + + @Test + public void testCustomLeaderDeciderNotAllowedForShardEndShardSync() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + when(config.getShardSyncStrategyType()).thenReturn(ShardSyncStrategyType.SHARD_END); + + LeaderDecider leaderDecider = mock(LeaderDecider.class); + Worker worker = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .config(config) + .leaderDecider(leaderDecider) + .build(); + + // Worker should override custom leaderDecider and use default instead + Assert.assertNotSame(leaderDecider, worker.getLeaderDecider()); + 
Assert.assertNotNull(worker.getPeriodicShardSyncManager()); + } + @Test public void testBuilderSetRegionAndEndpointToClient() { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);