From 3e32ff190687e7a34274923d36f640d337298c35 Mon Sep 17 00:00:00 2001 From: Eric Meisel Date: Wed, 18 Dec 2019 20:19:44 -0600 Subject: [PATCH] Fix for #475 --- .../KinesisClientLibConfiguration.java | 47 +++++++++++++++++-- .../config/MultiLangDaemonConfiguration.java | 2 + .../coordinator/CoordinatorConfig.java | 7 +++ .../amazon/kinesis/coordinator/Scheduler.java | 12 +++-- 4 files changed, 59 insertions(+), 9 deletions(-) diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java index 3ec7c653..54797050 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java @@ -202,6 +202,11 @@ public class KinesisClientLibConfiguration { */ public static final int DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS = 50; + /** + * The sleep time between initialization attempts for the scheduler. + */ + public static final long DEFAULT_SCHEDULER_INITIALIZATION_BACKOFF_TIME_MILLIS = 1000; + private String applicationName; private String tableName; private String streamName; @@ -262,6 +267,9 @@ public class KinesisClientLibConfiguration { @Getter private int maxListShardsRetryAttempts = DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS; + @Getter + private long schedulerInitializationBackoffTimeMillis = DEFAULT_SCHEDULER_INITIALIZATION_BACKOFF_TIME_MILLIS; + /** * Constructor. * @@ -308,7 +316,8 @@ public class KinesisClientLibConfiguration { DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE, - DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null, DEFAULT_SHUTDOWN_GRACE_MILLIS); + DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null, DEFAULT_SHUTDOWN_GRACE_MILLIS, + DEFAULT_SCHEDULER_INITIALIZATION_BACKOFF_TIME_MILLIS); } /** @@ -362,6 +371,8 @@ public class KinesisClientLibConfiguration { * The region name for the service * @param shutdownGraceMillis * The number of milliseconds before graceful shutdown terminates forcefully + * @param schedulerInitializationBackoffTimeMillis + * Interval in milliseconds between retrying the scheduler initialization. */ // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES @@ -372,13 +383,13 @@ public class KinesisClientLibConfiguration { boolean callProcessRecordsEvenForEmptyRecordList, long parentShardPollIntervalMillis, long shardSyncIntervalMillis, boolean cleanupTerminatedShardsBeforeExpiry, long taskBackoffTimeMillis, long metricsBufferTimeMillis, int metricsMaxQueueSize, boolean validateSequenceNumberBeforeCheckpointing, - String regionName, long shutdownGraceMillis) { + String regionName, long shutdownGraceMillis, long schedulerInitializationBackoffTimeMillis) { this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider, dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, maxRecords, idleTimeBetweenReadsInMillis, callProcessRecordsEvenForEmptyRecordList, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize, validateSequenceNumberBeforeCheckpointing, regionName, - shutdownGraceMillis); + shutdownGraceMillis, schedulerInitializationBackoffTimeMillis); } /** @@ -432,6 +443,8 @@ public class KinesisClientLibConfiguration { * {@link ShardRecordProcessorCheckpointer#checkpoint(String)} * @param regionName * The region name for the service + * @param schedulerInitializationBackoffTimeMillis + * Interval in milliseconds between retrying the scheduler initialization. */ // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES @@ -443,7 +456,7 @@ public class KinesisClientLibConfiguration { long parentShardPollIntervalMillis, long shardSyncIntervalMillis, boolean cleanupTerminatedShardsBeforeExpiry, long taskBackoffTimeMillis, long metricsBufferTimeMillis, int metricsMaxQueueSize, boolean validateSequenceNumberBeforeCheckpointing, String regionName, - long shutdownGraceMillis) { + long shutdownGraceMillis, long schedulerInitializationBackoffTimeMillis) { // Check following values are greater than zero checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); @@ -454,6 +467,7 @@ public class KinesisClientLibConfiguration { checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis); checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis); + checkIsValuePositive("schedulerInitializationBackoffTimeMillis", schedulerInitializationBackoffTimeMillis); checkIsRegionNameValid(regionName); this.applicationName = applicationName; this.tableName = applicationName; @@ -485,6 +499,7 @@ public class KinesisClientLibConfiguration { this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(); + this.schedulerInitializationBackoffTimeMillis = schedulerInitializationBackoffTimeMillis; } /** @@ -538,6 +553,8 @@ public class KinesisClientLibConfiguration { * {@link ShardRecordProcessorCheckpointer#checkpoint(String)} * @param regionName * The region name for the service + * @param schedulerInitializationBackoffTimeMillis + * Interval in milliseconds between retrying the scheduler initialization. */ // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES @@ -549,7 +566,7 @@ public class KinesisClientLibConfiguration { long parentShardPollIntervalMillis, long shardSyncIntervalMillis, boolean cleanupTerminatedShardsBeforeExpiry, long taskBackoffTimeMillis, long metricsBufferTimeMillis, int metricsMaxQueueSize, boolean validateSequenceNumberBeforeCheckpointing, String regionName, - RecordsFetcherFactory recordsFetcherFactory) { + RecordsFetcherFactory recordsFetcherFactory, long schedulerInitializationBackoffTimeMillis) { // Check following values are greater than zero checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); @@ -559,6 +576,7 @@ public class KinesisClientLibConfiguration { checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis); checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis); checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); + checkIsValuePositive("schedulerInitializationBackoffTimeMillis", schedulerInitializationBackoffTimeMillis); checkIsRegionNameValid(regionName); this.applicationName = applicationName; this.tableName = applicationName; @@ -594,6 +612,7 @@ public class KinesisClientLibConfiguration { this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; this.recordsFetcherFactory = recordsFetcherFactory; this.shutdownGraceMillis = shutdownGraceMillis; + this.schedulerInitializationBackoffTimeMillis = schedulerInitializationBackoffTimeMillis; } // Check if value is positive, otherwise throw an exception @@ -863,6 +882,13 @@ public class KinesisClientLibConfiguration { return shutdownGraceMillis; } + /** + * @return Interval in milliseconds between retrying the scheduler initialization. + */ + public long getSchedulerInitializationBackoffTimeMillis() { + return schedulerInitializationBackoffTimeMillis; + } + /* * // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES * /** @@ -1381,4 +1407,15 @@ public class KinesisClientLibConfiguration { this.maxListShardsRetryAttempts = maxListShardsRetryAttempts; return this; } + + /** + * @param schedulerInitializationBackoffTimeMillis + * Interval in milliseconds between retrying the scheduler initialization. + * @return + */ + public KinesisClientLibConfiguration withSchedulerInitializationBackoffTimeMillis(long schedulerInitializationBackoffTimeMillis) { + checkIsValuePositive("schedulerInitializationBackoffTimeMillis", schedulerInitializationBackoffTimeMillis); + this.schedulerInitializationBackoffTimeMillis = schedulerInitializationBackoffTimeMillis; + return this; + } } diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java index b20cf4d5..ed48ac27 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java @@ -130,6 +130,8 @@ public class MultiLangDaemonConfiguration { private ShardPrioritization shardPrioritization; @ConfigurationSettable(configurationClass = CoordinatorConfig.class) private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; + @ConfigurationSettable(configurationClass = CoordinatorConfig.class) + private long schedulerInitializationBackoffTimeMillis; @ConfigurationSettable(configurationClass = LifecycleConfig.class) private long taskBackoffTimeMillis; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java index 6240eda9..d5c4dc13 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java @@ -90,4 +90,11 @@ public class CoordinatorConfig { private CoordinatorFactory coordinatorFactory = new SchedulerCoordinatorFactory(); + /** + * Interval in milliseconds between retrying the scheduler initialization. + * + *

Default value: 1000L

+ */ + private long schedulerInitializationBackoffTimeMillis = 1000L; + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index fd8dfcb1..eaeb5a1c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -118,6 +118,7 @@ public class Scheduler implements Runnable { private final boolean ignoreUnexpetedChildShards; private final AggregatorUtil aggregatorUtil; private final HierarchicalShardSyncer hierarchicalShardSyncer; + private final long schedulerInitializationBackoffTimeMillis; // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. @@ -220,6 +221,7 @@ public class Scheduler implements Runnable { this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards(); this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(); + this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); } /** @@ -291,10 +293,12 @@ public class Scheduler implements Runnable { lastException = e; } - try { - Thread.sleep(parentShardPollIntervalMillis); - } catch (InterruptedException e) { - log.debug("Sleep interrupted while initializing worker."); + if (!isDone) { + try { + Thread.sleep(schedulerInitializationBackoffTimeMillis); + } catch (InterruptedException e) { + log.debug("Sleep interrupted while initializing worker."); + } } }