From 6a0c17745a7932651970952f9f4e8f52275f7310 Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Mon, 30 Mar 2020 17:50:33 -0400 Subject: [PATCH] PR Feedback --- .../leases/HierarchicalShardSyncer.java | 24 ++++----- .../amazon/kinesis/leases/ShardSyncTask.java | 3 +- .../leases/exceptions/ShardSyncer.java | 2 +- .../kinesis/lifecycle/ShutdownTask.java | 3 +- .../leases/HierarchicalShardSyncerTest.java | 54 ++++++++++--------- .../kinesis/lifecycle/ShutdownTaskTest.java | 3 +- 6 files changed, 47 insertions(+), 42 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index d384bfe5..ae017e19 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -102,23 +102,23 @@ public class HierarchicalShardSyncer { public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean garbageCollectLeases, final boolean cleanupLeasesOfCompletedShards, - final boolean ignoreUnexpectedChildShards, final MetricsScope scope) + final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - final List latestShards = leaseRefresher.isLeaseTableEmpty() ? + final List latestShards = isLeaseTableEmpty ? getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector); checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, garbageCollectLeases, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards); + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, isLeaseTableEmpty, latestShards); } //Provide a pre-collcted list of shards to avoid calling ListShards API public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean garbageCollectLeases, final boolean cleanupLeasesOfCompletedShards, - final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List latestShards) + final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty, + List latestShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { //TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191 - final boolean isLeaseTableEmpty = leaseRefresher.isLeaseTableEmpty(); if (!CollectionUtils.isNullOrEmpty(latestShards)) { log.debug("Num shards: {}", latestShards.size()); @@ -341,17 +341,15 @@ public class HierarchicalShardSyncer { final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended); final Optional> shards = Optional.of(shardDetector.listShardsWithFilter(shardFilter)); - return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream is not in ACTIVE OR UPDATING state - " + - "will retry getting the shard list.")); + return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() + + " is not in ACTIVE OR UPDATING state - will retry getting the shard list.")); } private static List getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException { - final List shards = shardDetector.listShards(); - if (shards == null) { - throw new KinesisClientLibIOException( - "Stream is not in ACTIVE OR UPDATING state - will retry getting the shard list."); - } - return shards; + final Optional> shards = Optional.of(shardDetector.listShards()); + + return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() + + " is not in ACTIVE OR UPDATING state - will retry getting the shard list.")); } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java index 9291abf4..995d37e5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java @@ -68,7 +68,8 @@ public class ShardSyncTask implements ConsumerTask { try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, - initialPosition, cleanupLeasesUponShardCompletion, garbageCollectLeases, ignoreUnexpectedChildShards, scope); + initialPosition, cleanupLeasesUponShardCompletion, garbageCollectLeases, ignoreUnexpectedChildShards, + scope, leaseRefresher.isLeaseTableEmpty()); if (shardSyncTaskIdleTimeMillis > 0) { Thread.sleep(shardSyncTaskIdleTimeMillis); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java index a14ec784..683b29b3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java @@ -41,6 +41,6 @@ public class ShardSyncer { final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, - true, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope); + true, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, leaseRefresher.isLeaseTableEmpty()); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 55687a1e..8f9003c1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -155,7 +155,8 @@ public class ShutdownTask implements ConsumerTask { log.debug("Looking for child shards of shard {}", shardInfo.shardId()); // create leases for the child shards hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(), - initialPositionInStream, false, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards); + initialPositionInStream, false, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, + scope, false, latestShards); log.debug("Finished checking for child shards of shard {}", shardInfo.shardId()); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index a6de878f..3a97f1ce 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -284,7 +284,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE); + garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>( Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); @@ -318,7 +318,7 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE); + garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>( toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"))); @@ -360,7 +360,8 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, latestShards); + garbageCollectLeases, cleanupLeasesOfCompletedShards, false, + SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty(), latestShards); final Set expectedShardIds = new HashSet<>( Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); @@ -396,7 +397,8 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, latestShards); + garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, + dynamoDBLeaseRefresher.isLeaseTableEmpty(), latestShards); final Set expectedShardIds = new HashSet<>( toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"))); @@ -432,7 +434,8 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, new ArrayList()); + garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, + dynamoDBLeaseRefresher.isLeaseTableEmpty(), new ArrayList()); final Set expectedShardIds = new HashSet<>(); @@ -472,7 +475,7 @@ public class HierarchicalShardSyncerTest { try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, - INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE); + INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { verify(shardDetector).listShards(); verify(dynamoDBLeaseRefresher, never()).listLeases(); @@ -492,7 +495,8 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, - INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE); + INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, + dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { verify(shardDetector).listShards(); verify(dynamoDBLeaseRefresher, never()).listLeases(); @@ -527,7 +531,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - garbageCollectLeases, cleanupLeasesOfCompletedShards, true, SCOPE); + garbageCollectLeases, cleanupLeasesOfCompletedShards, true, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List leases = leaseCaptor.getAllValues(); final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -572,7 +576,7 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - garbageCollectLeases, cleanupLeasesOfCompletedShards, true, SCOPE); + garbageCollectLeases, cleanupLeasesOfCompletedShards, true, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List leases = leaseCaptor.getAllValues(); final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -628,7 +632,7 @@ public class HierarchicalShardSyncerTest { // Initial call: No leases present, create leases. hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); @@ -643,7 +647,7 @@ public class HierarchicalShardSyncerTest { // Second call: Leases present, with shardId-0 being at ShardEnd causing cleanup. hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List deleteLeases = leaseDeleteCaptor.getAllValues(); final Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint) @@ -703,7 +707,7 @@ public class HierarchicalShardSyncerTest { // Initial call: Call to create leases. hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); @@ -720,7 +724,7 @@ public class HierarchicalShardSyncerTest { // Second call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete fails. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { List deleteLeases = leaseDeleteCaptor.getAllValues(); Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -744,7 +748,7 @@ public class HierarchicalShardSyncerTest { // Final call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete passes. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); deleteLeases = leaseDeleteCaptor.getAllValues(); @@ -805,7 +809,7 @@ public class HierarchicalShardSyncerTest { // Initial call: Call to create leases. Fails on ListLeases hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { verify(shardDetector, times(1)).listShards(); verify(dynamoDBLeaseRefresher, times(1)).listLeases(); @@ -815,7 +819,7 @@ public class HierarchicalShardSyncerTest { // Second call: Leases not present, leases will be created. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null)); @@ -830,7 +834,7 @@ public class HierarchicalShardSyncerTest { // Final call: Leases present, belongs to TestOwner, shardId-0 is at ShardEnd should be cleaned up. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List deleteLeases = leaseDeleteCaptor.getAllValues(); final Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -896,7 +900,7 @@ public class HierarchicalShardSyncerTest { // Initial call: No leases present, create leases. Create lease Fails hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { verify(shardDetector, times(1)).listShards(); verify(dynamoDBLeaseRefresher, times(1)).listLeases(); @@ -905,7 +909,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null)); @@ -920,7 +924,7 @@ public class HierarchicalShardSyncerTest { // Final call: Leases are present, shardId-0 is at ShardEnd needs to be cleaned up. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List deleteLeases = leaseDeleteCaptor.getAllValues(); final Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -1010,7 +1014,7 @@ public class HierarchicalShardSyncerTest { doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture()); hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, - INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); assertThat(leaseCaptor.getAllValues().size(), equalTo(1)); assertThat(leaseCaptor.getValue(), equalTo(garbageLease)); @@ -1039,7 +1043,7 @@ public class HierarchicalShardSyncerTest { doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture()); setupMultiStream(); hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, - INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); assertThat(leaseCaptor.getAllValues().size(), equalTo(1)); assertThat(leaseCaptor.getValue(), equalTo(garbageLease)); @@ -1071,7 +1075,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition, - garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE); + garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List leases = leaseCaptor.getAllValues(); final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -1652,7 +1656,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); verify(shardDetector, atLeast(1)).listShardsWithFilter(shardFilter); verify(shardDetector, never()).listShards(); @@ -1674,7 +1678,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); verify(shardDetector, atLeast(1)).listShards(); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 927fbf4f..3a34985d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.never; @@ -138,7 +139,7 @@ public class ShutdownTaskTest { }).when(hierarchicalShardSyncer) .checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON, false, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, - NULL_METRICS_FACTORY.createMetrics(), latestShards); + NULL_METRICS_FACTORY.createMetrics(), false, latestShards); final TaskResult result = task.call(); assertNotNull(result.getException());