From f2911f1f0d44ec0fa90bceb43523d066d0317cc8 Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Mon, 30 Mar 2020 18:19:32 -0400 Subject: [PATCH] Moving literals to variable for readability --- .../amazon/kinesis/leases/HierarchicalShardSyncer.java | 5 ++--- .../amazon/kinesis/leases/ShardSyncTaskManager.java | 7 +++++-- .../amazon/kinesis/leases/exceptions/ShardSyncer.java | 3 ++- .../software/amazon/kinesis/lifecycle/ShutdownTask.java | 6 ++++-- 4 files changed, 13 insertions(+), 8 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index ae017e19..8d280d12 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -107,15 +107,14 @@ public class HierarchicalShardSyncer { final List latestShards = isLeaseTableEmpty ? getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector); checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, garbageCollectLeases, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, isLeaseTableEmpty, latestShards); + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards, isLeaseTableEmpty); } //Provide a pre-collcted list of shards to avoid calling ListShards API public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean garbageCollectLeases, final boolean cleanupLeasesOfCompletedShards, - final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty, - List latestShards) + final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List latestShards, final boolean isLeaseTableEmpty) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { //TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191 diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java index 3933b70a..a52ac650 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java @@ -48,6 +48,7 @@ public class ShardSyncTaskManager { @NonNull private final InitialPositionInStreamExtended initialPositionInStream; private final boolean cleanupLeasesUponShardCompletion; + private final boolean garbageCollectLeases; private final boolean ignoreUnexpectedChildShards; private final long shardSyncIdleTimeMillis; @NonNull @@ -84,6 +85,7 @@ public class ShardSyncTaskManager { this.leaseRefresher = leaseRefresher; this.initialPositionInStream = initialPositionInStream; this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; + this.garbageCollectLeases = true; this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis; this.executorService = executorService; @@ -114,6 +116,7 @@ public class ShardSyncTaskManager { this.leaseRefresher = leaseRefresher; this.initialPositionInStream = initialPositionInStream; this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; + this.garbageCollectLeases = true; this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis; this.executorService = executorService; @@ -128,7 +131,7 @@ public class ShardSyncTaskManager { leaseRefresher, initialPositionInStream, cleanupLeasesUponShardCompletion, - true, + garbageCollectLeases, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis, hierarchicalShardSyncer, @@ -167,7 +170,7 @@ public class ShardSyncTaskManager { leaseRefresher, initialPositionInStream, cleanupLeasesUponShardCompletion, - true, + garbageCollectLeases, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis, hierarchicalShardSyncer, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java index 683b29b3..14cd40c3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java @@ -19,6 +19,7 @@ import software.amazon.kinesis.metrics.MetricsScope; @Deprecated public class ShardSyncer { private static final HierarchicalShardSyncer HIERARCHICAL_SHARD_SYNCER = new HierarchicalShardSyncer(); + private static final boolean garbageCollectLeases = true; /** *

NOTE: This method is deprecated and will be removed in a future release.

@@ -41,6 +42,6 @@ public class ShardSyncer { final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, - true, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, leaseRefresher.isLeaseTableEmpty()); + garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, leaseRefresher.isLeaseTableEmpty()); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 8f9003c1..7fc4a110 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -69,6 +69,8 @@ public class ShutdownTask implements ConsumerTask { @NonNull private final InitialPositionInStreamExtended initialPositionInStream; private final boolean cleanupLeasesOfCompletedShards; + private final boolean garbageCollectLeases = false; + private final boolean isLeaseTableEmpty= false; private final boolean ignoreUnexpectedChildShards; @NonNull private final LeaseCoordinator leaseCoordinator; @@ -155,8 +157,8 @@ public class ShutdownTask implements ConsumerTask { log.debug("Looking for child shards of shard {}", shardInfo.shardId()); // create leases for the child shards hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(), - initialPositionInStream, false, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, - scope, false, latestShards); + initialPositionInStream, garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, + scope, latestShards, isLeaseTableEmpty); log.debug("Finished checking for child shards of shard {}", shardInfo.shardId()); }