Merge pull request #363 from muktiranjan/development

Making the maximum number of Scheduler initialization attempts configurable
This commit is contained in:
Justin Pfifer 2018-08-13 07:01:28 -07:00 committed by GitHub
commit 69899cc394
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 4 deletions

View file

@ -75,7 +75,7 @@ import software.amazon.kinesis.retrieval.RetrievalConfig;
@Accessors(fluent = true)
@Slf4j
public class Scheduler implements Runnable {
static final int MAX_INITIALIZATION_ATTEMPTS = 20;
private SchedulerLog slog = new SchedulerLog();
private final CheckpointConfig checkpointConfig;
@ -197,7 +197,7 @@ public class Scheduler implements Runnable {
initialize();
log.info("Initialization complete. Starting worker loop.");
} catch (RuntimeException e) {
log.error("Unable to initialize after {} attempts. Shutting down.", MAX_INITIALIZATION_ATTEMPTS, e);
log.error("Unable to initialize after {} attempts. Shutting down.", lifecycleConfig.maxInitializationAttempts(), e);
shutdown();
}
@ -214,7 +214,7 @@ public class Scheduler implements Runnable {
boolean isDone = false;
Exception lastException = null;
for (int i = 0; (!isDone) && (i < MAX_INITIALIZATION_ATTEMPTS); i++) {
for (int i = 0; (!isDone) && (i < lifecycleConfig.maxInitializationAttempts()); i++) {
try {
log.info("Initialization attempt {}", (i + 1));
log.info("Initializing LeaseCoordinator");

View file

@ -45,4 +45,9 @@ public class LifecycleConfig {
* AggregatorUtil is responsible for deaggregating KPL records.
*/
private AggregatorUtil aggregatorUtil = new AggregatorUtil();
/**
* The maximum number of attempts to initialize the Scheduler
*/
private int maxInitializationAttempts = 20;
}

View file

@ -240,7 +240,21 @@ public class SchedulerTest {
scheduler.run();
verify(shardDetector, times(Scheduler.MAX_INITIALIZATION_ATTEMPTS)).listShards();
verify(shardDetector, times(lifecycleConfig.maxInitializationAttempts())).listShards();
}
@Test
public final void testInitializationFailureWithRetriesWithConfiguredMaxInitializationAttempts() throws Exception {
final int maxInitializationAttempts = 5;
lifecycleConfig.maxInitializationAttempts(maxInitializationAttempts);
doNothing().when(leaseCoordinator).initialize();
when(shardDetector.listShards()).thenThrow(new RuntimeException());
scheduler.run();
// verify initialization was retried for maxInitializationAttempts times
verify(shardDetector, times(maxInitializationAttempts)).listShards();
}