Address more comments
This commit is contained in:
parent
407b4be8dd
commit
d5e6c74d77
3 changed files with 15 additions and 19 deletions
|
|
@ -88,10 +88,9 @@ public class HierarchicalShardSyncer {
|
|||
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards,
|
||||
final boolean ignoreUnexpectedChildShards, final MetricsScope scope)throws DependencyException, InvalidStateException,
|
||||
ProvisionedThroughputException, KinesisClientLibIOException {
|
||||
if (CollectionUtils.isNullOrEmpty(shards)) {
|
||||
shards = getShardList(shardDetector);
|
||||
}
|
||||
if (!CollectionUtils.isNullOrEmpty(shards)) {
|
||||
log.debug("Num shards: {}", shards.size());
|
||||
}
|
||||
|
||||
final Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(shards);
|
||||
final Map<String, Set<String>> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ public class ShutdownTask implements ConsumerTask {
|
|||
@NonNull
|
||||
private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer;
|
||||
@NonNull
|
||||
private ShutdownReason reason;
|
||||
private final ShutdownReason reason;
|
||||
@NonNull
|
||||
private final InitialPositionInStreamExtended initialPositionInStream;
|
||||
private final boolean cleanupLeasesOfCompletedShards;
|
||||
|
|
@ -94,34 +94,35 @@ public class ShutdownTask implements ConsumerTask {
|
|||
|
||||
try {
|
||||
try {
|
||||
ShutdownReason localReason = reason;
|
||||
List<Shard> allShards = new ArrayList<>();
|
||||
/*
|
||||
* Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END
|
||||
* If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows other
|
||||
* shard consumer to subscribe to this shard.
|
||||
*/
|
||||
if (reason == ShutdownReason.SHARD_END) {
|
||||
if (localReason == ShutdownReason.SHARD_END) {
|
||||
allShards = shardDetector.listShards();
|
||||
|
||||
if (!CollectionUtils.isNullOrEmpty(allShards) && !shardEndValidated(allShards)) {
|
||||
reason = ShutdownReason.LEASE_LOST;
|
||||
localReason = ShutdownReason.LEASE_LOST;
|
||||
}
|
||||
}
|
||||
|
||||
// If we reached end of the shard, set sequence number to SHARD_END.
|
||||
if (reason == ShutdownReason.SHARD_END) {
|
||||
if (localReason == ShutdownReason.SHARD_END) {
|
||||
recordProcessorCheckpointer
|
||||
.sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());
|
||||
recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
|
||||
}
|
||||
|
||||
log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}",
|
||||
shardInfo.shardId(), shardInfo.concurrencyToken(), reason);
|
||||
final ShutdownInput shutdownInput = ShutdownInput.builder().shutdownReason(reason)
|
||||
shardInfo.shardId(), shardInfo.concurrencyToken(), localReason);
|
||||
final ShutdownInput shutdownInput = ShutdownInput.builder().shutdownReason(localReason)
|
||||
.checkpointer(recordProcessorCheckpointer).build();
|
||||
final long startTime = System.currentTimeMillis();
|
||||
try {
|
||||
if (reason == ShutdownReason.SHARD_END) {
|
||||
if (localReason == ShutdownReason.SHARD_END) {
|
||||
shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
|
||||
ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue();
|
||||
if (lastCheckpointValue == null
|
||||
|
|
@ -143,7 +144,7 @@ public class ShutdownTask implements ConsumerTask {
|
|||
MetricsUtil.addLatency(scope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY);
|
||||
}
|
||||
|
||||
if (reason == ShutdownReason.SHARD_END) {
|
||||
if (localReason == ShutdownReason.SHARD_END) {
|
||||
log.debug("Looking for child shards of shard {}", shardInfo.shardId());
|
||||
// create leases for the child shards
|
||||
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(allShards, shardDetector, leaseRefresher,
|
||||
|
|
|
|||
|
|
@ -250,8 +250,7 @@ public class HierarchicalShardSyncerTest {
|
|||
.checkAndCreateLeaseForNewShards(new ArrayList<Shard>(), shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
|
||||
cleanupLeasesOfCompletedShards, false, SCOPE);
|
||||
|
||||
final Set<String> expectedShardIds = new HashSet<>(
|
||||
Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"));
|
||||
final Set<String> expectedShardIds = new HashSet<>();
|
||||
|
||||
final List<Lease> requestLeases = leaseCaptor.getAllValues();
|
||||
final Set<String> requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
|
||||
|
|
@ -259,13 +258,10 @@ public class HierarchicalShardSyncerTest {
|
|||
.collect(Collectors.toSet());
|
||||
|
||||
assertThat(requestLeases.size(), equalTo(expectedShardIds.size()));
|
||||
assertThat(requestLeaseKeys, equalTo(expectedShardIds));
|
||||
assertThat(extendedSequenceNumbers.size(), equalTo(1));
|
||||
assertThat(extendedSequenceNumbers.size(), equalTo(0));
|
||||
|
||||
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
|
||||
|
||||
verify(shardDetector).listShards();
|
||||
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
|
||||
verify(shardDetector, never()).listShards();
|
||||
verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
|
||||
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue