From f5cbd84c875fec23bd275b17df3dbf69f872e3b7 Mon Sep 17 00:00:00 2001 From: Alex Charlton Date: Wed, 16 Aug 2017 15:50:56 -0700 Subject: [PATCH] MultiLangDaemon: Make shutdown grace configurable --- .../worker/KinesisClientLibConfiguration.java | 37 +++++++++++++++++-- .../kinesis/multilang/MultiLangDaemon.java | 3 +- .../KinesisClientLibConfigurationTest.java | 14 ++++--- 3 files changed, 44 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index e9673414..45263e28 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -172,6 +172,11 @@ public class KinesisClientLibConfiguration { */ public static final ShardPrioritization DEFAULT_SHARD_PRIORITIZATION = new NoOpShardPrioritization(); + /** + * The amount of milliseconds to wait before graceful shutdown forcefully terminates. + */ + public static final long DEFAULT_SHUTDOWN_GRACE_MILLIS = 5000L; + /** * The size of the thread pool to create for the lease renewer to use. */ @@ -213,6 +218,7 @@ public class KinesisClientLibConfiguration { // This is useful for optimizing deployments to large fleets working on a stable stream. private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ShardPrioritization shardPrioritization; + private long shutdownGraceMillis; @Getter private Optional timeoutInSeconds = Optional.empty(); @@ -262,7 +268,8 @@ public class KinesisClientLibConfiguration { DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(), DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE, - DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null); + DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null, + DEFAULT_SHUTDOWN_GRACE_MILLIS); } /** @@ -297,6 +304,7 @@ public class KinesisClientLibConfiguration { * with a call to Amazon Kinesis before checkpointing for calls to * {@link RecordProcessorCheckpointer#checkpoint(String)} * @param regionName The region name for the service + * @param shutdownGraceMillis The number of milliseconds before graceful shutdown terminates forcefully */ // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES @@ -322,7 +330,8 @@ public class KinesisClientLibConfiguration { long metricsBufferTimeMillis, int metricsMaxQueueSize, boolean validateSequenceNumberBeforeCheckpointing, - String regionName) { + String regionName, + long shutdownGraceMillis) { this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider, dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, maxRecords, idleTimeBetweenReadsInMillis, @@ -330,7 +339,7 @@ public class KinesisClientLibConfiguration { shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize, - validateSequenceNumberBeforeCheckpointing, regionName); + validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis); } /** @@ -392,7 +401,8 @@ public class KinesisClientLibConfiguration { long metricsBufferTimeMillis, int metricsMaxQueueSize, boolean validateSequenceNumberBeforeCheckpointing, - String regionName) { + String regionName, + long shutdownGraceMillis) { // Check following values are greater than zero checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); @@ -402,6 +412,7 @@ public class KinesisClientLibConfiguration { checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis); checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis); checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); + checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis); checkIsRegionNameValid(regionName); this.applicationName = applicationName; this.tableName = applicationName; @@ -438,6 +449,7 @@ public class KinesisClientLibConfiguration { InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; + this.shutdownGraceMillis = shutdownGraceMillis; } // Check if value is positive, otherwise throw an exception @@ -725,6 +737,14 @@ public class KinesisClientLibConfiguration { return shardPrioritization; } + /** + * @return Graceful shutdown timeout + */ + public long getShutdownGraceMillis() { + return shutdownGraceMillis; + } + + /* // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES /** * @param tableName name of the lease table in DynamoDB @@ -1118,4 +1138,13 @@ public class KinesisClientLibConfiguration { this.timeoutInSeconds = Optional.of(timeoutInSeconds); } + /** + * @param shutdownGraceMillis Time before gracefully shutdown forcefully terminates + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withShutdownGraceMillis(long shutdownGraceMillis) { + checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis); + this.shutdownGraceMillis = shutdownGraceMillis; + return this; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java index 885eeb4f..82ca74c1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java @@ -148,13 +148,14 @@ public class MultiLangDaemon implements Callable { config.getRecordProcessorFactory(), executorService); + final long shutdownGraceMillis = config.getKinesisClientLibConfiguration().getShutdownGraceMillis(); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { LOG.info("Process terminanted, will initiate shutdown."); try { Future fut = daemon.worker.requestShutdown(); - fut.get(5000, TimeUnit.MILLISECONDS); + fut.get(shutdownGraceMillis, TimeUnit.MILLISECONDS); LOG.info("Process shutdown is complete."); } catch (InterruptedException | ExecutionException | TimeoutException e) { LOG.error("Encountered an error during shutdown.", e); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java index 4874a164..0092699f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java @@ -84,7 +84,8 @@ public class KinesisClientLibConfigurationTest { TEST_VALUE_LONG, TEST_VALUE_INT, skipCheckpointValidationValue, - null); + null, + TEST_VALUE_LONG); } @Test @@ -94,7 +95,7 @@ public class KinesisClientLibConfigurationTest { // Try each argument at one time. KinesisClientLibConfiguration config = null; long[] longValues = - { TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG }; + { TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG }; for (int i = 0; i < PARAMETER_COUNT; i++) { longValues[i] = INVALID_LONG; try { @@ -122,7 +123,8 @@ public class KinesisClientLibConfigurationTest { longValues[5], TEST_VALUE_INT, skipCheckpointValidationValue, - null); + null, + longValues[6]); } catch (IllegalArgumentException e) { System.out.println(e.getMessage()); } @@ -156,7 +158,8 @@ public class KinesisClientLibConfigurationTest { TEST_VALUE_LONG, intValues[1], skipCheckpointValidationValue, - null); + null, + TEST_VALUE_LONG); } catch (IllegalArgumentException e) { System.out.println(e.getMessage()); } @@ -319,7 +322,8 @@ public class KinesisClientLibConfigurationTest { TEST_VALUE_LONG, 1, skipCheckpointValidationValue, - "abcd"); + "abcd", + TEST_VALUE_LONG); Assert.fail("No expected Exception is thrown."); } catch(IllegalArgumentException e) { System.out.println(e.getMessage());