Created listener for worker state change

https://github.com/awslabs/amazon-kinesis-client/issues/275

Update WorkerTest.java
This commit is contained in:
nyo 2018-02-02 21:16:52 +01:00
parent 6fc148740d
commit 5a8f2ab029
3 changed files with 98 additions and 27 deletions

View file

@ -0,0 +1,18 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerStateChangeListener;
public class NoOpWorkerStateChangeListener implements WorkerStateChangeListener {
/**
* Empty constructor for NoOp Worker State Change Listener
*/
public NoOpWorkerStateChangeListener() {
}
@Override
public void onWorkerStateChange(WorkerState newState) {
}
}

View file

@ -93,7 +93,6 @@ public class Worker implements Runnable {
private final Optional<Integer> retryGetRecordsInSeconds; private final Optional<Integer> retryGetRecordsInSeconds;
private final Optional<Integer> maxGetRecordsThreadPool; private final Optional<Integer> maxGetRecordsThreadPool;
// private final KinesisClientLeaseManager leaseManager;
private final KinesisClientLibLeaseCoordinator leaseCoordinator; private final KinesisClientLibLeaseCoordinator leaseCoordinator;
private final ShardSyncTaskManager controlServer; private final ShardSyncTaskManager controlServer;
@ -119,6 +118,8 @@ public class Worker implements Runnable {
@VisibleForTesting @VisibleForTesting
protected GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator(); protected GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator();
private WorkerStateChangeListener workerStateChangeListener;
/** /**
* Constructor. * Constructor.
* *
@ -276,7 +277,8 @@ public class Worker implements Runnable {
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
config.getShardPrioritizationStrategy(), config.getShardPrioritizationStrategy(),
config.getRetryGetRecordsInSeconds(), config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool()); config.getMaxGetRecordsThreadPool(),
new NoOpWorkerStateChangeListener());
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) { if (config.getRegionName() != null) {
@ -348,7 +350,7 @@ public class Worker implements Runnable {
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
shardPrioritization, Optional.empty(), Optional.empty()); shardPrioritization, Optional.empty(), Optional.empty(), new NoOpWorkerStateChangeListener());
} }
/** /**
@ -395,7 +397,7 @@ public class Worker implements Runnable {
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) { Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener) {
this.applicationName = applicationName; this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory; this.recordProcessorFactory = recordProcessorFactory;
this.config = config; this.config = config;
@ -417,6 +419,8 @@ public class Worker implements Runnable {
this.shardPrioritization = shardPrioritization; this.shardPrioritization = shardPrioritization;
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds; this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
this.workerStateChangeListener = workerStateChangeListener;
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
} }
/** /**
@ -440,6 +444,7 @@ public class Worker implements Runnable {
if (shutdown) { if (shutdown) {
return; return;
} }
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
try { try {
initialize(); initialize();
@ -449,6 +454,7 @@ public class Worker implements Runnable {
shutdown(); shutdown();
} }
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED);
while (!shouldShutdown()) { while (!shouldShutdown()) {
runProcessLoop(); runProcessLoop();
} }
@ -755,6 +761,10 @@ public class Worker implements Runnable {
return shardInfoShardConsumerMap; return shardInfoShardConsumerMap;
} }
WorkerStateChangeListener getWorkerStateChangeListener() {
return workerStateChangeListener;
}
/** /**
* Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that if executor * Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that if executor
* services were passed to the worker by the user, worker will not attempt to shutdown those resources. * services were passed to the worker by the user, worker will not attempt to shutdown those resources.
@ -775,6 +785,7 @@ public class Worker implements Runnable {
LOG.warn("Shutdown requested a second time."); LOG.warn("Shutdown requested a second time.");
return; return;
} }
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUTTING_DOWN);
LOG.info("Worker shutdown requested."); LOG.info("Worker shutdown requested.");
// Set shutdown flag, so Worker.run can start shutdown process. // Set shutdown flag, so Worker.run can start shutdown process.
@ -801,6 +812,7 @@ public class Worker implements Runnable {
if (metricsFactory instanceof WorkerCWMetricsFactory) { if (metricsFactory instanceof WorkerCWMetricsFactory) {
((CWMetricsFactory) metricsFactory).shutdown(); ((CWMetricsFactory) metricsFactory).shutdown();
} }
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
shutdownComplete = true; shutdownComplete = true;
} }
@ -874,6 +886,22 @@ public class Worker implements Runnable {
} }
/**
* A listener for callbacks on changes worker state
*/
@FunctionalInterface
public interface WorkerStateChangeListener {
enum WorkerState {
CREATED,
INITIALIZING,
STARTED,
SHUTTING_DOWN,
SHUT_DOWN
}
void onWorkerStateChange(WorkerState newState);
}
/** /**
* Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at * Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at
* INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on * INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on
@ -1089,6 +1117,7 @@ public class Worker implements Runnable {
private ExecutorService execService; private ExecutorService execService;
private ShardPrioritization shardPrioritization; private ShardPrioritization shardPrioritization;
private IKinesisProxy kinesisProxy; private IKinesisProxy kinesisProxy;
private WorkerStateChangeListener workerStateChangeListener;
/** /**
* Default constructor. * Default constructor.
@ -1233,6 +1262,17 @@ public class Worker implements Runnable {
return this; return this;
} }
/**
* Set WorkerStateChangeListener for the worker
* @param workerStateChangeListener
* Sets the WorkerStateChangeListener
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder workerStateChangeListener(WorkerStateChangeListener workerStateChangeListener) {
this.workerStateChangeListener = workerStateChangeListener;
return this;
}
/** /**
* Build the Worker instance. * Build the Worker instance.
* *
@ -1305,6 +1345,10 @@ public class Worker implements Runnable {
kinesisProxy = new KinesisProxy(config, kinesisClient); kinesisProxy = new KinesisProxy(config, kinesisClient);
} }
if (workerStateChangeListener == null){
workerStateChangeListener = new NoOpWorkerStateChangeListener();
}
return new Worker(config.getApplicationName(), return new Worker(config.getApplicationName(),
recordProcessorFactory, recordProcessorFactory,
config, config,
@ -1336,9 +1380,8 @@ public class Worker implements Runnable {
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
shardPrioritization, shardPrioritization,
config.getRetryGetRecordsInSeconds(), config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool()); config.getMaxGetRecordsThreadPool(),
workerStateChangeListener);
} }
} }
} }

View file

@ -1500,6 +1500,16 @@ public class WorkerTest {
Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisLocalFileProxy); Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisLocalFileProxy);
} }
@Test
public void testBuilderForWorkerStateListener() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
Worker worker = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(config)
.build();
Assert.assertTrue(worker.getWorkerStateChangeListener() instanceof NoOpWorkerStateChangeListener);
}
@Test @Test
public void testBuilderWithDefaultLeaseManager() { public void testBuilderWithDefaultLeaseManager() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);