obtain ignoreUnexpectedChildShards from config where possible

instead of adding the `ignoreUnexpectedChildShards` field to various
objects and passing it as an explicit parameter, refrain from adding
the field except where needed and obtain the value from the
already-passed `KinesisClientLibConfiguration` parameter.
This commit is contained in:
Mike Watters 2017-11-06 16:45:58 -07:00
parent 8358322835
commit f2b8f677eb
5 changed files with 18 additions and 57 deletions

View file

@ -55,7 +55,6 @@ class ShardConsumer {
// Backoff time when polling to check if application has finished processing parent shards
private final long parentShardPollIntervalMillis;
private final boolean cleanupLeasesOfCompletedShards;
private final boolean ignoreUnexpectedChildShards;
private final long taskBackoffTimeMillis;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
@ -101,7 +100,7 @@ class ShardConsumer {
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
* @param backoffTimeMillis backoff interval when we encounter exceptions
*/
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 11 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
ShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
@ -109,7 +108,6 @@ class ShardConsumer {
ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis,
@ -122,7 +120,6 @@ class ShardConsumer {
leaseManager,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
executorService,
metricsFactory,
backoffTimeMillis,
@ -154,7 +151,6 @@ class ShardConsumer {
ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis,
@ -178,7 +174,6 @@ class ShardConsumer {
leaseManager,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
executorService,
metricsFactory,
backoffTimeMillis,
@ -216,7 +211,6 @@ class ShardConsumer {
ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis,
@ -233,7 +227,6 @@ class ShardConsumer {
this.leaseManager = leaseManager;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.executorService = executorService;
this.metricsFactory = metricsFactory;
this.taskBackoffTimeMillis = backoffTimeMillis;
@ -492,7 +485,7 @@ class ShardConsumer {
}
boolean isIgnoreUnexpectedChildShards() {
return ignoreUnexpectedChildShards;
return config.shouldIgnoreUnexpectedChildShards();
}
long getTaskBackoffTimeMillis() {

View file

@ -104,7 +104,6 @@ public class Worker implements Runnable {
// info, value is ShardConsumer.
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = new ConcurrentHashMap<ShardInfo, ShardConsumer>();
private final boolean cleanupLeasesUponShardCompletion;
private final boolean ignoreUnexpectedChildShards;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
@ -256,8 +255,7 @@ public class Worker implements Runnable {
config.shouldValidateSequenceNumberBeforeCheckpointing(),
config.getInitialPositionInStreamExtended()),
config.getInitialPositionInStreamExtended(), config.getParentShardPollIntervalMillis(),
config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(),
config.shouldIgnoreUnexpectedChildShards(), null,
config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null,
new KinesisClientLibLeaseCoordinator(
new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient),
config.getWorkerIdentifier(),
@ -324,8 +322,6 @@ public class Worker implements Runnable {
* Time between tasks to sync leases and Kinesis shards
* @param cleanupLeasesUponShardCompletion
* Clean up shards we've finished processing (don't wait till they expire in Kinesis)
* @param ignoreUnexpectedChildShards
* Ignore child shards with open parents
* @param checkpoint
* Used to get/set checkpoints
* @param leaseCoordinator
@ -343,14 +339,14 @@ public class Worker implements Runnable {
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards,
ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, checkpoint,
leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, Optional.empty(), Optional.empty());
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
shardPrioritization, Optional.empty(), Optional.empty());
}
/**
@ -372,8 +368,6 @@ public class Worker implements Runnable {
* Time between tasks to sync leases and Kinesis shards
* @param cleanupLeasesUponShardCompletion
* Clean up shards we've finished processing (don't wait till they expire in Kinesis)
* @param ignoreUnexpectedChildShards
* Ignore child shards with open parents
* @param checkpoint
* Used to get/set checkpoints
* @param leaseCoordinator
@ -395,8 +389,8 @@ public class Worker implements Runnable {
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig,
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards,
ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) {
@ -407,15 +401,14 @@ public class Worker implements Runnable {
this.initialPosition = initialPositionInStream;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.checkpointTracker = checkpoint != null ? checkpoint : leaseCoordinator;
this.idleTimeInMilliseconds = streamConfig.getIdleTimeInMilliseconds();
this.executorService = execService;
this.leaseCoordinator = leaseCoordinator;
this.metricsFactory = metricsFactory;
this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
initialPositionInStream, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis,
metricsFactory, executorService);
initialPositionInStream, cleanupLeasesUponShardCompletion, config.shouldIgnoreUnexpectedChildShards(),
shardSyncIdleTimeMillis, metricsFactory, executorService);
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
this.failoverTimeMillis = failoverTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
@ -507,7 +500,7 @@ public class Worker implements Runnable {
LOG.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(),
leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, 0L);
config.shouldIgnoreUnexpectedChildShards(), 0L);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else {
LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
@ -862,7 +855,6 @@ public class Worker implements Runnable {
leaseCoordinator.getLeaseManager(),
parentShardPollIntervalMillis,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
@ -1281,7 +1273,6 @@ public class Worker implements Runnable {
config.getParentShardPollIntervalMillis(),
config.getShardSyncIntervalMillis(),
config.shouldCleanupLeasesUponShardCompletion(),
config.shouldIgnoreUnexpectedChildShards(),
null,
new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getTableName(),
dynamoDBClient),

View file

@ -89,7 +89,6 @@ public class ShardConsumerTest {
private final long taskBackoffTimeMillis = 500L;
private final long parentShardPollIntervalMillis = 50L;
private final boolean cleanupLeasesOfCompletedShards = true;
private final boolean ignoreUnexpectedChildShards = false;
// We don't want any of these tests to run checkpoint validation
private final boolean skipCheckpointValidationValue = false;
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
@ -152,7 +151,6 @@ public class ShardConsumerTest {
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
@ -201,7 +199,6 @@ public class ShardConsumerTest {
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
spyExecutorService,
metricsFactory,
taskBackoffTimeMillis,
@ -244,7 +241,6 @@ public class ShardConsumerTest {
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
@ -358,7 +354,6 @@ public class ShardConsumerTest {
leaseManager,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
@ -488,7 +483,6 @@ public class ShardConsumerTest {
leaseManager,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
@ -562,7 +556,6 @@ public class ShardConsumerTest {
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
@ -614,7 +607,6 @@ public class ShardConsumerTest {
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
@ -645,7 +637,6 @@ public class ShardConsumerTest {
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
@ -687,7 +678,6 @@ public class ShardConsumerTest {
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
mockExecutorService,
metricsFactory,
taskBackoffTimeMillis,

View file

@ -123,7 +123,8 @@ public class ShardSyncTaskIntegrationTest {
ShardSyncTask syncTask = new ShardSyncTask(kinesisProxy,
leaseManager,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST),
false, false,
false,
false,
0L);
syncTask.call();
List<KinesisClientLease> leases = leaseManager.listLeases();

View file

@ -130,7 +130,6 @@ public class WorkerTest {
private final long parentShardPollIntervalMillis = 5L;
private final long shardSyncIntervalMillis = 5L;
private final boolean cleanupLeasesUponShardCompletion = true;
private final boolean ignoreUnexpectedChildShards = false;
// We don't want any of these tests to run checkpoint validation
private final boolean skipCheckpointValidationValue = false;
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
@ -256,7 +255,6 @@ public class WorkerTest {
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
checkpoint,
leaseCoordinator,
execService,
@ -308,7 +306,6 @@ public class WorkerTest {
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
checkpoint,
leaseCoordinator,
execService,
@ -377,7 +374,6 @@ public class WorkerTest {
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
checkpoint,
leaseCoordinator,
execService,
@ -434,7 +430,6 @@ public class WorkerTest {
shardPollInterval,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
leaseCoordinator,
leaseCoordinator,
execService,
@ -808,7 +803,6 @@ public class WorkerTest {
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
leaseCoordinator,
leaseCoordinator,
executorService,
@ -889,8 +883,8 @@ public class WorkerTest {
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, leaseCoordinator, leaseCoordinator,
executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
@Override
void postConstruct() {
this.gracefuleShutdownStarted = true;
@ -967,7 +961,6 @@ public class WorkerTest {
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
leaseCoordinator,
leaseCoordinator,
executorService,
@ -1042,7 +1035,6 @@ public class WorkerTest {
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
leaseCoordinator,
leaseCoordinator,
executorService,
@ -1125,7 +1117,6 @@ public class WorkerTest {
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
leaseCoordinator,
leaseCoordinator,
executorService,
@ -1239,7 +1230,6 @@ public class WorkerTest {
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
leaseCoordinator,
leaseCoordinator,
executorService,
@ -1357,7 +1347,6 @@ public class WorkerTest {
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
leaseCoordinator,
leaseCoordinator,
executorService,
@ -1442,7 +1431,6 @@ public class WorkerTest {
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
leaseCoordinator,
leaseCoordinator,
executorService,
@ -1488,7 +1476,7 @@ public class WorkerTest {
KinesisClientLibConfiguration config, StreamConfig streamConfig,
InitialPositionInStreamExtended initialPositionInStream,
long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis,
boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards, ICheckpoint checkpoint,
boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
@ -1500,7 +1488,6 @@ public class WorkerTest {
parentShardPollIntervalMillis,
shardSyncIdleTimeMillis,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
checkpoint,
leaseCoordinator,
execService,
@ -1823,7 +1810,6 @@ public class WorkerTest {
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
leaseCoordinator,
leaseCoordinator,
executorService,