PR Feedback

This commit is contained in:
Joshua Kim 2020-03-30 17:50:33 -04:00
parent 05f2002596
commit 6a0c17745a
6 changed files with 47 additions and 42 deletions

View file

@ -102,23 +102,23 @@ public class HierarchicalShardSyncer {
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
final boolean garbageCollectLeases, final boolean cleanupLeasesOfCompletedShards,
final boolean ignoreUnexpectedChildShards, final MetricsScope scope)
final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
final List<Shard> latestShards = leaseRefresher.isLeaseTableEmpty() ?
final List<Shard> latestShards = isLeaseTableEmpty ?
getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector);
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, garbageCollectLeases,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards);
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, isLeaseTableEmpty, latestShards);
}
//Provide a pre-collcted list of shards to avoid calling ListShards API
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
final boolean garbageCollectLeases, final boolean cleanupLeasesOfCompletedShards,
final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List<Shard> latestShards)
final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty,
List<Shard> latestShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
//TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191
final boolean isLeaseTableEmpty = leaseRefresher.isLeaseTableEmpty();
if (!CollectionUtils.isNullOrEmpty(latestShards)) {
log.debug("Num shards: {}", latestShards.size());
@ -341,17 +341,15 @@ public class HierarchicalShardSyncer {
final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended);
final Optional<List<Shard>> shards = Optional.of(shardDetector.listShardsWithFilter(shardFilter));
return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream is not in ACTIVE OR UPDATING state - " +
"will retry getting the shard list."));
return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() +
" is not in ACTIVE OR UPDATING state - will retry getting the shard list."));
}
private static List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
final List<Shard> shards = shardDetector.listShards();
if (shards == null) {
throw new KinesisClientLibIOException(
"Stream is not in ACTIVE OR UPDATING state - will retry getting the shard list.");
}
return shards;
final Optional<List<Shard>> shards = Optional.of(shardDetector.listShards());
return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() +
" is not in ACTIVE OR UPDATING state - will retry getting the shard list."));
}
/**

View file

@ -68,7 +68,8 @@ public class ShardSyncTask implements ConsumerTask {
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher,
initialPosition, cleanupLeasesUponShardCompletion, garbageCollectLeases, ignoreUnexpectedChildShards, scope);
initialPosition, cleanupLeasesUponShardCompletion, garbageCollectLeases, ignoreUnexpectedChildShards,
scope, leaseRefresher.isLeaseTableEmpty());
if (shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis);

View file

@ -41,6 +41,6 @@ public class ShardSyncer {
final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
true, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope);
true, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, leaseRefresher.isLeaseTableEmpty());
}
}

View file

@ -155,7 +155,8 @@ public class ShutdownTask implements ConsumerTask {
log.debug("Looking for child shards of shard {}", shardInfo.shardId());
// create leases for the child shards
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(),
initialPositionInStream, false, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards);
initialPositionInStream, false, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
scope, false, latestShards);
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
}

View file

@ -284,7 +284,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE);
garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<String> expectedShardIds = new HashSet<>(
Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"));
@ -318,7 +318,7 @@ public class HierarchicalShardSyncerTest {
setupMultiStream();
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE);
garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<String> expectedShardIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")));
@ -360,7 +360,8 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, latestShards);
garbageCollectLeases, cleanupLeasesOfCompletedShards, false,
SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty(), latestShards);
final Set<String> expectedShardIds = new HashSet<>(
Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"));
@ -396,7 +397,8 @@ public class HierarchicalShardSyncerTest {
setupMultiStream();
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, latestShards);
garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE,
dynamoDBLeaseRefresher.isLeaseTableEmpty(), latestShards);
final Set<String> expectedShardIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")));
@ -432,7 +434,8 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, new ArrayList<Shard>());
garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE,
dynamoDBLeaseRefresher.isLeaseTableEmpty(), new ArrayList<Shard>());
final Set<String> expectedShardIds = new HashSet<>();
@ -472,7 +475,7 @@ public class HierarchicalShardSyncerTest {
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE);
INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector).listShards();
verify(dynamoDBLeaseRefresher, never()).listLeases();
@ -492,7 +495,8 @@ public class HierarchicalShardSyncerTest {
setupMultiStream();
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE);
INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE,
dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector).listShards();
verify(dynamoDBLeaseRefresher, never()).listLeases();
@ -527,7 +531,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
garbageCollectLeases, cleanupLeasesOfCompletedShards, true, SCOPE);
garbageCollectLeases, cleanupLeasesOfCompletedShards, true, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> leases = leaseCaptor.getAllValues();
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -572,7 +576,7 @@ public class HierarchicalShardSyncerTest {
setupMultiStream();
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
garbageCollectLeases, cleanupLeasesOfCompletedShards, true, SCOPE);
garbageCollectLeases, cleanupLeasesOfCompletedShards, true, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> leases = leaseCaptor.getAllValues();
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -628,7 +632,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: No leases present, create leases.
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@ -643,7 +647,7 @@ public class HierarchicalShardSyncerTest {
// Second call: Leases present, with shardId-0 being at ShardEnd causing cleanup.
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<ExtendedSequenceNumber> sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint)
@ -703,7 +707,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: Call to create leases.
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@ -720,7 +724,7 @@ public class HierarchicalShardSyncerTest {
// Second call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete fails.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -744,7 +748,7 @@ public class HierarchicalShardSyncerTest {
// Final call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete passes.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
deleteLeases = leaseDeleteCaptor.getAllValues();
@ -805,7 +809,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: Call to create leases. Fails on ListLeases
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector, times(1)).listShards();
verify(dynamoDBLeaseRefresher, times(1)).listLeases();
@ -815,7 +819,7 @@ public class HierarchicalShardSyncerTest {
// Second call: Leases not present, leases will be created.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
final Set<Lease> expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null));
@ -830,7 +834,7 @@ public class HierarchicalShardSyncerTest {
// Final call: Leases present, belongs to TestOwner, shardId-0 is at ShardEnd should be cleaned up.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -896,7 +900,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: No leases present, create leases. Create lease Fails
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector, times(1)).listShards();
verify(dynamoDBLeaseRefresher, times(1)).listLeases();
@ -905,7 +909,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
final Set<Lease> expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null));
@ -920,7 +924,7 @@ public class HierarchicalShardSyncerTest {
// Final call: Leases are present, shardId-0 is at ShardEnd needs to be cleaned up.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -1010,7 +1014,7 @@ public class HierarchicalShardSyncerTest {
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture());
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
assertThat(leaseCaptor.getAllValues().size(), equalTo(1));
assertThat(leaseCaptor.getValue(), equalTo(garbageLease));
@ -1039,7 +1043,7 @@ public class HierarchicalShardSyncerTest {
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture());
setupMultiStream();
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
assertThat(leaseCaptor.getAllValues().size(), equalTo(1));
assertThat(leaseCaptor.getValue(), equalTo(garbageLease));
@ -1071,7 +1075,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition,
garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE);
garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> leases = leaseCaptor.getAllValues();
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -1652,7 +1656,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
verify(shardDetector, atLeast(1)).listShardsWithFilter(shardFilter);
verify(shardDetector, never()).listShards();
@ -1674,7 +1678,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty());
verify(shardDetector, atLeast(1)).listShards();
}

View file

@ -19,6 +19,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
@ -138,7 +139,7 @@ public class ShutdownTaskTest {
}).when(hierarchicalShardSyncer)
.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON,
false, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
NULL_METRICS_FACTORY.createMetrics(), latestShards);
NULL_METRICS_FACTORY.createMetrics(), false, latestShards);
final TaskResult result = task.call();
assertNotNull(result.getException());