From f2b8f677eb628af27a8d3562e335d8f9a101f55c Mon Sep 17 00:00:00 2001 From: Mike Watters Date: Mon, 6 Nov 2017 16:45:58 -0700 Subject: [PATCH] obtain `ignoreUnexpectedChildShards` from config where possible instead of adding the `ignoreUnexpectedChildShards` field to various objects and passing it as an explicit parameter, refrain from adding the field except where needed and obtain the value from the already-passed `KinesisClientLibConfiguration` parameter. --- .../lib/worker/ShardConsumer.java | 11 ++----- .../clientlibrary/lib/worker/Worker.java | 31 +++++++------------ .../lib/worker/ShardConsumerTest.java | 10 ------ .../worker/ShardSyncTaskIntegrationTest.java | 3 +- .../clientlibrary/lib/worker/WorkerTest.java | 20 ++---------- 5 files changed, 18 insertions(+), 57 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index f177260d..51a82124 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -55,7 +55,6 @@ class ShardConsumer { // Backoff time when polling to check if application has finished processing parent shards private final long parentShardPollIntervalMillis; private final boolean cleanupLeasesOfCompletedShards; - private final boolean ignoreUnexpectedChildShards; private final long taskBackoffTimeMillis; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; @@ -101,7 +100,7 @@ class ShardConsumer { * @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard * @param backoffTimeMillis backoff interval when we encounter exceptions */ - // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 11 LINES + // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES ShardConsumer(ShardInfo shardInfo, StreamConfig streamConfig, ICheckpoint checkpoint, @@ -109,7 +108,6 @@ class ShardConsumer { ILeaseManager leaseManager, long parentShardPollIntervalMillis, boolean cleanupLeasesOfCompletedShards, - boolean ignoreUnexpectedChildShards, ExecutorService executorService, IMetricsFactory metricsFactory, long backoffTimeMillis, @@ -122,7 +120,6 @@ class ShardConsumer { leaseManager, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, executorService, metricsFactory, backoffTimeMillis, @@ -154,7 +151,6 @@ class ShardConsumer { ILeaseManager leaseManager, long parentShardPollIntervalMillis, boolean cleanupLeasesOfCompletedShards, - boolean ignoreUnexpectedChildShards, ExecutorService executorService, IMetricsFactory metricsFactory, long backoffTimeMillis, @@ -178,7 +174,6 @@ class ShardConsumer { leaseManager, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, executorService, metricsFactory, backoffTimeMillis, @@ -216,7 +211,6 @@ class ShardConsumer { ILeaseManager leaseManager, long parentShardPollIntervalMillis, boolean cleanupLeasesOfCompletedShards, - boolean ignoreUnexpectedChildShards, ExecutorService executorService, IMetricsFactory metricsFactory, long backoffTimeMillis, @@ -233,7 +227,6 @@ class ShardConsumer { this.leaseManager = leaseManager; this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; - this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; this.executorService = executorService; this.metricsFactory = metricsFactory; this.taskBackoffTimeMillis = backoffTimeMillis; @@ -492,7 +485,7 @@ class ShardConsumer { } boolean isIgnoreUnexpectedChildShards() { - return ignoreUnexpectedChildShards; + return config.shouldIgnoreUnexpectedChildShards(); } long getTaskBackoffTimeMillis() { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 71645c4a..c0f413be 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -104,7 +104,6 @@ public class Worker implements Runnable { // info, value is ShardConsumer. private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap(); private final boolean cleanupLeasesUponShardCompletion; - private final boolean ignoreUnexpectedChildShards; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; @@ -256,8 +255,7 @@ public class Worker implements Runnable { config.shouldValidateSequenceNumberBeforeCheckpointing(), config.getInitialPositionInStreamExtended()), config.getInitialPositionInStreamExtended(), config.getParentShardPollIntervalMillis(), - config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), - config.shouldIgnoreUnexpectedChildShards(), null, + config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null, new KinesisClientLibLeaseCoordinator( new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient), config.getWorkerIdentifier(), @@ -324,8 +322,6 @@ public class Worker implements Runnable { * Time between tasks to sync leases and Kinesis shards * @param cleanupLeasesUponShardCompletion * Clean up shards we've finished processing (don't wait till they expire in Kinesis) - * @param ignoreUnexpectedChildShards - * Ignore child shards with open parents * @param checkpoint * Used to get/set checkpoints * @param leaseCoordinator @@ -343,14 +339,14 @@ public class Worker implements Runnable { // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, - long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards, - ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, + long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, + KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, - shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, checkpoint, - leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, - skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, Optional.empty(), Optional.empty()); + shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, + metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, + shardPrioritization, Optional.empty(), Optional.empty()); } /** @@ -372,8 +368,6 @@ public class Worker implements Runnable { * Time between tasks to sync leases and Kinesis shards * @param cleanupLeasesUponShardCompletion * Clean up shards we've finished processing (don't wait till they expire in Kinesis) - * @param ignoreUnexpectedChildShards - * Ignore child shards with open parents * @param checkpoint * Used to get/set checkpoints * @param leaseCoordinator @@ -395,8 +389,8 @@ public class Worker implements Runnable { // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, - long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards, - ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, + long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, + KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool) { @@ -407,15 +401,14 @@ public class Worker implements Runnable { this.initialPosition = initialPositionInStream; this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; - this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; this.checkpointTracker = checkpoint != null ? checkpoint : leaseCoordinator; this.idleTimeInMilliseconds = streamConfig.getIdleTimeInMilliseconds(); this.executorService = execService; this.leaseCoordinator = leaseCoordinator; this.metricsFactory = metricsFactory; this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), - initialPositionInStream, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis, - metricsFactory, executorService); + initialPositionInStream, cleanupLeasesUponShardCompletion, config.shouldIgnoreUnexpectedChildShards(), + shardSyncIdleTimeMillis, metricsFactory, executorService); this.taskBackoffTimeMillis = taskBackoffTimeMillis; this.failoverTimeMillis = failoverTimeMillis; this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; @@ -507,7 +500,7 @@ public class Worker implements Runnable { LOG.info("Syncing Kinesis shard info"); ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, 0L); + config.shouldIgnoreUnexpectedChildShards(), 0L); result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); } else { LOG.info("Skipping shard sync per config setting (and lease table is not empty)"); @@ -862,7 +855,6 @@ public class Worker implements Runnable { leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, executorService, metricsFactory, taskBackoffTimeMillis, @@ -1281,7 +1273,6 @@ public class Worker implements Runnable { config.getParentShardPollIntervalMillis(), config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), - config.shouldIgnoreUnexpectedChildShards(), null, new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient), diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 8ae9dd03..9a7f2234 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -89,7 +89,6 @@ public class ShardConsumerTest { private final long taskBackoffTimeMillis = 500L; private final long parentShardPollIntervalMillis = 50L; private final boolean cleanupLeasesOfCompletedShards = true; - private final boolean ignoreUnexpectedChildShards = false; // We don't want any of these tests to run checkpoint validation private final boolean skipCheckpointValidationValue = false; private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = @@ -152,7 +151,6 @@ public class ShardConsumerTest { null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, executorService, metricsFactory, taskBackoffTimeMillis, @@ -201,7 +199,6 @@ public class ShardConsumerTest { null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, spyExecutorService, metricsFactory, taskBackoffTimeMillis, @@ -244,7 +241,6 @@ public class ShardConsumerTest { null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, executorService, metricsFactory, taskBackoffTimeMillis, @@ -358,7 +354,6 @@ public class ShardConsumerTest { leaseManager, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, executorService, metricsFactory, taskBackoffTimeMillis, @@ -488,7 +483,6 @@ public class ShardConsumerTest { leaseManager, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, executorService, metricsFactory, taskBackoffTimeMillis, @@ -562,7 +556,6 @@ public class ShardConsumerTest { null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, executorService, metricsFactory, taskBackoffTimeMillis, @@ -614,7 +607,6 @@ public class ShardConsumerTest { null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, executorService, metricsFactory, taskBackoffTimeMillis, @@ -645,7 +637,6 @@ public class ShardConsumerTest { null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, executorService, metricsFactory, taskBackoffTimeMillis, @@ -687,7 +678,6 @@ public class ShardConsumerTest { null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, mockExecutorService, metricsFactory, taskBackoffTimeMillis, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java index 0f34fd9b..37a72741 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java @@ -123,7 +123,8 @@ public class ShardSyncTaskIntegrationTest { ShardSyncTask syncTask = new ShardSyncTask(kinesisProxy, leaseManager, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST), - false, false, + false, + false, 0L); syncTask.call(); List leases = leaseManager.listLeases(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index d2277117..a8856a0b 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -130,7 +130,6 @@ public class WorkerTest { private final long parentShardPollIntervalMillis = 5L; private final long shardSyncIntervalMillis = 5L; private final boolean cleanupLeasesUponShardCompletion = true; - private final boolean ignoreUnexpectedChildShards = false; // We don't want any of these tests to run checkpoint validation private final boolean skipCheckpointValidationValue = false; private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = @@ -256,7 +255,6 @@ public class WorkerTest { parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, checkpoint, leaseCoordinator, execService, @@ -308,7 +306,6 @@ public class WorkerTest { parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, checkpoint, leaseCoordinator, execService, @@ -377,7 +374,6 @@ public class WorkerTest { parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, checkpoint, leaseCoordinator, execService, @@ -434,7 +430,6 @@ public class WorkerTest { shardPollInterval, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator, execService, @@ -808,7 +803,6 @@ public class WorkerTest { parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator, executorService, @@ -889,8 +883,8 @@ public class WorkerTest { Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator, - executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { + cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, + taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { @Override void postConstruct() { this.gracefuleShutdownStarted = true; @@ -967,7 +961,6 @@ public class WorkerTest { parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator, executorService, @@ -1042,7 +1035,6 @@ public class WorkerTest { parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator, executorService, @@ -1125,7 +1117,6 @@ public class WorkerTest { parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator, executorService, @@ -1239,7 +1230,6 @@ public class WorkerTest { parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator, executorService, @@ -1357,7 +1347,6 @@ public class WorkerTest { parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator, executorService, @@ -1442,7 +1431,6 @@ public class WorkerTest { parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator, executorService, @@ -1488,7 +1476,7 @@ public class WorkerTest { KinesisClientLibConfiguration config, StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, - boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards, ICheckpoint checkpoint, + boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { @@ -1500,7 +1488,6 @@ public class WorkerTest { parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, checkpoint, leaseCoordinator, execService, @@ -1823,7 +1810,6 @@ public class WorkerTest { parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator, executorService,