From 82cc3625453ae17b2e884835dc999c2779cd849c Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Mon, 30 Mar 2020 18:37:30 -0400 Subject: [PATCH] --amend --- .../leases/HierarchicalShardSyncer.java | 16 +++--- .../amazon/kinesis/leases/ShardSyncTask.java | 4 +- .../leases/exceptions/ShardSyncer.java | 2 +- .../kinesis/lifecycle/ShutdownTask.java | 8 +-- .../leases/HierarchicalShardSyncerTest.java | 57 +++++++++---------- .../kinesis/lifecycle/ShutdownTaskTest.java | 9 +-- 6 files changed, 46 insertions(+), 50 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 8d280d12..151ead7c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -89,10 +89,10 @@ public class HierarchicalShardSyncer { * @param shardDetector * @param leaseRefresher * @param initialPosition - * @param garbageCollectLeases + * @param scope * @param cleanupLeasesOfCompletedShards * @param ignoreUnexpectedChildShards - * @param scope + * @param garbageCollectLeases * @throws DependencyException * @throws InvalidStateException * @throws ProvisionedThroughputException @@ -101,20 +101,20 @@ public class HierarchicalShardSyncer { // CHECKSTYLE:OFF CyclomaticComplexity public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, - final boolean garbageCollectLeases, final boolean cleanupLeasesOfCompletedShards, - final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty) + final MetricsScope scope, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, + final boolean garbageCollectLeases, final boolean isLeaseTableEmpty) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { final List latestShards = isLeaseTableEmpty ? getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector); - checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, garbageCollectLeases, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards, isLeaseTableEmpty); + checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, garbageCollectLeases, + isLeaseTableEmpty); } //Provide a pre-collcted list of shards to avoid calling ListShards API public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, - final boolean garbageCollectLeases, final boolean cleanupLeasesOfCompletedShards, - final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List latestShards, final boolean isLeaseTableEmpty) + List latestShards, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, + final MetricsScope scope, final boolean garbageCollectLeases, final boolean isLeaseTableEmpty) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { //TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191 diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java index 995d37e5..f5c7ab8a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java @@ -68,8 +68,8 @@ public class ShardSyncTask implements ConsumerTask { try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, - initialPosition, cleanupLeasesUponShardCompletion, garbageCollectLeases, ignoreUnexpectedChildShards, - scope, leaseRefresher.isLeaseTableEmpty()); + initialPosition, scope, garbageCollectLeases, ignoreUnexpectedChildShards, cleanupLeasesUponShardCompletion, + leaseRefresher.isLeaseTableEmpty()); if (shardSyncTaskIdleTimeMillis > 0) { Thread.sleep(shardSyncTaskIdleTimeMillis); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java index 14cd40c3..c0ed2d2a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java @@ -42,6 +42,6 @@ public class ShardSyncer { final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, leaseRefresher.isLeaseTableEmpty()); + scope, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, leaseRefresher.isLeaseTableEmpty()); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 7fc4a110..f4538be6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -16,7 +16,6 @@ package software.amazon.kinesis.lifecycle; import com.google.common.annotations.VisibleForTesting; -import com.sun.org.apache.bcel.internal.generic.LUSHR; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -28,7 +27,6 @@ import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; -import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.HierarchicalShardSyncer; @@ -42,8 +40,6 @@ import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; /** @@ -157,8 +153,8 @@ public class ShutdownTask implements ConsumerTask { log.debug("Looking for child shards of shard {}", shardInfo.shardId()); // create leases for the child shards hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(), - initialPositionInStream, garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, - scope, latestShards, isLeaseTableEmpty); + initialPositionInStream, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, garbageCollectLeases, + isLeaseTableEmpty); log.debug("Finished checking for child shards of shard {}", shardInfo.shardId()); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 3a97f1ce..d881b776 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -30,7 +30,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.math.BigInteger; -import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -284,7 +283,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>( Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); @@ -318,7 +317,7 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>( toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"))); @@ -360,8 +359,8 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - garbageCollectLeases, cleanupLeasesOfCompletedShards, false, - SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty(), latestShards); + latestShards, cleanupLeasesOfCompletedShards, false, SCOPE, garbageCollectLeases, + dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>( Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); @@ -397,8 +396,8 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, - dynamoDBLeaseRefresher.isLeaseTableEmpty(), latestShards); + latestShards, cleanupLeasesOfCompletedShards, false, SCOPE, garbageCollectLeases, + dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>( toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"))); @@ -434,8 +433,8 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, - dynamoDBLeaseRefresher.isLeaseTableEmpty(), new ArrayList()); + new ArrayList(), cleanupLeasesOfCompletedShards, false, SCOPE, garbageCollectLeases, + dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>(); @@ -475,7 +474,7 @@ public class HierarchicalShardSyncerTest { try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, - INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { verify(shardDetector).listShards(); verify(dynamoDBLeaseRefresher, never()).listLeases(); @@ -495,7 +494,7 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, - INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, + INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { verify(shardDetector).listShards(); @@ -531,7 +530,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - garbageCollectLeases, cleanupLeasesOfCompletedShards, true, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, cleanupLeasesOfCompletedShards, true, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List leases = leaseCaptor.getAllValues(); final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -576,7 +575,7 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - garbageCollectLeases, cleanupLeasesOfCompletedShards, true, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, cleanupLeasesOfCompletedShards, true, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List leases = leaseCaptor.getAllValues(); final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -632,7 +631,7 @@ public class HierarchicalShardSyncerTest { // Initial call: No leases present, create leases. hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); @@ -647,7 +646,7 @@ public class HierarchicalShardSyncerTest { // Second call: Leases present, with shardId-0 being at ShardEnd causing cleanup. hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List deleteLeases = leaseDeleteCaptor.getAllValues(); final Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint) @@ -707,7 +706,7 @@ public class HierarchicalShardSyncerTest { // Initial call: Call to create leases. hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); @@ -724,7 +723,7 @@ public class HierarchicalShardSyncerTest { // Second call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete fails. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { List deleteLeases = leaseDeleteCaptor.getAllValues(); Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -748,7 +747,7 @@ public class HierarchicalShardSyncerTest { // Final call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete passes. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); deleteLeases = leaseDeleteCaptor.getAllValues(); @@ -809,7 +808,7 @@ public class HierarchicalShardSyncerTest { // Initial call: Call to create leases. Fails on ListLeases hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { verify(shardDetector, times(1)).listShards(); verify(dynamoDBLeaseRefresher, times(1)).listLeases(); @@ -819,7 +818,7 @@ public class HierarchicalShardSyncerTest { // Second call: Leases not present, leases will be created. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null)); @@ -834,7 +833,7 @@ public class HierarchicalShardSyncerTest { // Final call: Leases present, belongs to TestOwner, shardId-0 is at ShardEnd should be cleaned up. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List deleteLeases = leaseDeleteCaptor.getAllValues(); final Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -900,7 +899,7 @@ public class HierarchicalShardSyncerTest { // Initial call: No leases present, create leases. Create lease Fails hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { verify(shardDetector, times(1)).listShards(); verify(dynamoDBLeaseRefresher, times(1)).listLeases(); @@ -909,7 +908,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null)); @@ -924,7 +923,7 @@ public class HierarchicalShardSyncerTest { // Final call: Leases are present, shardId-0 is at ShardEnd needs to be cleaned up. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List deleteLeases = leaseDeleteCaptor.getAllValues(); final Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -1014,7 +1013,7 @@ public class HierarchicalShardSyncerTest { doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture()); hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, - INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); assertThat(leaseCaptor.getAllValues().size(), equalTo(1)); assertThat(leaseCaptor.getValue(), equalTo(garbageLease)); @@ -1043,7 +1042,7 @@ public class HierarchicalShardSyncerTest { doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture()); setupMultiStream(); hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, - INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); assertThat(leaseCaptor.getAllValues().size(), equalTo(1)); assertThat(leaseCaptor.getValue(), equalTo(garbageLease)); @@ -1075,7 +1074,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition, - garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List leases = leaseCaptor.getAllValues(); final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -1656,7 +1655,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); verify(shardDetector, atLeast(1)).listShardsWithFilter(shardFilter); verify(shardDetector, never()).listShards(); @@ -1678,7 +1677,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); verify(shardDetector, atLeast(1)).listShards(); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 3a34985d..220fe4a5 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -19,7 +19,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.never; @@ -44,7 +43,6 @@ import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.HierarchicalShardSyncer; -import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; @@ -129,6 +127,9 @@ public class ShutdownTaskTest { */ @Test public final void testCallWhenSyncingShardsThrows() throws Exception { + final boolean garbageCollectLeases = false; + final boolean isLeaseTableEmpty = false; + List latestShards = constructShardListGraphA(); when(shardDetector.listShards()).thenReturn(latestShards); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); @@ -138,8 +139,8 @@ public class ShutdownTaskTest { throw new KinesisClientLibIOException("KinesisClientLibIOException"); }).when(hierarchicalShardSyncer) .checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON, - false, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, - NULL_METRICS_FACTORY.createMetrics(), false, latestShards); + latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, + NULL_METRICS_FACTORY.createMetrics(), garbageCollectLeases, isLeaseTableEmpty); final TaskResult result = task.call(); assertNotNull(result.getException());