Making the maximum number of Scheduler initialization attempts configurable
This commit is contained in:
parent
0b267037ea
commit
e8553ed5a9
3 changed files with 23 additions and 4 deletions
|
|
@ -75,7 +75,7 @@ import software.amazon.kinesis.retrieval.RetrievalConfig;
|
||||||
@Accessors(fluent = true)
|
@Accessors(fluent = true)
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class Scheduler implements Runnable {
|
public class Scheduler implements Runnable {
|
||||||
static final int MAX_INITIALIZATION_ATTEMPTS = 20;
|
|
||||||
private SchedulerLog slog = new SchedulerLog();
|
private SchedulerLog slog = new SchedulerLog();
|
||||||
|
|
||||||
private final CheckpointConfig checkpointConfig;
|
private final CheckpointConfig checkpointConfig;
|
||||||
|
|
@ -197,7 +197,7 @@ public class Scheduler implements Runnable {
|
||||||
initialize();
|
initialize();
|
||||||
log.info("Initialization complete. Starting worker loop.");
|
log.info("Initialization complete. Starting worker loop.");
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
log.error("Unable to initialize after {} attempts. Shutting down.", MAX_INITIALIZATION_ATTEMPTS, e);
|
log.error("Unable to initialize after {} attempts. Shutting down.", lifecycleConfig.maxInitializationAttempts(), e);
|
||||||
shutdown();
|
shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -214,7 +214,7 @@ public class Scheduler implements Runnable {
|
||||||
boolean isDone = false;
|
boolean isDone = false;
|
||||||
Exception lastException = null;
|
Exception lastException = null;
|
||||||
|
|
||||||
for (int i = 0; (!isDone) && (i < MAX_INITIALIZATION_ATTEMPTS); i++) {
|
for (int i = 0; (!isDone) && (i < lifecycleConfig.maxInitializationAttempts()); i++) {
|
||||||
try {
|
try {
|
||||||
log.info("Initialization attempt {}", (i + 1));
|
log.info("Initialization attempt {}", (i + 1));
|
||||||
log.info("Initializing LeaseCoordinator");
|
log.info("Initializing LeaseCoordinator");
|
||||||
|
|
|
||||||
|
|
@ -45,4 +45,9 @@ public class LifecycleConfig {
|
||||||
* AggregatorUtil is responsible for deaggregating KPL records.
|
* AggregatorUtil is responsible for deaggregating KPL records.
|
||||||
*/
|
*/
|
||||||
private AggregatorUtil aggregatorUtil = new AggregatorUtil();
|
private AggregatorUtil aggregatorUtil = new AggregatorUtil();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The maximum number of attempts to initialize the Scheduler
|
||||||
|
*/
|
||||||
|
private int maxInitializationAttempts = 20;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -240,7 +240,21 @@ public class SchedulerTest {
|
||||||
|
|
||||||
scheduler.run();
|
scheduler.run();
|
||||||
|
|
||||||
verify(shardDetector, times(Scheduler.MAX_INITIALIZATION_ATTEMPTS)).listShards();
|
verify(shardDetector, times(lifecycleConfig.maxInitializationAttempts())).listShards();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public final void testInitializationFailureWithRetriesWithConfiguredMaxInitializationAttempts() throws Exception {
|
||||||
|
final int maxInitializationAttempts = 5;
|
||||||
|
lifecycleConfig.maxInitializationAttempts(maxInitializationAttempts);
|
||||||
|
|
||||||
|
doNothing().when(leaseCoordinator).initialize();
|
||||||
|
when(shardDetector.listShards()).thenThrow(new RuntimeException());
|
||||||
|
|
||||||
|
scheduler.run();
|
||||||
|
|
||||||
|
// verify initialization was retried for maxInitializationAttempts times
|
||||||
|
verify(shardDetector, times(maxInitializationAttempts)).listShards();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue