diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 62d87f30..1bfd0fc0 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -172,6 +172,11 @@ public class KinesisClientLibConfiguration { */ public static final ShardPrioritization DEFAULT_SHARD_PRIORITIZATION = new NoOpShardPrioritization(); + /** + * The amount of milliseconds to wait before graceful shutdown forcefully terminates. + */ + public static final long DEFAULT_SHUTDOWN_GRACE_MILLIS = 5000L; + /** * The size of the thread pool to create for the lease renewer to use. */ @@ -213,6 +218,7 @@ public class KinesisClientLibConfiguration { // This is useful for optimizing deployments to large fleets working on a stable stream. private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ShardPrioritization shardPrioritization; + private long shutdownGraceMillis; @Getter private Optional timeoutInSeconds = Optional.empty(); @@ -268,7 +274,8 @@ public class KinesisClientLibConfiguration { DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(), DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE, - DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null); + DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null, + DEFAULT_SHUTDOWN_GRACE_MILLIS); } /** @@ -303,6 +310,7 @@ public class KinesisClientLibConfiguration { * with a call to Amazon Kinesis before checkpointing for calls to * {@link RecordProcessorCheckpointer#checkpoint(String)} * @param regionName The region name for the service + * @param shutdownGraceMillis The number of milliseconds before graceful shutdown terminates forcefully */ // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES @@ -328,7 +336,8 @@ public class KinesisClientLibConfiguration { long metricsBufferTimeMillis, int metricsMaxQueueSize, boolean validateSequenceNumberBeforeCheckpointing, - String regionName) { + String regionName, + long shutdownGraceMillis) { this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider, dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, maxRecords, idleTimeBetweenReadsInMillis, @@ -336,7 +345,7 @@ public class KinesisClientLibConfiguration { shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize, - validateSequenceNumberBeforeCheckpointing, regionName); + validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis); } /** @@ -398,7 +407,8 @@ public class KinesisClientLibConfiguration { long metricsBufferTimeMillis, int metricsMaxQueueSize, boolean validateSequenceNumberBeforeCheckpointing, - String regionName) { + String regionName, + long shutdownGraceMillis) { // Check following values are greater than zero checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); @@ -408,6 +418,7 @@ public class KinesisClientLibConfiguration { checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis); checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis); checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); + checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis); checkIsRegionNameValid(regionName); this.applicationName = applicationName; this.tableName = applicationName; @@ -444,6 +455,7 @@ public class KinesisClientLibConfiguration { InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; + this.shutdownGraceMillis = shutdownGraceMillis; } // Check if value is positive, otherwise throw an exception @@ -731,6 +743,14 @@ public class KinesisClientLibConfiguration { return shardPrioritization; } + /** + * @return Graceful shutdown timeout + */ + public long getShutdownGraceMillis() { + return shutdownGraceMillis; + } + + /* // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES /** * @param tableName name of the lease table in DynamoDB @@ -1145,4 +1165,13 @@ public class KinesisClientLibConfiguration { this.timeoutInSeconds = Optional.of(timeoutInSeconds); } + /** + * @param shutdownGraceMillis Time before gracefully shutdown forcefully terminates + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withShutdownGraceMillis(long shutdownGraceMillis) { + checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis); + this.shutdownGraceMillis = shutdownGraceMillis; + return this; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java index 885eeb4f..82ca74c1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java @@ -148,13 +148,14 @@ public class MultiLangDaemon implements Callable { config.getRecordProcessorFactory(), executorService); + final long shutdownGraceMillis = config.getKinesisClientLibConfiguration().getShutdownGraceMillis(); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { LOG.info("Process terminanted, will initiate shutdown."); try { Future fut = daemon.worker.requestShutdown(); - fut.get(5000, TimeUnit.MILLISECONDS); + fut.get(shutdownGraceMillis, TimeUnit.MILLISECONDS); LOG.info("Process shutdown is complete."); } catch (InterruptedException | ExecutionException | TimeoutException e) { LOG.error("Encountered an error during shutdown.", e); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java index 4874a164..0092699f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java @@ -84,7 +84,8 @@ public class KinesisClientLibConfigurationTest { TEST_VALUE_LONG, TEST_VALUE_INT, skipCheckpointValidationValue, - null); + null, + TEST_VALUE_LONG); } @Test @@ -94,7 +95,7 @@ public class KinesisClientLibConfigurationTest { // Try each argument at one time. KinesisClientLibConfiguration config = null; long[] longValues = - { TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG }; + { TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG }; for (int i = 0; i < PARAMETER_COUNT; i++) { longValues[i] = INVALID_LONG; try { @@ -122,7 +123,8 @@ public class KinesisClientLibConfigurationTest { longValues[5], TEST_VALUE_INT, skipCheckpointValidationValue, - null); + null, + longValues[6]); } catch (IllegalArgumentException e) { System.out.println(e.getMessage()); } @@ -156,7 +158,8 @@ public class KinesisClientLibConfigurationTest { TEST_VALUE_LONG, intValues[1], skipCheckpointValidationValue, - null); + null, + TEST_VALUE_LONG); } catch (IllegalArgumentException e) { System.out.println(e.getMessage()); } @@ -319,7 +322,8 @@ public class KinesisClientLibConfigurationTest { TEST_VALUE_LONG, 1, skipCheckpointValidationValue, - "abcd"); + "abcd", + TEST_VALUE_LONG); Assert.fail("No expected Exception is thrown."); } catch(IllegalArgumentException e) { System.out.println(e.getMessage());