set shutdownGraceMillis in KinesisClientLibConfiguration constructor

This commit is contained in:
ytakahashi 2019-08-22 13:01:34 +09:00
parent 3e36f0c7d0
commit 337d4e26a6
2 changed files with 56 additions and 0 deletions

View file

@ -486,6 +486,7 @@ public class KinesisClientLibConfiguration {
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
this.metricsBufferTimeMillis = metricsBufferTimeMillis;
this.metricsMaxQueueSize = metricsMaxQueueSize;
this.shutdownGraceMillis = shutdownGraceMillis;
this.metricsLevel = DEFAULT_METRICS_LEVEL;
this.metricsEnabledDimensions = DEFAULT_METRICS_ENABLED_DIMENSIONS;
this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing;

View file

@ -14,6 +14,19 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration.DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration.DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration.DEFAULT_FAILOVER_TIME_MILLIS;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration.DEFAULT_IDLETIME_BETWEEN_READS_MILLIS;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration.DEFAULT_INITIAL_POSITION_IN_STREAM;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration.DEFAULT_MAX_RECORDS;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration.DEFAULT_METRICS_BUFFER_TIME_MILLIS;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration.DEFAULT_METRICS_MAX_QUEUE_SIZE;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration.DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration.DEFAULT_SHARD_SYNC_INTERVAL_MILLIS;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration.DEFAULT_SHUTDOWN_GRACE_MILLIS;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration.DEFAULT_TASK_BACKOFF_TIME_MILLIS;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration.DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -60,6 +73,27 @@ public class KinesisClientLibConfigurationTest {
KinesisClientLibConfiguration config =
new KinesisClientLibConfiguration(TEST_STRING, TEST_STRING, null, TEST_STRING);
// assert that default parameters are set
assertEquals(TEST_STRING, config.getApplicationName());
assertEquals(TEST_STRING, config.getStreamName());
assertNull(config.getKinesisEndpoint());
assertNull(config.getDynamoDBEndpoint());
assertEquals(DEFAULT_INITIAL_POSITION_IN_STREAM, config.getInitialPositionInStream());
assertEquals(DEFAULT_FAILOVER_TIME_MILLIS, config.getFailoverTimeMillis());
assertEquals(TEST_STRING, config.getWorkerIdentifier());
assertEquals(DEFAULT_MAX_RECORDS, config.getMaxRecords());
assertEquals(DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, config.getIdleTimeBetweenReadsInMillis());
assertEquals(DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, config.shouldCallProcessRecordsEvenForEmptyRecordList());
assertEquals(DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, config.getParentShardPollIntervalMillis());
assertEquals(DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, config.getShardSyncIntervalMillis());
assertEquals(DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, config.shouldCleanupLeasesUponShardCompletion());
assertEquals(DEFAULT_TASK_BACKOFF_TIME_MILLIS, config.getTaskBackoffTimeMillis());
assertEquals(DEFAULT_METRICS_BUFFER_TIME_MILLIS, config.getMetricsBufferTimeMillis());
assertEquals(DEFAULT_METRICS_MAX_QUEUE_SIZE, config.getMetricsMaxQueueSize());
assertEquals(DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, config.shouldValidateSequenceNumberBeforeCheckpointing());
assertNull(config.getRegionName());
assertEquals(DEFAULT_SHUTDOWN_GRACE_MILLIS, config.getShutdownGraceMillis());
// Test constructor with all valid arguments.
config =
new KinesisClientLibConfiguration(TEST_STRING,
@ -87,6 +121,27 @@ public class KinesisClientLibConfigurationTest {
skipCheckpointValidationValue,
null,
TEST_VALUE_LONG);
// assert that expected parameters are set
assertEquals(TEST_STRING, config.getApplicationName());
assertEquals(TEST_STRING, config.getStreamName());
assertEquals(TEST_STRING, config.getKinesisEndpoint());
assertEquals(TEST_STRING, config.getDynamoDBEndpoint());
assertEquals(InitialPositionInStream.LATEST, config.getInitialPositionInStream());
assertEquals(TEST_VALUE_LONG, config.getFailoverTimeMillis());
assertEquals(TEST_STRING, config.getWorkerIdentifier());
assertEquals(TEST_VALUE_INT, config.getMaxRecords());
assertEquals(TEST_VALUE_LONG, config.getIdleTimeBetweenReadsInMillis());
assertFalse(config.shouldCallProcessRecordsEvenForEmptyRecordList());
assertEquals(TEST_VALUE_LONG, config.getParentShardPollIntervalMillis());
assertEquals(TEST_VALUE_LONG, config.getShardSyncIntervalMillis());
assertTrue(config.shouldCleanupLeasesUponShardCompletion());
assertEquals(TEST_VALUE_LONG, config.getTaskBackoffTimeMillis());
assertEquals(TEST_VALUE_LONG, config.getMetricsBufferTimeMillis());
assertEquals(TEST_VALUE_INT, config.getMetricsMaxQueueSize());
assertEquals(skipCheckpointValidationValue, config.shouldValidateSequenceNumberBeforeCheckpointing());
assertNull(config.getRegionName());
assertEquals(TEST_VALUE_LONG, config.getShutdownGraceMillis());
}
@Test