This commit is contained in:
Eric Meisel 2019-12-18 20:19:44 -06:00 committed by Cory-Bradshaw
parent 8a01abbf43
commit 3e32ff1906
4 changed files with 59 additions and 9 deletions

View file

@ -202,6 +202,11 @@ public class KinesisClientLibConfiguration {
*/
public static final int DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS = 50;
/**
* The sleep time between initialization attempts for the scheduler.
*/
public static final long DEFAULT_SCHEDULER_INITIALIZATION_BACKOFF_TIME_MILLIS = 1000;
private String applicationName;
private String tableName;
private String streamName;
@ -262,6 +267,9 @@ public class KinesisClientLibConfiguration {
@Getter
private int maxListShardsRetryAttempts = DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS;
@Getter
private long schedulerInitializationBackoffTimeMillis = DEFAULT_SCHEDULER_INITIALIZATION_BACKOFF_TIME_MILLIS;
/**
* Constructor.
*
@ -308,7 +316,8 @@ public class KinesisClientLibConfiguration {
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null, DEFAULT_SHUTDOWN_GRACE_MILLIS);
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null, DEFAULT_SHUTDOWN_GRACE_MILLIS,
DEFAULT_SCHEDULER_INITIALIZATION_BACKOFF_TIME_MILLIS);
}
/**
@ -362,6 +371,8 @@ public class KinesisClientLibConfiguration {
* The region name for the service
* @param shutdownGraceMillis
* The number of milliseconds before graceful shutdown terminates forcefully
* @param schedulerInitializationBackoffTimeMillis
* Interval in milliseconds between retrying the scheduler initialization.
*/
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
@ -372,13 +383,13 @@ public class KinesisClientLibConfiguration {
boolean callProcessRecordsEvenForEmptyRecordList, long parentShardPollIntervalMillis,
long shardSyncIntervalMillis, boolean cleanupTerminatedShardsBeforeExpiry, long taskBackoffTimeMillis,
long metricsBufferTimeMillis, int metricsMaxQueueSize, boolean validateSequenceNumberBeforeCheckpointing,
String regionName, long shutdownGraceMillis) {
String regionName, long shutdownGraceMillis, long schedulerInitializationBackoffTimeMillis) {
this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, maxRecords,
idleTimeBetweenReadsInMillis, callProcessRecordsEvenForEmptyRecordList, parentShardPollIntervalMillis,
shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, taskBackoffTimeMillis,
metricsBufferTimeMillis, metricsMaxQueueSize, validateSequenceNumberBeforeCheckpointing, regionName,
shutdownGraceMillis);
shutdownGraceMillis, schedulerInitializationBackoffTimeMillis);
}
/**
@ -432,6 +443,8 @@ public class KinesisClientLibConfiguration {
* {@link ShardRecordProcessorCheckpointer#checkpoint(String)}
* @param regionName
* The region name for the service
* @param schedulerInitializationBackoffTimeMillis
* Interval in milliseconds between retrying the scheduler initialization.
*/
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
@ -443,7 +456,7 @@ public class KinesisClientLibConfiguration {
long parentShardPollIntervalMillis, long shardSyncIntervalMillis,
boolean cleanupTerminatedShardsBeforeExpiry, long taskBackoffTimeMillis, long metricsBufferTimeMillis,
int metricsMaxQueueSize, boolean validateSequenceNumberBeforeCheckpointing, String regionName,
long shutdownGraceMillis) {
long shutdownGraceMillis, long schedulerInitializationBackoffTimeMillis) {
// Check following values are greater than zero
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis);
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
@ -454,6 +467,7 @@ public class KinesisClientLibConfiguration {
checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis);
checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize);
checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis);
checkIsValuePositive("schedulerInitializationBackoffTimeMillis", schedulerInitializationBackoffTimeMillis);
checkIsRegionNameValid(regionName);
this.applicationName = applicationName;
this.tableName = applicationName;
@ -485,6 +499,7 @@ public class KinesisClientLibConfiguration {
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
this.recordsFetcherFactory = new SimpleRecordsFetcherFactory();
this.schedulerInitializationBackoffTimeMillis = schedulerInitializationBackoffTimeMillis;
}
/**
@ -538,6 +553,8 @@ public class KinesisClientLibConfiguration {
* {@link ShardRecordProcessorCheckpointer#checkpoint(String)}
* @param regionName
* The region name for the service
* @param schedulerInitializationBackoffTimeMillis
* Interval in milliseconds between retrying the scheduler initialization.
*/
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
@ -549,7 +566,7 @@ public class KinesisClientLibConfiguration {
long parentShardPollIntervalMillis, long shardSyncIntervalMillis,
boolean cleanupTerminatedShardsBeforeExpiry, long taskBackoffTimeMillis, long metricsBufferTimeMillis,
int metricsMaxQueueSize, boolean validateSequenceNumberBeforeCheckpointing, String regionName,
RecordsFetcherFactory recordsFetcherFactory) {
RecordsFetcherFactory recordsFetcherFactory, long schedulerInitializationBackoffTimeMillis) {
// Check following values are greater than zero
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis);
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
@ -559,6 +576,7 @@ public class KinesisClientLibConfiguration {
checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis);
checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis);
checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize);
checkIsValuePositive("schedulerInitializationBackoffTimeMillis", schedulerInitializationBackoffTimeMillis);
checkIsRegionNameValid(regionName);
this.applicationName = applicationName;
this.tableName = applicationName;
@ -594,6 +612,7 @@ public class KinesisClientLibConfiguration {
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
this.recordsFetcherFactory = recordsFetcherFactory;
this.shutdownGraceMillis = shutdownGraceMillis;
this.schedulerInitializationBackoffTimeMillis = schedulerInitializationBackoffTimeMillis;
}
// Check if value is positive, otherwise throw an exception
@ -863,6 +882,13 @@ public class KinesisClientLibConfiguration {
return shutdownGraceMillis;
}
/**
* @return Interval in milliseconds between retrying the scheduler initialization.
*/
public long getSchedulerInitializationBackoffTimeMillis() {
return schedulerInitializationBackoffTimeMillis;
}
/*
* // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES
* /**
@ -1381,4 +1407,15 @@ public class KinesisClientLibConfiguration {
this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
return this;
}
/**
* @param schedulerInitializationBackoffTimeMillis
* Interval in milliseconds between retrying the scheduler initialization.
* @return
*/
public KinesisClientLibConfiguration withSchedulerInitializationBackoffTimeMillis(long schedulerInitializationBackoffTimeMillis) {
checkIsValuePositive("schedulerInitializationBackoffTimeMillis", schedulerInitializationBackoffTimeMillis);
this.schedulerInitializationBackoffTimeMillis = schedulerInitializationBackoffTimeMillis;
return this;
}
}

View file

@ -130,6 +130,8 @@ public class MultiLangDaemonConfiguration {
private ShardPrioritization shardPrioritization;
@ConfigurationSettable(configurationClass = CoordinatorConfig.class)
private boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
@ConfigurationSettable(configurationClass = CoordinatorConfig.class)
private long schedulerInitializationBackoffTimeMillis;
@ConfigurationSettable(configurationClass = LifecycleConfig.class)
private long taskBackoffTimeMillis;

View file

@ -90,4 +90,11 @@ public class CoordinatorConfig {
private CoordinatorFactory coordinatorFactory = new SchedulerCoordinatorFactory();
/**
* Interval in milliseconds between retrying the scheduler initialization.
*
* <p>Default value: 1000L</p>
*/
private long schedulerInitializationBackoffTimeMillis = 1000L;
}

View file

@ -118,6 +118,7 @@ public class Scheduler implements Runnable {
private final boolean ignoreUnexpetedChildShards;
private final AggregatorUtil aggregatorUtil;
private final HierarchicalShardSyncer hierarchicalShardSyncer;
private final long schedulerInitializationBackoffTimeMillis;
// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
@ -220,6 +221,7 @@ public class Scheduler implements Runnable {
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer();
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
}
/**
@ -291,10 +293,12 @@ public class Scheduler implements Runnable {
lastException = e;
}
try {
Thread.sleep(parentShardPollIntervalMillis);
} catch (InterruptedException e) {
log.debug("Sleep interrupted while initializing worker.");
if (!isDone) {
try {
Thread.sleep(schedulerInitializationBackoffTimeMillis);
} catch (InterruptedException e) {
log.debug("Sleep interrupted while initializing worker.");
}
}
}