diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index a2cb4983..10e477a5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -75,7 +75,7 @@ import software.amazon.kinesis.retrieval.RetrievalConfig; @Accessors(fluent = true) @Slf4j public class Scheduler implements Runnable { - static final int MAX_INITIALIZATION_ATTEMPTS = 20; + private SchedulerLog slog = new SchedulerLog(); private final CheckpointConfig checkpointConfig; @@ -197,7 +197,7 @@ public class Scheduler implements Runnable { initialize(); log.info("Initialization complete. Starting worker loop."); } catch (RuntimeException e) { - log.error("Unable to initialize after {} attempts. Shutting down.", MAX_INITIALIZATION_ATTEMPTS, e); + log.error("Unable to initialize after {} attempts. Shutting down.", lifecycleConfig.maxInitializationAttempts(), e); shutdown(); } @@ -214,7 +214,7 @@ public class Scheduler implements Runnable { boolean isDone = false; Exception lastException = null; - for (int i = 0; (!isDone) && (i < MAX_INITIALIZATION_ATTEMPTS); i++) { + for (int i = 0; (!isDone) && (i < lifecycleConfig.maxInitializationAttempts()); i++) { try { log.info("Initialization attempt {}", (i + 1)); log.info("Initializing LeaseCoordinator"); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java index 4286c8f9..eb1fc512 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java @@ -45,4 +45,9 @@ public class LifecycleConfig { * AggregatorUtil is responsible for deaggregating KPL records. */ private AggregatorUtil aggregatorUtil = new AggregatorUtil(); + + /** + * The maximum number of attempts to initialize the Scheduler + */ + private int maxInitializationAttempts = 20; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 3b8e31f9..aac15ddf 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -240,7 +240,21 @@ public class SchedulerTest { scheduler.run(); - verify(shardDetector, times(Scheduler.MAX_INITIALIZATION_ATTEMPTS)).listShards(); + verify(shardDetector, times(lifecycleConfig.maxInitializationAttempts())).listShards(); + } + + @Test + public final void testInitializationFailureWithRetriesWithConfiguredMaxInitializationAttempts() throws Exception { + final int maxInitializationAttempts = 5; + lifecycleConfig.maxInitializationAttempts(maxInitializationAttempts); + + doNothing().when(leaseCoordinator).initialize(); + when(shardDetector.listShards()).thenThrow(new RuntimeException()); + + scheduler.run(); + + // verify initialization was retried for maxInitializationAttempts times + verify(shardDetector, times(maxInitializationAttempts)).listShards(); }