Merge branch 'master' into release-1.8.2

This commit is contained in:
Pfifer, Justin 2017-09-20 07:32:10 -07:00
commit 8f79833463
3 changed files with 44 additions and 10 deletions

View file

@ -172,6 +172,11 @@ public class KinesisClientLibConfiguration {
*/
public static final ShardPrioritization DEFAULT_SHARD_PRIORITIZATION = new NoOpShardPrioritization();
/**
* The amount of milliseconds to wait before graceful shutdown forcefully terminates.
*/
public static final long DEFAULT_SHUTDOWN_GRACE_MILLIS = 5000L;
/**
* The size of the thread pool to create for the lease renewer to use.
*/
@ -213,6 +218,7 @@ public class KinesisClientLibConfiguration {
// This is useful for optimizing deployments to large fleets working on a stable stream.
private boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
private ShardPrioritization shardPrioritization;
private long shutdownGraceMillis;
@Getter
private Optional<Integer> timeoutInSeconds = Optional.empty();
@ -268,7 +274,8 @@ public class KinesisClientLibConfiguration {
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null);
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null,
DEFAULT_SHUTDOWN_GRACE_MILLIS);
}
/**
@ -303,6 +310,7 @@ public class KinesisClientLibConfiguration {
* with a call to Amazon Kinesis before checkpointing for calls to
* {@link RecordProcessorCheckpointer#checkpoint(String)}
* @param regionName The region name for the service
* @param shutdownGraceMillis The number of milliseconds before graceful shutdown terminates forcefully
*/
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
@ -328,7 +336,8 @@ public class KinesisClientLibConfiguration {
long metricsBufferTimeMillis,
int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing,
String regionName) {
String regionName,
long shutdownGraceMillis) {
this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId,
maxRecords, idleTimeBetweenReadsInMillis,
@ -336,7 +345,7 @@ public class KinesisClientLibConfiguration {
shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry,
kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig,
taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize,
validateSequenceNumberBeforeCheckpointing, regionName);
validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis);
}
/**
@ -398,7 +407,8 @@ public class KinesisClientLibConfiguration {
long metricsBufferTimeMillis,
int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing,
String regionName) {
String regionName,
long shutdownGraceMillis) {
// Check following values are greater than zero
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis);
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
@ -408,6 +418,7 @@ public class KinesisClientLibConfiguration {
checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis);
checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis);
checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize);
checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis);
checkIsRegionNameValid(regionName);
this.applicationName = applicationName;
this.tableName = applicationName;
@ -444,6 +455,7 @@ public class KinesisClientLibConfiguration {
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
this.shutdownGraceMillis = shutdownGraceMillis;
}
// Check if value is positive, otherwise throw an exception
@ -731,6 +743,14 @@ public class KinesisClientLibConfiguration {
return shardPrioritization;
}
/**
* @return Graceful shutdown timeout
*/
public long getShutdownGraceMillis() {
return shutdownGraceMillis;
}
/*
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES
/**
* @param tableName name of the lease table in DynamoDB
@ -1145,4 +1165,13 @@ public class KinesisClientLibConfiguration {
this.timeoutInSeconds = Optional.of(timeoutInSeconds);
}
/**
* @param shutdownGraceMillis Time before gracefully shutdown forcefully terminates
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withShutdownGraceMillis(long shutdownGraceMillis) {
checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis);
this.shutdownGraceMillis = shutdownGraceMillis;
return this;
}
}

View file

@ -148,13 +148,14 @@ public class MultiLangDaemon implements Callable<Integer> {
config.getRecordProcessorFactory(),
executorService);
final long shutdownGraceMillis = config.getKinesisClientLibConfiguration().getShutdownGraceMillis();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
LOG.info("Process terminanted, will initiate shutdown.");
try {
Future<Void> fut = daemon.worker.requestShutdown();
fut.get(5000, TimeUnit.MILLISECONDS);
fut.get(shutdownGraceMillis, TimeUnit.MILLISECONDS);
LOG.info("Process shutdown is complete.");
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.error("Encountered an error during shutdown.", e);

View file

@ -84,7 +84,8 @@ public class KinesisClientLibConfigurationTest {
TEST_VALUE_LONG,
TEST_VALUE_INT,
skipCheckpointValidationValue,
null);
null,
TEST_VALUE_LONG);
}
@Test
@ -94,7 +95,7 @@ public class KinesisClientLibConfigurationTest {
// Try each argument at one time.
KinesisClientLibConfiguration config = null;
long[] longValues =
{ TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG };
{ TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG };
for (int i = 0; i < PARAMETER_COUNT; i++) {
longValues[i] = INVALID_LONG;
try {
@ -122,7 +123,8 @@ public class KinesisClientLibConfigurationTest {
longValues[5],
TEST_VALUE_INT,
skipCheckpointValidationValue,
null);
null,
longValues[6]);
} catch (IllegalArgumentException e) {
System.out.println(e.getMessage());
}
@ -156,7 +158,8 @@ public class KinesisClientLibConfigurationTest {
TEST_VALUE_LONG,
intValues[1],
skipCheckpointValidationValue,
null);
null,
TEST_VALUE_LONG);
} catch (IllegalArgumentException e) {
System.out.println(e.getMessage());
}
@ -319,7 +322,8 @@ public class KinesisClientLibConfigurationTest {
TEST_VALUE_LONG,
1,
skipCheckpointValidationValue,
"abcd");
"abcd",
TEST_VALUE_LONG);
Assert.fail("No expected Exception is thrown.");
} catch(IllegalArgumentException e) {
System.out.println(e.getMessage());