diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index df7fdda4..dc59193f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -476,6 +476,7 @@ public class Scheduler implements Runnable { log.warn("Shutdown requested a second time."); return; } + workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN_STARTED); log.info("Worker shutdown requested."); // Set shutdown flag, so Worker.run can start shutdown process. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java index 2ca08aa4..b51c5342 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java @@ -23,6 +23,7 @@ public interface WorkerStateChangeListener { CREATED, INITIALIZING, STARTED, + SHUT_DOWN_STARTED, SHUT_DOWN } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 9a10f6b3..0e836bb1 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -116,13 +116,15 @@ public class SchedulerTest { private ShardDetector shardDetector; @Mock private Checkpointer checkpoint; + @Mock + private WorkerStateChangeListener workerStateChangeListener; @Before public void setup() { shardRecordProcessorFactory = new TestShardRecordProcessorFactory(); checkpointConfig = new CheckpointConfig().checkpointFactory(new TestKinesisCheckpointFactory()); - coordinatorConfig = new CoordinatorConfig(applicationName).parentShardPollIntervalMillis(100L); + coordinatorConfig = new CoordinatorConfig(applicationName).parentShardPollIntervalMillis(100L).workerStateChangeListener(workerStateChangeListener); leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName, workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory()); lifecycleConfig = new LifecycleConfig(); @@ -259,6 +261,14 @@ public class SchedulerTest { verify(shardDetector, times(maxInitializationAttempts)).listShards(); } + @Test + public final void testSchedulerShutdown() { + scheduler.shutdown(); + verify(workerStateChangeListener, times(1)).onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN_STARTED); + verify(leaseCoordinator, times(1)).stop(); + verify(workerStateChangeListener, times(1)).onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN); + } + /*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception { final int numberOfRecordsPerShard = 10;