diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index a163dfdc..881dc5bb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -93,7 +93,6 @@ import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; -import software.amazon.kinesis.processor.ShutdownNotificationAware; import software.amazon.kinesis.processor.StreamTracker; import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.RecordsPublisher; @@ -768,8 +767,8 @@ public class Scheduler implements Runnable { } /** - * Requests a graceful shutdown of the worker, notifying record processors, that implement - * {@link ShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to + * Requests a graceful shutdown of the worker, notifying record processors, + * of the impending shutdown. This gives the record processor a final chance to * checkpoint. * * This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerShutdownNotification.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerShutdownNotification.java index 1fe9fe2b..98dd50de 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerShutdownNotification.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerShutdownNotification.java @@ -19,7 +19,7 @@ import java.util.concurrent.CountDownLatch; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; -import software.amazon.kinesis.processor.ShutdownNotificationAware; +import software.amazon.kinesis.processor.ShardRecordProcessor; /** * Contains callbacks for completion of stages in a requested record processor shutdown. @@ -45,7 +45,7 @@ public class ShardConsumerShutdownNotification implements ShutdownNotification { * the lease that this shutdown request will free once initial shutdown is complete * @param notificationCompleteLatch * used to inform the caller once the - * {@link ShutdownNotificationAware} object has been + * {@link ShardRecordProcessor} object has been * notified of the shutdown request. * @param shutdownCompleteLatch * used to inform the caller once the record processor is fully shutdown diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShutdownNotificationAware.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShutdownNotificationAware.java index 3a22c1c7..0c03f172 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShutdownNotificationAware.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShutdownNotificationAware.java @@ -16,9 +16,11 @@ package software.amazon.kinesis.processor; /** * Allows a record processor to indicate it's aware of requested shutdowns, and handle the request. + * @deprecated This class is not used, {@link ShardRecordProcessor} provide shutdownRequested + * notifications already. */ +@Deprecated public interface ShutdownNotificationAware { - /** * Called when the worker has been requested to shutdown, and gives the record processor a chance to checkpoint. * @@ -27,4 +29,4 @@ public interface ShutdownNotificationAware { * @param checkpointer the checkpointer that can be used to save progress. */ void shutdownRequested(RecordProcessorCheckpointer checkpointer); -} +} \ No newline at end of file diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestStreamlet.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestStreamlet.java index 2c510826..9c776ce8 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestStreamlet.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestStreamlet.java @@ -36,14 +36,13 @@ import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.RecordProcessorCheckpointer; import software.amazon.kinesis.processor.ShardRecordProcessor; -import software.amazon.kinesis.processor.ShutdownNotificationAware; import software.amazon.kinesis.retrieval.KinesisClientRecord; /** * Streamlet that tracks records it's seen - useful for testing. */ @Slf4j -public class TestStreamlet implements ShardRecordProcessor, ShutdownNotificationAware { +public class TestStreamlet implements ShardRecordProcessor { private List records = new ArrayList<>(); private Set processedSeqNums = new HashSet(); // used for deduping @@ -139,7 +138,10 @@ public class TestStreamlet implements ShardRecordProcessor, ShutdownNotification } @Override - public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {} + public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { + shutdownNotificationCalled = true; + notifyShutdownLatch.countDown(); + } /** * @return the shardId @@ -166,12 +168,6 @@ public class TestStreamlet implements ShardRecordProcessor, ShutdownNotification return shutdownNotificationCalled; } - @Override - public void shutdownRequested(RecordProcessorCheckpointer checkpointer) { - shutdownNotificationCalled = true; - notifyShutdownLatch.countDown(); - } - public CountDownLatch getInitializeLatch() { return initializeLatch; }