Add a WorkerState of SHUT_DOWN_STARTED (#457)

Added a new WorkerState that indicates when a shutdown has started
This commit is contained in:
jiaxul 2019-01-24 12:53:39 -08:00 committed by Justin Pfifer
parent 03c15eb275
commit 8d9427e06c
3 changed files with 13 additions and 1 deletions

View file

@ -476,6 +476,7 @@ public class Scheduler implements Runnable {
log.warn("Shutdown requested a second time."); log.warn("Shutdown requested a second time.");
return; return;
} }
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN_STARTED);
log.info("Worker shutdown requested."); log.info("Worker shutdown requested.");
// Set shutdown flag, so Worker.run can start shutdown process. // Set shutdown flag, so Worker.run can start shutdown process.

View file

@ -23,6 +23,7 @@ public interface WorkerStateChangeListener {
CREATED, CREATED,
INITIALIZING, INITIALIZING,
STARTED, STARTED,
SHUT_DOWN_STARTED,
SHUT_DOWN SHUT_DOWN
} }

View file

@ -116,13 +116,15 @@ public class SchedulerTest {
private ShardDetector shardDetector; private ShardDetector shardDetector;
@Mock @Mock
private Checkpointer checkpoint; private Checkpointer checkpoint;
@Mock
private WorkerStateChangeListener workerStateChangeListener;
@Before @Before
public void setup() { public void setup() {
shardRecordProcessorFactory = new TestShardRecordProcessorFactory(); shardRecordProcessorFactory = new TestShardRecordProcessorFactory();
checkpointConfig = new CheckpointConfig().checkpointFactory(new TestKinesisCheckpointFactory()); checkpointConfig = new CheckpointConfig().checkpointFactory(new TestKinesisCheckpointFactory());
coordinatorConfig = new CoordinatorConfig(applicationName).parentShardPollIntervalMillis(100L); coordinatorConfig = new CoordinatorConfig(applicationName).parentShardPollIntervalMillis(100L).workerStateChangeListener(workerStateChangeListener);
leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName, leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName,
workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory()); workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory());
lifecycleConfig = new LifecycleConfig(); lifecycleConfig = new LifecycleConfig();
@ -259,6 +261,14 @@ public class SchedulerTest {
verify(shardDetector, times(maxInitializationAttempts)).listShards(); verify(shardDetector, times(maxInitializationAttempts)).listShards();
} }
@Test
public final void testSchedulerShutdown() {
scheduler.shutdown();
verify(workerStateChangeListener, times(1)).onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN_STARTED);
verify(leaseCoordinator, times(1)).stop();
verify(workerStateChangeListener, times(1)).onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
}
/*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception { /*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception {
final int numberOfRecordsPerShard = 10; final int numberOfRecordsPerShard = 10;