diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 1f89f8c8..578af465 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -78,21 +78,21 @@ public class HierarchicalShardSyncer { final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - final List shards = getShardList(shardDetector); + final List latestShards = getShardList(shardDetector); checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, scope, shards); + ignoreUnexpectedChildShards, scope, latestShards); } //Provide a pre-collcted list of shards to avoid calling ListShards API public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards, - final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List shards)throws DependencyException, InvalidStateException, + final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List latestShards)throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - if (!CollectionUtils.isNullOrEmpty(shards)) { - log.debug("Num shards: {}", shards.size()); + if (!CollectionUtils.isNullOrEmpty(latestShards)) { + log.debug("Num shards: {}", latestShards.size()); } - final Map shardIdToShardMap = constructShardIdToShardMap(shards); + final Map shardIdToShardMap = constructShardIdToShardMap(latestShards); final Map> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap( shardIdToShardMap); final Set inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap); @@ -102,7 +102,7 @@ public class HierarchicalShardSyncer { final List currentLeases = leaseRefresher.listLeases(); - final List newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds); + final List newLeasesToCreate = determineNewLeasesToCreate(latestShards, currentLeases, initialPosition, inconsistentShardIds); log.debug("Num new leases to create: {}", newLeasesToCreate.size()); for (Lease lease : newLeasesToCreate) { long startTime = System.currentTimeMillis(); @@ -116,7 +116,7 @@ public class HierarchicalShardSyncer { } final List trackedLeases = new ArrayList<>(currentLeases); trackedLeases.addAll(newLeasesToCreate); - cleanupGarbageLeases(shardDetector, shards, trackedLeases, leaseRefresher); + cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher); if (cleanupLeasesOfCompletedShards) { cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, leaseRefresher); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 5785220e..2bfcd358 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -99,19 +99,20 @@ public class ShutdownTask implements ConsumerTask { try { try { ShutdownReason localReason = reason; - List allShards = new ArrayList<>(); + List latestShards = null; /* * Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END - * If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows active - * workers to contend for the lease of this shard. + * If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows + * active workers to contend for the lease of this shard. */ if (localReason == ShutdownReason.SHARD_END) { - allShards = shardDetector.listShards(); + latestShards = shardDetector.listShards(); - if (!CollectionUtils.isNullOrEmpty(allShards) && !validateShardEnd(allShards)) { + //If latestShards is empty, should also shutdown the ShardConsumer without checkpoint with SHARD_END + if (CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) { localReason = ShutdownReason.LEASE_LOST; dropLease(); - log.info("Force the lease to be lost before shutting down the consumer for Shard: " + shardInfo.shardId()); + log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.shardId()); } } @@ -154,7 +155,7 @@ public class ShutdownTask implements ConsumerTask { log.debug("Looking for child shards of shard {}", shardInfo.shardId()); // create leases for the child shards hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(), - initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, allShards); + initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards); log.debug("Finished checking for child shards of shard {}", shardInfo.shardId()); } @@ -196,30 +197,25 @@ public class ShutdownTask implements ConsumerTask { return reason; } - private boolean validateShardEnd(List shards) { + private boolean isShardInContextParentOfAny(List shards) { for(Shard shard : shards) { - if (isChildShardOfCurrentShard(shard)) { + if (isChildShardOfShardInContext(shard)) { return true; } } return false; } - private boolean isChildShardOfCurrentShard(Shard shard) { + private boolean isChildShardOfShardInContext(Shard shard) { return (StringUtils.equals(shard.parentShardId(), shardInfo.shardId()) || StringUtils.equals(shard.adjacentParentShardId(), shardInfo.shardId())); } private void dropLease() { - Collection leases = leaseCoordinator.getAssignments(); - if(leases != null && !leases.isEmpty()) { - for(Lease lease : leases) { - if(lease.leaseKey().equals(shardInfo.shardId())) { - leaseCoordinator.dropLease(lease); - log.warn("Dropped lease for shutting down ShardConsumer: " + lease.leaseKey()); - break; - } - } + Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId()); + leaseCoordinator.dropLease(currentLease); + if(currentLease != null) { + log.warn("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey()); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index ee6fa933..e9db752c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -207,16 +207,16 @@ public class HierarchicalShardSyncerTest { @Test public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception { - final List shards = constructShardListForGraphA(); + final List latestShards = constructShardListForGraphA(); final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShards()).thenReturn(latestShards); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - cleanupLeasesOfCompletedShards, false, SCOPE, shards); + cleanupLeasesOfCompletedShards, false, SCOPE, latestShards); final Set expectedShardIds = new HashSet<>( Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index cb3f42cb..ea51a91c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -42,6 +42,7 @@ import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.HierarchicalShardSyncer; +import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; @@ -124,8 +125,8 @@ public class ShutdownTaskTest { */ @Test public final void testCallWhenSyncingShardsThrows() throws Exception { - List shards = constructShardListGraphA(); - when(shardDetector.listShards()).thenReturn(shards); + List latestShards = constructShardListGraphA(); + when(shardDetector.listShards()).thenReturn(latestShards); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); @@ -134,7 +135,7 @@ public class ShutdownTaskTest { }).when(hierarchicalShardSyncer) .checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, - NULL_METRICS_FACTORY.createMetrics(), shards); + NULL_METRICS_FACTORY.createMetrics(), latestShards); final TaskResult result = task.call(); assertNotNull(result.getException()); @@ -188,7 +189,7 @@ public class ShutdownTaskTest { verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build()); verify(shardDetector, times(1)).listShards(); - verify(leaseCoordinator).getAssignments(); + verify(leaseCoordinator).getCurrentlyHeldLease(shardInfo.shardId()); } /**