From e694ab7724f75fa1b2bc6532e40870c182eb090e Mon Sep 17 00:00:00 2001 From: muktiranjan <42283789+muktiranjan@users.noreply.github.com> Date: Wed, 15 Aug 2018 13:25:07 -0700 Subject: [PATCH] =?UTF-8?q?Moving=20the=20max=20number=20of=20Scheduler=20?= =?UTF-8?q?initialization=20attempts=20parameter=20=E2=80=A6=20(#368)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Moving the max number of Scheduler initialization attempts parameter to CoordinatorConfig * Changing the max initialization attempts variable name --- .../amazon/kinesis/coordinator/CoordinatorConfig.java | 7 +++++++ .../software/amazon/kinesis/coordinator/Scheduler.java | 6 ++++-- .../software/amazon/kinesis/lifecycle/LifecycleConfig.java | 4 ---- .../software/amazon/kinesis/coordinator/SchedulerTest.java | 6 ++++-- 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java index 929a40d2..e1ccc0e8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java @@ -35,6 +35,13 @@ public class CoordinatorConfig { @NonNull private final String applicationName; + /** + * The maximum number of attempts to initialize the Scheduler + * + *
Default value: 20
+ */ + private int maxInitializationAttempts = 20; + /** * Interval in milliseconds between polling to check for parent shard completion. * Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 10e477a5..c16a80e5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -87,6 +87,7 @@ public class Scheduler implements Runnable { private final RetrievalConfig retrievalConfig; private final String applicationName; + private final int maxInitializationAttempts; private final Checkpointer checkpoint; private final long shardConsumerDispatchPollIntervalMillis; // Backoff time when polling to check if application has finished processing @@ -144,6 +145,7 @@ public class Scheduler implements Runnable { this.retrievalConfig = retrievalConfig; this.applicationName = this.coordinatorConfig.applicationName(); + this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts(); this.metricsFactory = this.metricsConfig.metricsFactory(); this.leaseCoordinator = this.leaseManagementConfig.leaseManagementFactory() .createLeaseCoordinator(this.metricsFactory); @@ -197,7 +199,7 @@ public class Scheduler implements Runnable { initialize(); log.info("Initialization complete. Starting worker loop."); } catch (RuntimeException e) { - log.error("Unable to initialize after {} attempts. Shutting down.", lifecycleConfig.maxInitializationAttempts(), e); + log.error("Unable to initialize after {} attempts. Shutting down.", maxInitializationAttempts, e); shutdown(); } @@ -214,7 +216,7 @@ public class Scheduler implements Runnable { boolean isDone = false; Exception lastException = null; - for (int i = 0; (!isDone) && (i < lifecycleConfig.maxInitializationAttempts()); i++) { + for (int i = 0; (!isDone) && (i < maxInitializationAttempts); i++) { try { log.info("Initialization attempt {}", (i + 1)); log.info("Initializing LeaseCoordinator"); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java index eb1fc512..b91376dd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java @@ -46,8 +46,4 @@ public class LifecycleConfig { */ private AggregatorUtil aggregatorUtil = new AggregatorUtil(); - /** - * The maximum number of attempts to initialize the Scheduler - */ - private int maxInitializationAttempts = 20; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index aac15ddf..9a10f6b3 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -240,13 +240,15 @@ public class SchedulerTest { scheduler.run(); - verify(shardDetector, times(lifecycleConfig.maxInitializationAttempts())).listShards(); + verify(shardDetector, times(coordinatorConfig.maxInitializationAttempts())).listShards(); } @Test public final void testInitializationFailureWithRetriesWithConfiguredMaxInitializationAttempts() throws Exception { final int maxInitializationAttempts = 5; - lifecycleConfig.maxInitializationAttempts(maxInitializationAttempts); + coordinatorConfig.maxInitializationAttempts(maxInitializationAttempts); + scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig); doNothing().when(leaseCoordinator).initialize(); when(shardDetector.listShards()).thenThrow(new RuntimeException());