MultiLangDaemon: Make shutdown grace configurable (#204)

Allow configuring the amount of time that the graceful shutdown process will wait for the client to complete its shutdown.
This commit is contained in:
Alex Charlton 2017-09-19 08:52:31 -07:00 committed by Justin Pfifer
parent 244da44d29
commit 01d2688bc6
3 changed files with 44 additions and 10 deletions

View file

@ -172,6 +172,11 @@ public class KinesisClientLibConfiguration {
*/ */
public static final ShardPrioritization DEFAULT_SHARD_PRIORITIZATION = new NoOpShardPrioritization(); public static final ShardPrioritization DEFAULT_SHARD_PRIORITIZATION = new NoOpShardPrioritization();
/**
* The amount of milliseconds to wait before graceful shutdown forcefully terminates.
*/
public static final long DEFAULT_SHUTDOWN_GRACE_MILLIS = 5000L;
/** /**
* The size of the thread pool to create for the lease renewer to use. * The size of the thread pool to create for the lease renewer to use.
*/ */
@ -213,6 +218,7 @@ public class KinesisClientLibConfiguration {
// This is useful for optimizing deployments to large fleets working on a stable stream. // This is useful for optimizing deployments to large fleets working on a stable stream.
private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
private ShardPrioritization shardPrioritization; private ShardPrioritization shardPrioritization;
private long shutdownGraceMillis;
@Getter @Getter
private Optional<Integer> timeoutInSeconds = Optional.empty(); private Optional<Integer> timeoutInSeconds = Optional.empty();
@ -268,7 +274,8 @@ public class KinesisClientLibConfiguration {
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE, DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null); DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null,
DEFAULT_SHUTDOWN_GRACE_MILLIS);
} }
/** /**
@ -303,6 +310,7 @@ public class KinesisClientLibConfiguration {
* with a call to Amazon Kinesis before checkpointing for calls to * with a call to Amazon Kinesis before checkpointing for calls to
* {@link RecordProcessorCheckpointer#checkpoint(String)} * {@link RecordProcessorCheckpointer#checkpoint(String)}
* @param regionName The region name for the service * @param regionName The region name for the service
* @param shutdownGraceMillis The number of milliseconds before graceful shutdown terminates forcefully
*/ */
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
@ -328,7 +336,8 @@ public class KinesisClientLibConfiguration {
long metricsBufferTimeMillis, long metricsBufferTimeMillis,
int metricsMaxQueueSize, int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing, boolean validateSequenceNumberBeforeCheckpointing,
String regionName) { String regionName,
long shutdownGraceMillis) {
this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider, this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId,
maxRecords, idleTimeBetweenReadsInMillis, maxRecords, idleTimeBetweenReadsInMillis,
@ -336,7 +345,7 @@ public class KinesisClientLibConfiguration {
shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry,
kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig,
taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize, taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize,
validateSequenceNumberBeforeCheckpointing, regionName); validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis);
} }
/** /**
@ -398,7 +407,8 @@ public class KinesisClientLibConfiguration {
long metricsBufferTimeMillis, long metricsBufferTimeMillis,
int metricsMaxQueueSize, int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing, boolean validateSequenceNumberBeforeCheckpointing,
String regionName) { String regionName,
long shutdownGraceMillis) {
// Check following values are greater than zero // Check following values are greater than zero
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis);
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
@ -408,6 +418,7 @@ public class KinesisClientLibConfiguration {
checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis); checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis);
checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis); checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis);
checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize);
checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis);
checkIsRegionNameValid(regionName); checkIsRegionNameValid(regionName);
this.applicationName = applicationName; this.applicationName = applicationName;
this.tableName = applicationName; this.tableName = applicationName;
@ -444,6 +455,7 @@ public class KinesisClientLibConfiguration {
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
this.shutdownGraceMillis = shutdownGraceMillis;
} }
// Check if value is positive, otherwise throw an exception // Check if value is positive, otherwise throw an exception
@ -731,6 +743,14 @@ public class KinesisClientLibConfiguration {
return shardPrioritization; return shardPrioritization;
} }
/**
* @return Graceful shutdown timeout
*/
public long getShutdownGraceMillis() {
return shutdownGraceMillis;
}
/*
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES
/** /**
* @param tableName name of the lease table in DynamoDB * @param tableName name of the lease table in DynamoDB
@ -1145,4 +1165,13 @@ public class KinesisClientLibConfiguration {
this.timeoutInSeconds = Optional.of(timeoutInSeconds); this.timeoutInSeconds = Optional.of(timeoutInSeconds);
} }
/**
* @param shutdownGraceMillis Time before gracefully shutdown forcefully terminates
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withShutdownGraceMillis(long shutdownGraceMillis) {
checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis);
this.shutdownGraceMillis = shutdownGraceMillis;
return this;
}
} }

View file

@ -148,13 +148,14 @@ public class MultiLangDaemon implements Callable<Integer> {
config.getRecordProcessorFactory(), config.getRecordProcessorFactory(),
executorService); executorService);
final long shutdownGraceMillis = config.getKinesisClientLibConfiguration().getShutdownGraceMillis();
Runtime.getRuntime().addShutdownHook(new Thread() { Runtime.getRuntime().addShutdownHook(new Thread() {
@Override @Override
public void run() { public void run() {
LOG.info("Process terminanted, will initiate shutdown."); LOG.info("Process terminanted, will initiate shutdown.");
try { try {
Future<Void> fut = daemon.worker.requestShutdown(); Future<Void> fut = daemon.worker.requestShutdown();
fut.get(5000, TimeUnit.MILLISECONDS); fut.get(shutdownGraceMillis, TimeUnit.MILLISECONDS);
LOG.info("Process shutdown is complete."); LOG.info("Process shutdown is complete.");
} catch (InterruptedException | ExecutionException | TimeoutException e) { } catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.error("Encountered an error during shutdown.", e); LOG.error("Encountered an error during shutdown.", e);

View file

@ -84,7 +84,8 @@ public class KinesisClientLibConfigurationTest {
TEST_VALUE_LONG, TEST_VALUE_LONG,
TEST_VALUE_INT, TEST_VALUE_INT,
skipCheckpointValidationValue, skipCheckpointValidationValue,
null); null,
TEST_VALUE_LONG);
} }
@Test @Test
@ -94,7 +95,7 @@ public class KinesisClientLibConfigurationTest {
// Try each argument at one time. // Try each argument at one time.
KinesisClientLibConfiguration config = null; KinesisClientLibConfiguration config = null;
long[] longValues = long[] longValues =
{ TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG }; { TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG };
for (int i = 0; i < PARAMETER_COUNT; i++) { for (int i = 0; i < PARAMETER_COUNT; i++) {
longValues[i] = INVALID_LONG; longValues[i] = INVALID_LONG;
try { try {
@ -122,7 +123,8 @@ public class KinesisClientLibConfigurationTest {
longValues[5], longValues[5],
TEST_VALUE_INT, TEST_VALUE_INT,
skipCheckpointValidationValue, skipCheckpointValidationValue,
null); null,
longValues[6]);
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
System.out.println(e.getMessage()); System.out.println(e.getMessage());
} }
@ -156,7 +158,8 @@ public class KinesisClientLibConfigurationTest {
TEST_VALUE_LONG, TEST_VALUE_LONG,
intValues[1], intValues[1],
skipCheckpointValidationValue, skipCheckpointValidationValue,
null); null,
TEST_VALUE_LONG);
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
System.out.println(e.getMessage()); System.out.println(e.getMessage());
} }
@ -319,7 +322,8 @@ public class KinesisClientLibConfigurationTest {
TEST_VALUE_LONG, TEST_VALUE_LONG,
1, 1,
skipCheckpointValidationValue, skipCheckpointValidationValue,
"abcd"); "abcd",
TEST_VALUE_LONG);
Assert.fail("No expected Exception is thrown."); Assert.fail("No expected Exception is thrown.");
} catch(IllegalArgumentException e) { } catch(IllegalArgumentException e) {
System.out.println(e.getMessage()); System.out.println(e.getMessage());