From 969341130ac9581dec83869dff702d66a0dcf462 Mon Sep 17 00:00:00 2001 From: vincentvilo-aws <142546855+vincentvilo-aws@users.noreply.github.com> Date: Wed, 10 Apr 2024 14:10:52 -0700 Subject: [PATCH] move shutdownComplete call to ShardConsumer (#1308) * move shutdownComplete call to ShardConsumer --- .../kinesis/lifecycle/ConsumerStates.java | 1 - .../kinesis/lifecycle/ShardConsumer.java | 6 +++++ .../kinesis/lifecycle/ShutdownTask.java | 6 ----- .../kinesis/lifecycle/ShardConsumerTest.java | 3 +++ .../kinesis/lifecycle/ShutdownTaskTest.java | 22 +------------------ 5 files changed, 10 insertions(+), 28 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java index 058b3009..07316390 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java @@ -478,7 +478,6 @@ class ConsumerStates { argument.shardRecordProcessor(), argument.recordProcessorCheckpointer(), consumer.shutdownReason(), - consumer.shutdownNotification(), argument.initialPositionInStream(), argument.cleanupLeasesOfCompletedShards(), argument.ignoreUnexpectedChildShards(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index 4162ea81..96261131 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -313,6 +313,12 @@ public class ShardConsumer { } executeTask(shardEndProcessRecordsInput); + + // call shutdownNotification.shutdownComplete() if shutting down as part of gracefulShutdown + if (currentState.state() == ConsumerStates.ShardConsumerState.SHUTTING_DOWN && + taskOutcome == TaskOutcome.SUCCESSFUL && shutdownNotification != null) { + shutdownNotification.shutdownComplete(); + } return false; } }, executorService); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index d8c9d379..d1e978a7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -87,7 +87,6 @@ public class ShutdownTask implements ConsumerTask { private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer; @NonNull private final ShutdownReason reason; - private final ShutdownNotification shutdownNotification; @NonNull private final InitialPositionInStreamExtended initialPositionInStream; private final boolean cleanupLeasesOfCompletedShards; @@ -151,11 +150,6 @@ public class ShutdownTask implements ConsumerTask { log.debug("Shutting down retrieval strategy for shard {}.", leaseKey); recordsPublisher.shutdown(); - // shutdownNotification is only set and used when gracefulShutdown starts - if (shutdownNotification != null) { - shutdownNotification.shutdownComplete(); - } - log.debug("Record processor completed shutdown() for shard {}", leaseKey); return new TaskResult(null); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index 62fd13ef..42f88b12 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -697,6 +697,9 @@ public class ShardConsumerTest { shutdownTaskInput = shutdownTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build(); // No task is created/run for this shutdownRequestedAwaitState, so there's no task outcome. + // shutdownNotification.shutdownComplete() should only be called for gracefulShutdown + verify(shutdownNotification, times(1)).shutdownComplete(); + verify(taskExecutionListener, times(1)).afterTaskExecution(initialTaskInput); verify(taskExecutionListener, times(2)).afterTaskExecution(processTaskInput); verify(taskExecutionListener, times(1)).afterTaskExecution(shutdownRequestedTaskInput); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index b79ffc03..390d3ef6 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -310,18 +310,6 @@ public class ShutdownTaskTest { verify(leaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); } - /** - * shutdownNotification is only set when ShardConsumer.gracefulShutdown() is called and should be null otherwise. - * The task should still call recordsPublisher.shutdown() regardless of the notification - */ - @Test - public void testCallWhenShutdownNotificationIsSet() { - final TaskResult result = createShutdownTaskWithNotification(LEASE_LOST, Collections.emptyList()).call(); - assertNull(result.getException()); - verify(recordsPublisher).shutdown(); - verify(shutdownNotification).shutdownComplete(); - } - @Test public void testCallWhenShutdownNotificationIsNull() { final TaskResult result = createShutdownTask(LEASE_LOST, Collections.emptyList()).call(); @@ -394,15 +382,7 @@ public class ShutdownTaskTest { private ShutdownTask createShutdownTask(final ShutdownReason reason, final List childShards, final ShardInfo shardInfo) { return new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, - reason, null, INITIAL_POSITION_TRIM_HORIZON, false, false, - leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer, - NULL_METRICS_FACTORY, childShards, STREAM_IDENTIFIER, leaseCleanupManager); - } - - private ShutdownTask createShutdownTaskWithNotification(final ShutdownReason reason, - final List childShards) { - return new ShutdownTask(SHARD_INFO, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, - reason, shutdownNotification, INITIAL_POSITION_TRIM_HORIZON, false, false, + reason, INITIAL_POSITION_TRIM_HORIZON, false, false, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer, NULL_METRICS_FACTORY, childShards, STREAM_IDENTIFIER, leaseCleanupManager); }