Remove ShutdownNotificationAware and update javadocs (#1358)

* Deprecate ShutdownNotificationAware and update javadocs

ShutdownNotificationAware is not used by KCL, this PR
marks it as deprecated and updates the javadoc

Co-authored-by: nakulj <nklj@amazon.com>
This commit is contained in:
Aravinda Kidambi Srinivasan 2024-07-02 14:28:19 -06:00 committed by GitHub
parent 715690d2c0
commit 6cba7f431d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 12 additions and 15 deletions

View file

@ -93,7 +93,6 @@ import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy; import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.ShutdownNotificationAware;
import software.amazon.kinesis.processor.StreamTracker; import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
@ -768,8 +767,8 @@ public class Scheduler implements Runnable {
} }
/** /**
* Requests a graceful shutdown of the worker, notifying record processors, that implement * Requests a graceful shutdown of the worker, notifying record processors
* {@link ShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to * of the impending shutdown. This gives the record processor a final chance to
* checkpoint. * checkpoint.
* *
* This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the * This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the

View file

@ -19,7 +19,7 @@ import java.util.concurrent.CountDownLatch;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.processor.ShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor;
/** /**
* Contains callbacks for completion of stages in a requested record processor shutdown. * Contains callbacks for completion of stages in a requested record processor shutdown.
@ -45,7 +45,7 @@ public class ShardConsumerShutdownNotification implements ShutdownNotification {
* the lease that this shutdown request will free once initial shutdown is complete * the lease that this shutdown request will free once initial shutdown is complete
* @param notificationCompleteLatch * @param notificationCompleteLatch
* used to inform the caller once the * used to inform the caller once the
* {@link ShutdownNotificationAware} object has been * {@link ShardRecordProcessor} object has been
* notified of the shutdown request. * notified of the shutdown request.
* @param shutdownCompleteLatch * @param shutdownCompleteLatch
* used to inform the caller once the record processor is fully shutdown * used to inform the caller once the record processor is fully shutdown

View file

@ -16,9 +16,11 @@ package software.amazon.kinesis.processor;
/** /**
* Allows a record processor to indicate it's aware of requested shutdowns, and handle the request. * Allows a record processor to indicate it's aware of requested shutdowns, and handle the request.
* @deprecated This class is not used, {@link ShardRecordProcessor} provide shutdownRequested
* notifications already.
*/ */
@Deprecated
public interface ShutdownNotificationAware { public interface ShutdownNotificationAware {
/** /**
* Called when the worker has been requested to shutdown, and gives the record processor a chance to checkpoint. * Called when the worker has been requested to shutdown, and gives the record processor a chance to checkpoint.
* *

View file

@ -36,14 +36,13 @@ import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer; import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShutdownNotificationAware;
import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.KinesisClientRecord;
/** /**
* Streamlet that tracks records it's seen - useful for testing. * Streamlet that tracks records it's seen - useful for testing.
*/ */
@Slf4j @Slf4j
public class TestStreamlet implements ShardRecordProcessor, ShutdownNotificationAware { public class TestStreamlet implements ShardRecordProcessor {
private List<KinesisClientRecord> records = new ArrayList<>(); private List<KinesisClientRecord> records = new ArrayList<>();
private Set<String> processedSeqNums = new HashSet<String>(); // used for deduping private Set<String> processedSeqNums = new HashSet<String>(); // used for deduping
@ -139,7 +138,10 @@ public class TestStreamlet implements ShardRecordProcessor, ShutdownNotification
} }
@Override @Override
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {} public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
shutdownNotificationCalled = true;
notifyShutdownLatch.countDown();
}
/** /**
* @return the shardId * @return the shardId
@ -166,12 +168,6 @@ public class TestStreamlet implements ShardRecordProcessor, ShutdownNotification
return shutdownNotificationCalled; return shutdownNotificationCalled;
} }
@Override
public void shutdownRequested(RecordProcessorCheckpointer checkpointer) {
shutdownNotificationCalled = true;
notifyShutdownLatch.countDown();
}
public CountDownLatch getInitializeLatch() { public CountDownLatch getInitializeLatch() {
return initializeLatch; return initializeLatch;
} }