Changes for the pull request after review comments
Changes according to review requests
This commit is contained in:
parent
5a8f2ab029
commit
f8d380845d
3 changed files with 22 additions and 22 deletions
|
|
@ -1,7 +1,5 @@
|
||||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerStateChangeListener;
|
|
||||||
|
|
||||||
public class NoOpWorkerStateChangeListener implements WorkerStateChangeListener {
|
public class NoOpWorkerStateChangeListener implements WorkerStateChangeListener {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -71,6 +71,7 @@ public class Worker implements Runnable {
|
||||||
private static final Log LOG = LogFactory.getLog(Worker.class);
|
private static final Log LOG = LogFactory.getLog(Worker.class);
|
||||||
|
|
||||||
private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
|
private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
|
||||||
|
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
|
||||||
|
|
||||||
private WorkerLog wlog = new WorkerLog();
|
private WorkerLog wlog = new WorkerLog();
|
||||||
|
|
||||||
|
|
@ -278,7 +279,7 @@ public class Worker implements Runnable {
|
||||||
config.getShardPrioritizationStrategy(),
|
config.getShardPrioritizationStrategy(),
|
||||||
config.getRetryGetRecordsInSeconds(),
|
config.getRetryGetRecordsInSeconds(),
|
||||||
config.getMaxGetRecordsThreadPool(),
|
config.getMaxGetRecordsThreadPool(),
|
||||||
new NoOpWorkerStateChangeListener());
|
DEFAULT_WORKER_STATE_CHANGE_LISTENER);
|
||||||
|
|
||||||
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
||||||
if (config.getRegionName() != null) {
|
if (config.getRegionName() != null) {
|
||||||
|
|
@ -350,7 +351,7 @@ public class Worker implements Runnable {
|
||||||
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
|
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
|
||||||
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
|
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
|
||||||
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
|
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||||
shardPrioritization, Optional.empty(), Optional.empty(), new NoOpWorkerStateChangeListener());
|
shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -886,22 +887,6 @@ public class Worker implements Runnable {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* A listener for callbacks on changes worker state
|
|
||||||
*/
|
|
||||||
@FunctionalInterface
|
|
||||||
public interface WorkerStateChangeListener {
|
|
||||||
enum WorkerState {
|
|
||||||
CREATED,
|
|
||||||
INITIALIZING,
|
|
||||||
STARTED,
|
|
||||||
SHUTTING_DOWN,
|
|
||||||
SHUT_DOWN
|
|
||||||
}
|
|
||||||
|
|
||||||
void onWorkerStateChange(WorkerState newState);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at
|
* Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at
|
||||||
* INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on
|
* INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on
|
||||||
|
|
@ -1345,8 +1330,8 @@ public class Worker implements Runnable {
|
||||||
kinesisProxy = new KinesisProxy(config, kinesisClient);
|
kinesisProxy = new KinesisProxy(config, kinesisClient);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (workerStateChangeListener == null){
|
if (workerStateChangeListener == null) {
|
||||||
workerStateChangeListener = new NoOpWorkerStateChangeListener();
|
workerStateChangeListener = DEFAULT_WORKER_STATE_CHANGE_LISTENER;
|
||||||
}
|
}
|
||||||
|
|
||||||
return new Worker(config.getApplicationName(),
|
return new Worker(config.getApplicationName(),
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A listener for callbacks on changes worker state
|
||||||
|
*/
|
||||||
|
@FunctionalInterface
|
||||||
|
public interface WorkerStateChangeListener {
|
||||||
|
enum WorkerState {
|
||||||
|
CREATED,
|
||||||
|
INITIALIZING,
|
||||||
|
STARTED,
|
||||||
|
SHUTTING_DOWN,
|
||||||
|
SHUT_DOWN
|
||||||
|
}
|
||||||
|
|
||||||
|
void onWorkerStateChange(WorkerState newState);
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue