Moving the max number of Scheduler initialization attempts parameter … (#368)
* Moving the max number of Scheduler initialization attempts parameter to CoordinatorConfig * Changing the max initialization attempts variable name
This commit is contained in:
parent
205cf051f3
commit
e694ab7724
4 changed files with 15 additions and 8 deletions
|
|
@ -35,6 +35,13 @@ public class CoordinatorConfig {
|
||||||
@NonNull
|
@NonNull
|
||||||
private final String applicationName;
|
private final String applicationName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The maximum number of attempts to initialize the Scheduler
|
||||||
|
*
|
||||||
|
* <p>Default value: 20</p>
|
||||||
|
*/
|
||||||
|
private int maxInitializationAttempts = 20;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interval in milliseconds between polling to check for parent shard completion.
|
* Interval in milliseconds between polling to check for parent shard completion.
|
||||||
* Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
|
* Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
|
||||||
|
|
|
||||||
|
|
@ -87,6 +87,7 @@ public class Scheduler implements Runnable {
|
||||||
private final RetrievalConfig retrievalConfig;
|
private final RetrievalConfig retrievalConfig;
|
||||||
|
|
||||||
private final String applicationName;
|
private final String applicationName;
|
||||||
|
private final int maxInitializationAttempts;
|
||||||
private final Checkpointer checkpoint;
|
private final Checkpointer checkpoint;
|
||||||
private final long shardConsumerDispatchPollIntervalMillis;
|
private final long shardConsumerDispatchPollIntervalMillis;
|
||||||
// Backoff time when polling to check if application has finished processing
|
// Backoff time when polling to check if application has finished processing
|
||||||
|
|
@ -144,6 +145,7 @@ public class Scheduler implements Runnable {
|
||||||
this.retrievalConfig = retrievalConfig;
|
this.retrievalConfig = retrievalConfig;
|
||||||
|
|
||||||
this.applicationName = this.coordinatorConfig.applicationName();
|
this.applicationName = this.coordinatorConfig.applicationName();
|
||||||
|
this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts();
|
||||||
this.metricsFactory = this.metricsConfig.metricsFactory();
|
this.metricsFactory = this.metricsConfig.metricsFactory();
|
||||||
this.leaseCoordinator = this.leaseManagementConfig.leaseManagementFactory()
|
this.leaseCoordinator = this.leaseManagementConfig.leaseManagementFactory()
|
||||||
.createLeaseCoordinator(this.metricsFactory);
|
.createLeaseCoordinator(this.metricsFactory);
|
||||||
|
|
@ -197,7 +199,7 @@ public class Scheduler implements Runnable {
|
||||||
initialize();
|
initialize();
|
||||||
log.info("Initialization complete. Starting worker loop.");
|
log.info("Initialization complete. Starting worker loop.");
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
log.error("Unable to initialize after {} attempts. Shutting down.", lifecycleConfig.maxInitializationAttempts(), e);
|
log.error("Unable to initialize after {} attempts. Shutting down.", maxInitializationAttempts, e);
|
||||||
shutdown();
|
shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -214,7 +216,7 @@ public class Scheduler implements Runnable {
|
||||||
boolean isDone = false;
|
boolean isDone = false;
|
||||||
Exception lastException = null;
|
Exception lastException = null;
|
||||||
|
|
||||||
for (int i = 0; (!isDone) && (i < lifecycleConfig.maxInitializationAttempts()); i++) {
|
for (int i = 0; (!isDone) && (i < maxInitializationAttempts); i++) {
|
||||||
try {
|
try {
|
||||||
log.info("Initialization attempt {}", (i + 1));
|
log.info("Initialization attempt {}", (i + 1));
|
||||||
log.info("Initializing LeaseCoordinator");
|
log.info("Initializing LeaseCoordinator");
|
||||||
|
|
|
||||||
|
|
@ -46,8 +46,4 @@ public class LifecycleConfig {
|
||||||
*/
|
*/
|
||||||
private AggregatorUtil aggregatorUtil = new AggregatorUtil();
|
private AggregatorUtil aggregatorUtil = new AggregatorUtil();
|
||||||
|
|
||||||
/**
|
|
||||||
* The maximum number of attempts to initialize the Scheduler
|
|
||||||
*/
|
|
||||||
private int maxInitializationAttempts = 20;
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -240,13 +240,15 @@ public class SchedulerTest {
|
||||||
|
|
||||||
scheduler.run();
|
scheduler.run();
|
||||||
|
|
||||||
verify(shardDetector, times(lifecycleConfig.maxInitializationAttempts())).listShards();
|
verify(shardDetector, times(coordinatorConfig.maxInitializationAttempts())).listShards();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public final void testInitializationFailureWithRetriesWithConfiguredMaxInitializationAttempts() throws Exception {
|
public final void testInitializationFailureWithRetriesWithConfiguredMaxInitializationAttempts() throws Exception {
|
||||||
final int maxInitializationAttempts = 5;
|
final int maxInitializationAttempts = 5;
|
||||||
lifecycleConfig.maxInitializationAttempts(maxInitializationAttempts);
|
coordinatorConfig.maxInitializationAttempts(maxInitializationAttempts);
|
||||||
|
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
|
||||||
|
metricsConfig, processorConfig, retrievalConfig);
|
||||||
|
|
||||||
doNothing().when(leaseCoordinator).initialize();
|
doNothing().when(leaseCoordinator).initialize();
|
||||||
when(shardDetector.listShards()).thenThrow(new RuntimeException());
|
when(shardDetector.listShards()).thenThrow(new RuntimeException());
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue