diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NoOpWorkerStateChangeListener.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NoOpWorkerStateChangeListener.java new file mode 100644 index 00000000..854d9e06 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NoOpWorkerStateChangeListener.java @@ -0,0 +1,18 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerStateChangeListener; + +public class NoOpWorkerStateChangeListener implements WorkerStateChangeListener { + + /** + * Empty constructor for NoOp Worker State Change Listener + */ + public NoOpWorkerStateChangeListener() { + + } + + @Override + public void onWorkerStateChange(WorkerState newState) { + + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index daf89c28..f87d2d73 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -93,7 +93,6 @@ public class Worker implements Runnable { private final Optional retryGetRecordsInSeconds; private final Optional maxGetRecordsThreadPool; - // private final KinesisClientLeaseManager leaseManager; private final KinesisClientLibLeaseCoordinator leaseCoordinator; private final ShardSyncTaskManager controlServer; @@ -119,6 +118,8 @@ public class Worker implements Runnable { @VisibleForTesting protected GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator(); + private WorkerStateChangeListener workerStateChangeListener; + /** * Constructor. * @@ -276,7 +277,8 @@ public class Worker implements Runnable { config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), config.getShardPrioritizationStrategy(), config.getRetryGetRecordsInSeconds(), - config.getMaxGetRecordsThreadPool()); + config.getMaxGetRecordsThreadPool(), + new NoOpWorkerStateChangeListener()); // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { @@ -348,7 +350,7 @@ public class Worker implements Runnable { this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, - shardPrioritization, Optional.empty(), Optional.empty()); + shardPrioritization, Optional.empty(), Optional.empty(), new NoOpWorkerStateChangeListener()); } /** @@ -395,7 +397,7 @@ public class Worker implements Runnable { KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, - Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool) { + Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; this.config = config; @@ -417,6 +419,8 @@ public class Worker implements Runnable { this.shardPrioritization = shardPrioritization; this.retryGetRecordsInSeconds = retryGetRecordsInSeconds; this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; + this.workerStateChangeListener = workerStateChangeListener; + workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED); } /** @@ -440,6 +444,7 @@ public class Worker implements Runnable { if (shutdown) { return; } + workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); try { initialize(); @@ -449,6 +454,7 @@ public class Worker implements Runnable { shutdown(); } + workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED); while (!shouldShutdown()) { runProcessLoop(); } @@ -593,10 +599,10 @@ public class Worker implements Runnable { /** * Starts the requestedShutdown process, and returns a future that can be used to track the process. - * + * * This is deprecated in favor of {@link #startGracefulShutdown()}, which returns a more complete future, and * indicates the process behavior - * + * * @return a future that will be set once shutdown is completed. */ @Deprecated @@ -640,7 +646,7 @@ public class Worker implements Runnable { * Requests a graceful shutdown of the worker, notifying record processors, that implement * {@link IShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to * checkpoint. - * + * * This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the * previous future. * @@ -755,6 +761,10 @@ public class Worker implements Runnable { return shardInfoShardConsumerMap; } + WorkerStateChangeListener getWorkerStateChangeListener() { + return workerStateChangeListener; + } + /** * Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that if executor * services were passed to the worker by the user, worker will not attempt to shutdown those resources. @@ -775,6 +785,7 @@ public class Worker implements Runnable { LOG.warn("Shutdown requested a second time."); return; } + workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUTTING_DOWN); LOG.info("Worker shutdown requested."); // Set shutdown flag, so Worker.run can start shutdown process. @@ -801,13 +812,14 @@ public class Worker implements Runnable { if (metricsFactory instanceof WorkerCWMetricsFactory) { ((CWMetricsFactory) metricsFactory).shutdown(); } + workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN); shutdownComplete = true; } /** * Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()} * method before every loop run, so method must do minimum amount of work to not impact shard processing timings. - * + * * @return Whether worker should shutdown immediately. */ @VisibleForTesting @@ -874,6 +886,22 @@ public class Worker implements Runnable { } + /** + * A listener for callbacks on changes worker state + */ + @FunctionalInterface + public interface WorkerStateChangeListener { + enum WorkerState { + CREATED, + INITIALIZING, + STARTED, + SHUTTING_DOWN, + SHUT_DOWN + } + + void onWorkerStateChange(WorkerState newState); + } + /** * Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at * INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on @@ -1012,7 +1040,7 @@ public class Worker implements Runnable { /** * Given configuration, returns appropriate metrics factory. - * + * * @param cloudWatchClient * Amazon CloudWatch client * @param config @@ -1039,7 +1067,7 @@ public class Worker implements Runnable { /** * Returns default executor service that should be used by the worker. - * + * * @return Default executor service that should be used by the worker. */ private static ExecutorService getExecutorService() { @@ -1089,6 +1117,7 @@ public class Worker implements Runnable { private ExecutorService execService; private ShardPrioritization shardPrioritization; private IKinesisProxy kinesisProxy; + private WorkerStateChangeListener workerStateChangeListener; /** * Default constructor. @@ -1209,10 +1238,10 @@ public class Worker implements Runnable { /** * Provides logic how to prioritize shard processing. - * + * * @param shardPrioritization * shardPrioritization is responsible to order shards before processing - * + * * @return A reference to this updated object so that method calls can be chained together. */ public Builder shardPrioritization(ShardPrioritization shardPrioritization) { @@ -1233,6 +1262,17 @@ public class Worker implements Runnable { return this; } + /** + * Set WorkerStateChangeListener for the worker + * @param workerStateChangeListener + * Sets the WorkerStateChangeListener + * @return A reference to this updated object so that method calls can be chained together. + */ + public Builder workerStateChangeListener(WorkerStateChangeListener workerStateChangeListener) { + this.workerStateChangeListener = workerStateChangeListener; + return this; + } + /** * Build the Worker instance. * @@ -1305,6 +1345,10 @@ public class Worker implements Runnable { kinesisProxy = new KinesisProxy(config, kinesisClient); } + if (workerStateChangeListener == null){ + workerStateChangeListener = new NoOpWorkerStateChangeListener(); + } + return new Worker(config.getApplicationName(), recordProcessorFactory, config, @@ -1336,9 +1380,8 @@ public class Worker implements Runnable { config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), shardPrioritization, config.getRetryGetRecordsInSeconds(), - config.getMaxGetRecordsThreadPool()); - + config.getMaxGetRecordsThreadPool(), + workerStateChangeListener); } - } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 037a54b2..f71ed0b4 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -142,7 +142,7 @@ public class WorkerTest { private static final String KINESIS_SHARD_ID_FORMAT = "kinesis-0-0-%d"; private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d"; - + private RecordsFetcherFactory recordsFetcherFactory; private KinesisClientLibConfiguration config; @@ -170,7 +170,7 @@ public class WorkerTest { private Future taskFuture; @Mock private TaskResult taskResult; - + @Before public void setup() { config = spy(new KinesisClientLibConfiguration("app", null, null, null)); @@ -179,7 +179,7 @@ public class WorkerTest { } // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES - private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = + private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory() { @Override @@ -212,8 +212,8 @@ public class WorkerTest { }; } }; - - private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 = + + private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 = new V1ToV2RecordProcessorFactoryAdapter(SAMPLE_RECORD_PROCESSOR_FACTORY); @@ -619,7 +619,7 @@ public class WorkerTest { return null; } }).when(v2RecordProcessor).processRecords(any(ProcessRecordsInput.class)); - + RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class); GetRecordsCache getRecordsCache = mock(GetRecordsCache.class); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); @@ -659,7 +659,7 @@ public class WorkerTest { * This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of * {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads. * This behavior makes the test a bit racy, since we need to ensure a specific order of events. - * + * * @throws Exception */ @Test @@ -1356,7 +1356,7 @@ public class WorkerTest { executorService, metricsFactory, taskBackoffTimeMillis, - failoverTimeMillis, + failoverTimeMillis, false, shardPrioritization); @@ -1432,7 +1432,7 @@ public class WorkerTest { config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, - parentShardPollIntervalMillis, + parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, @@ -1500,6 +1500,16 @@ public class WorkerTest { Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisLocalFileProxy); } + @Test + public void testBuilderForWorkerStateListener() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + Worker worker = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .config(config) + .build(); + Assert.assertTrue(worker.getWorkerStateChangeListener() instanceof NoOpWorkerStateChangeListener); + } + @Test public void testBuilderWithDefaultLeaseManager() { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); @@ -1801,7 +1811,7 @@ public class WorkerTest { TestStreamletFactory recordProcessorFactory = new TestStreamletFactory(recordCounter, shardSequenceVerifier); ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize); - + WorkerThread workerThread = runWorker( shardList, initialLeases, callProcessRecordsForEmptyRecordList, failoverTimeMillis, numberOfRecordsPerShard, fileBasedProxy, recordProcessorFactory, executorService, nullMetricsFactory, clientConfig); @@ -1857,7 +1867,7 @@ public class WorkerTest { idleTimeInMilliseconds, callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp)); - + Worker worker = new Worker(stageName, recordProcessorFactory, @@ -1874,7 +1884,7 @@ public class WorkerTest { failoverTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); - + WorkerThread workerThread = new WorkerThread(worker); workerThread.start(); return workerThread;