diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index d0d9c0c4..f72b31f8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -25,11 +25,11 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.kinesis.model.ChildShard; import software.amazon.awssdk.utils.CollectionUtils; -import software.amazon.awssdk.utils.Validate; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException; import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCleanupManager; @@ -53,7 +53,6 @@ import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.util.Set; -import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; @@ -100,6 +99,7 @@ public class ShutdownTask implements ConsumerTask { private final LeaseCleanupManager leaseCleanupManager; private static final Function leaseKeyProvider = shardInfo -> ShardInfo.getLeaseKey(shardInfo); + private int retryLeftForValidParentState = 10; /* * Invokes ShardRecordProcessor shutdown() API. @@ -121,26 +121,21 @@ public class ShutdownTask implements ConsumerTask { final long startTime = System.currentTimeMillis(); final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); - final ShutdownReason localReason = attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(reason, currentShardLease); + final Runnable leaseLostAction = () -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()); - if (localReason == ShutdownReason.SHARD_END) { - final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo); - if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { - boolean isSuccess = false; - try { - isSuccess = attemptShardEndCheckpointing(scope, startTime); - } finally { - // Check if either the shard end ddb persist is successful or - // if childshards is empty. When child shards is empty then either it is due to - // completed shard being reprocessed or we got RNF from service. - // For these cases enqueue the lease for deletion. - if (isSuccess || CollectionUtils.isNullOrEmpty(childShards)) { - leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); - } - } + if (reason == ShutdownReason.SHARD_END) { + try { + takeShardEndAction(currentShardLease, scope, startTime); + } catch (InvalidStateException e) { + // If InvalidStateException happens, it indicates we have a non recoverable error in short term. + // In this scenario, we should shutdown the shardConsumer with LEASE_LOST reason to allow other worker to take the lease and retry shutting down. + log.warn("Lease {}: Invalid state encountered while shutting down shardConsumer with SHARD_END reason. " + + "Dropping the lease and shutting down shardConsumer using LEASE_LOST reason. ", currentShardLease.leaseKey(), e); + dropLease(currentShardLease); + throwOnApplicationException(leaseLostAction, scope, startTime); } } else { - throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime); + throwOnApplicationException(leaseLostAction, scope, startTime); } log.debug("Shutting down retrieval strategy for shard {}.", leaseKeyProvider.apply(shardInfo)); @@ -169,41 +164,47 @@ public class ShutdownTask implements ConsumerTask { return new TaskResult(exception); } - private ShutdownReason attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(ShutdownReason originalReason, Lease currentShardLease) - throws DependencyException, ProvisionedThroughputException { - ShutdownReason localReason = originalReason; - if (originalReason == ShutdownReason.SHARD_END) { - // Create new lease for the child shards if they don't exist. - // We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards. - // This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception. - // In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. - // This scenario could happen when customer deletes the stream while leaving the KCL application running. - Validate.validState(currentShardLease != null, - "%s : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.", - leaseKeyProvider.apply(shardInfo)); + // Involves persisting child shard info, attempt to checkpoint and enqueueing lease for cleanup. + private void takeShardEndAction(Lease currentShardLease, + MetricsScope scope, long startTime) + throws DependencyException, ProvisionedThroughputException, InvalidStateException, + CustomerApplicationException { + // Create new lease for the child shards if they don't exist. + // We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards. + // This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception. + // In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. + // This scenario could happen when customer deletes the stream while leaving the KCL application running. + if (currentShardLease == null) { + throw new InvalidStateException(leaseKeyProvider.apply(shardInfo) + + " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner."); + } + if (!CollectionUtils.isNullOrEmpty(childShards)) { + createLeasesForChildShardsIfNotExist(); + updateLeaseWithChildShards(currentShardLease); + } + final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, + shardInfo); + if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { + boolean isSuccess = false; try { - if (!CollectionUtils.isNullOrEmpty(childShards)) { - createLeasesForChildShardsIfNotExist(); - updateLeaseWithChildShards(currentShardLease); + isSuccess = attemptShardEndCheckpointing(scope, startTime); + } finally { + // Check if either the shard end ddb persist is successful or + // if childshards is empty. When child shards is empty then either it is due to + // completed shard being reprocessed or we got RNF from service. + // For these cases enqueue the lease for deletion. + if (isSuccess || CollectionUtils.isNullOrEmpty(childShards)) { + leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); } - } catch (InvalidStateException e) { - // If InvalidStateException happens, it indicates we are missing childShard related information. - // In this scenario, we should shutdown the shardConsumer with LEASE_LOST reason to allow other worker to take the lease and retry getting - // childShard information in the processTask. - localReason = ShutdownReason.LEASE_LOST; - log.warn("Lease {}: Exception happened while shutting down shardConsumer with SHARD_END reason. " + - "Dropping the lease and shutting down shardConsumer using LEASE_LOST reason. Exception: ", currentShardLease.leaseKey(), e); - dropLease(currentShardLease); } } - return localReason; } private boolean attemptShardEndCheckpointing(MetricsScope scope, long startTime) throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException { final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) - .orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist.")); + .orElseThrow(() -> new InvalidStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist.")); if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) { // Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number. // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. @@ -252,8 +253,15 @@ public class ShutdownTask implements ConsumerTask { Objects.isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(0))) == Objects .isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(1))); if (!isValidLeaseTableState) { - throw new InvalidStateException("Shard " + shardInfo.shardId() + "'s only child shard " + childShard - + " has partial parent information in lease table."); + if (--retryLeftForValidParentState >= 0) { + throw new BlockedOnParentShardException( + "Shard " + shardInfo.shardId() + "'s only child shard " + childShard + + " has partial parent information in lease table. Hence deferring lease creation of child shard."); + } else { + throw new InvalidStateException( + "Shard " + shardInfo.shardId() + "'s only child shard " + childShard + + " has partial parent information in lease table. Hence deferring lease creation of child shard."); + } } } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 3405362a..dc5d9763 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -44,6 +44,7 @@ import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.Lease; @@ -197,7 +198,7 @@ public class ShutdownTaskTest { } @Test - public final void testCallThrowsWhenParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException { + public final void testCallThrowsUntilParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException { shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), ExtendedSequenceNumber.LATEST); task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, @@ -209,37 +210,93 @@ public class ShutdownTaskTest { Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2")); Lease parentLease = LeaseHelper.createLease("shardId-1", "leaseOwner", Collections.emptyList()); when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease); - when(leaseCoordinator.getCurrentlyHeldLease("shardId-1")).thenReturn(null, parentLease); + when(leaseCoordinator.getCurrentlyHeldLease("shardId-1")) + .thenReturn(null, null, null, null, null, parentLease); when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true); when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease); // Return null lease first time to simulate partial parent lease info - when(leaseRefresher.getLease("shardId-1")).thenReturn(null, parentLease); + when(leaseRefresher.getLease("shardId-1")) + .thenReturn(null, null, null, null, null, parentLease); - // Make first attempt with partial parent info in lease table - TaskResult result = task.call(); - assertNotNull(result.getException()); - assertTrue(result.getException() instanceof InvalidStateException); - assertTrue(result.getException().getMessage().contains("has partial parent information in lease table")); - verify(recordsPublisher, never()).shutdown(); - verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); - verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); - verify(leaseCoordinator, never()).updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString()); - verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class)); - verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); - verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class)); + // Make first 5 attempts with partial parent info in lease table + for (int i = 0; i < 5; i++) { + TaskResult result = task.call(); + assertNotNull(result.getException()); + assertTrue(result.getException() instanceof BlockedOnParentShardException); + assertTrue(result.getException().getMessage().contains("has partial parent information in lease table")); + verify(recordsPublisher, never()).shutdown(); + verify(shardRecordProcessor, never()) + .shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); + verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); + verify(leaseCoordinator, never()) + .updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString()); + verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class)); + verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); + verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class)); + } // make next attempt with complete parent info in lease table - result = task.call(); + TaskResult result = task.call(); assertNull(result.getException()); verify(recordsPublisher).shutdown(); verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); - verify(leaseCoordinator).updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString()); + verify(leaseRefresher).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class)); verify(leaseRefresher, times(1)).createLeaseIfNotExists(Matchers.any(Lease.class)); verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class)); + } + @Test + public final void testCallTriggersLeaseLossWhenParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException { + shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, + SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager); + + when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); + Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2")); + Lease parentLease = LeaseHelper.createLease("shardId-1", "leaseOwner", Collections.emptyList()); + when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease); + when(leaseCoordinator.getCurrentlyHeldLease("shardId-1")) + .thenReturn(null, null, null, null, null, null, null, null, null, null, null); + when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); + when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true); + when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease); + // Return null lease first time to simulate partial parent lease info + when(leaseRefresher.getLease("shardId-1")) + .thenReturn(null, null, null, null, null, null, null, null, null, null, null); + + // Make first 10 attempts with partial parent info in lease table + for (int i = 0; i < 10; i++) { + TaskResult result = task.call(); + assertNotNull(result.getException()); + assertTrue(result.getException() instanceof BlockedOnParentShardException); + assertTrue(result.getException().getMessage().contains("has partial parent information in lease table")); + verify(recordsPublisher, never()).shutdown(); + verify(shardRecordProcessor, never()) + .shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); + verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); + verify(leaseCoordinator, never()) + .updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString()); + verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class)); + verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); + verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class)); + } + + // make final attempt with incomplete parent info in lease table + TaskResult result = task.call(); + assertNull(result.getException()); + verify(recordsPublisher).shutdown(); + verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); + verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build()); + verify(leaseRefresher, never()).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class)); + verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class)); + verify(leaseCoordinator).dropLease(Matchers.any(Lease.class)); + verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class)); } /**