From d5e6c74d77a0af50e9fbb7cd6e550a804931f2ae Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Fri, 4 Oct 2019 12:12:56 -0700 Subject: [PATCH] Address more comments --- .../kinesis/leases/HierarchicalShardSyncer.java | 5 ++--- .../amazon/kinesis/lifecycle/ShutdownTask.java | 17 +++++++++-------- .../leases/HierarchicalShardSyncerTest.java | 12 ++++-------- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 78c95f4f..71fa0102 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -88,10 +88,9 @@ public class HierarchicalShardSyncer { final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope)throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - if (CollectionUtils.isNullOrEmpty(shards)) { - shards = getShardList(shardDetector); + if (!CollectionUtils.isNullOrEmpty(shards)) { + log.debug("Num shards: {}", shards.size()); } - log.debug("Num shards: {}", shards.size()); final Map shardIdToShardMap = constructShardIdToShardMap(shards); final Map> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap( diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index cc7fddfd..136f3a96 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -61,7 +61,7 @@ public class ShutdownTask implements ConsumerTask { @NonNull private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer; @NonNull - private ShutdownReason reason; + private final ShutdownReason reason; @NonNull private final InitialPositionInStreamExtended initialPositionInStream; private final boolean cleanupLeasesOfCompletedShards; @@ -94,34 +94,35 @@ public class ShutdownTask implements ConsumerTask { try { try { + ShutdownReason localReason = reason; List allShards = new ArrayList<>(); /* * Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END * If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows other * shard consumer to subscribe to this shard. */ - if (reason == ShutdownReason.SHARD_END) { + if (localReason == ShutdownReason.SHARD_END) { allShards = shardDetector.listShards(); if (!CollectionUtils.isNullOrEmpty(allShards) && !shardEndValidated(allShards)) { - reason = ShutdownReason.LEASE_LOST; + localReason = ShutdownReason.LEASE_LOST; } } // If we reached end of the shard, set sequence number to SHARD_END. - if (reason == ShutdownReason.SHARD_END) { + if (localReason == ShutdownReason.SHARD_END) { recordProcessorCheckpointer .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); } log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}", - shardInfo.shardId(), shardInfo.concurrencyToken(), reason); - final ShutdownInput shutdownInput = ShutdownInput.builder().shutdownReason(reason) + shardInfo.shardId(), shardInfo.concurrencyToken(), localReason); + final ShutdownInput shutdownInput = ShutdownInput.builder().shutdownReason(localReason) .checkpointer(recordProcessorCheckpointer).build(); final long startTime = System.currentTimeMillis(); try { - if (reason == ShutdownReason.SHARD_END) { + if (localReason == ShutdownReason.SHARD_END) { shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue(); if (lastCheckpointValue == null @@ -143,7 +144,7 @@ public class ShutdownTask implements ConsumerTask { MetricsUtil.addLatency(scope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY); } - if (reason == ShutdownReason.SHARD_END) { + if (localReason == ShutdownReason.SHARD_END) { log.debug("Looking for child shards of shard {}", shardInfo.shardId()); // create leases for the child shards hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(allShards, shardDetector, leaseRefresher, diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 5d4ac902..232d17dc 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -250,8 +250,7 @@ public class HierarchicalShardSyncerTest { .checkAndCreateLeaseForNewShards(new ArrayList(), shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, cleanupLeasesOfCompletedShards, false, SCOPE); - final Set expectedShardIds = new HashSet<>( - Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); + final Set expectedShardIds = new HashSet<>(); final List requestLeases = leaseCaptor.getAllValues(); final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -259,13 +258,10 @@ public class HierarchicalShardSyncerTest { .collect(Collectors.toSet()); assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); - assertThat(requestLeaseKeys, equalTo(expectedShardIds)); - assertThat(extendedSequenceNumbers.size(), equalTo(1)); + assertThat(extendedSequenceNumbers.size(), equalTo(0)); - extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); - - verify(shardDetector).listShards(); - verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); + verify(shardDetector, never()).listShards(); + verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); }