Created listener for worker state change (#291)
* Created listener for worker state change #275
This commit is contained in:
parent
9e3399405b
commit
24916ba552
4 changed files with 189 additions and 27 deletions
|
|
@ -0,0 +1,16 @@
|
||||||
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
|
public class NoOpWorkerStateChangeListener implements WorkerStateChangeListener {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Empty constructor for NoOp Worker State Change Listener
|
||||||
|
*/
|
||||||
|
public NoOpWorkerStateChangeListener() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onWorkerStateChange(WorkerState newState) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -71,6 +71,7 @@ public class Worker implements Runnable {
|
||||||
private static final Log LOG = LogFactory.getLog(Worker.class);
|
private static final Log LOG = LogFactory.getLog(Worker.class);
|
||||||
|
|
||||||
private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
|
private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
|
||||||
|
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
|
||||||
|
|
||||||
private WorkerLog wlog = new WorkerLog();
|
private WorkerLog wlog = new WorkerLog();
|
||||||
|
|
||||||
|
|
@ -93,7 +94,6 @@ public class Worker implements Runnable {
|
||||||
private final Optional<Integer> retryGetRecordsInSeconds;
|
private final Optional<Integer> retryGetRecordsInSeconds;
|
||||||
private final Optional<Integer> maxGetRecordsThreadPool;
|
private final Optional<Integer> maxGetRecordsThreadPool;
|
||||||
|
|
||||||
// private final KinesisClientLeaseManager leaseManager;
|
|
||||||
private final KinesisClientLibLeaseCoordinator leaseCoordinator;
|
private final KinesisClientLibLeaseCoordinator leaseCoordinator;
|
||||||
private final ShardSyncTaskManager controlServer;
|
private final ShardSyncTaskManager controlServer;
|
||||||
|
|
||||||
|
|
@ -119,6 +119,8 @@ public class Worker implements Runnable {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator();
|
protected GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator();
|
||||||
|
|
||||||
|
private WorkerStateChangeListener workerStateChangeListener;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Constructor.
|
||||||
*
|
*
|
||||||
|
|
@ -276,7 +278,8 @@ public class Worker implements Runnable {
|
||||||
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
|
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
|
||||||
config.getShardPrioritizationStrategy(),
|
config.getShardPrioritizationStrategy(),
|
||||||
config.getRetryGetRecordsInSeconds(),
|
config.getRetryGetRecordsInSeconds(),
|
||||||
config.getMaxGetRecordsThreadPool());
|
config.getMaxGetRecordsThreadPool(),
|
||||||
|
DEFAULT_WORKER_STATE_CHANGE_LISTENER);
|
||||||
|
|
||||||
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
||||||
if (config.getRegionName() != null) {
|
if (config.getRegionName() != null) {
|
||||||
|
|
@ -348,7 +351,7 @@ public class Worker implements Runnable {
|
||||||
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
|
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
|
||||||
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
|
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
|
||||||
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
|
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||||
shardPrioritization, Optional.empty(), Optional.empty());
|
shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -395,7 +398,7 @@ public class Worker implements Runnable {
|
||||||
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
|
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
|
||||||
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
|
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
|
||||||
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
|
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
|
||||||
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) {
|
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener) {
|
||||||
this.applicationName = applicationName;
|
this.applicationName = applicationName;
|
||||||
this.recordProcessorFactory = recordProcessorFactory;
|
this.recordProcessorFactory = recordProcessorFactory;
|
||||||
this.config = config;
|
this.config = config;
|
||||||
|
|
@ -417,6 +420,8 @@ public class Worker implements Runnable {
|
||||||
this.shardPrioritization = shardPrioritization;
|
this.shardPrioritization = shardPrioritization;
|
||||||
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
|
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
|
||||||
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
|
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
|
||||||
|
this.workerStateChangeListener = workerStateChangeListener;
|
||||||
|
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -494,6 +499,7 @@ public class Worker implements Runnable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initialize() {
|
private void initialize() {
|
||||||
|
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
|
||||||
boolean isDone = false;
|
boolean isDone = false;
|
||||||
Exception lastException = null;
|
Exception lastException = null;
|
||||||
|
|
||||||
|
|
@ -543,6 +549,7 @@ public class Worker implements Runnable {
|
||||||
if (!isDone) {
|
if (!isDone) {
|
||||||
throw new RuntimeException(lastException);
|
throw new RuntimeException(lastException);
|
||||||
}
|
}
|
||||||
|
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -593,10 +600,10 @@ public class Worker implements Runnable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Starts the requestedShutdown process, and returns a future that can be used to track the process.
|
* Starts the requestedShutdown process, and returns a future that can be used to track the process.
|
||||||
*
|
*
|
||||||
* This is deprecated in favor of {@link #startGracefulShutdown()}, which returns a more complete future, and
|
* This is deprecated in favor of {@link #startGracefulShutdown()}, which returns a more complete future, and
|
||||||
* indicates the process behavior
|
* indicates the process behavior
|
||||||
*
|
*
|
||||||
* @return a future that will be set once shutdown is completed.
|
* @return a future that will be set once shutdown is completed.
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
|
|
@ -640,7 +647,7 @@ public class Worker implements Runnable {
|
||||||
* Requests a graceful shutdown of the worker, notifying record processors, that implement
|
* Requests a graceful shutdown of the worker, notifying record processors, that implement
|
||||||
* {@link IShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to
|
* {@link IShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to
|
||||||
* checkpoint.
|
* checkpoint.
|
||||||
*
|
*
|
||||||
* This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the
|
* This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the
|
||||||
* previous future.
|
* previous future.
|
||||||
*
|
*
|
||||||
|
|
@ -755,6 +762,10 @@ public class Worker implements Runnable {
|
||||||
return shardInfoShardConsumerMap;
|
return shardInfoShardConsumerMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
WorkerStateChangeListener getWorkerStateChangeListener() {
|
||||||
|
return workerStateChangeListener;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that if executor
|
* Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that if executor
|
||||||
* services were passed to the worker by the user, worker will not attempt to shutdown those resources.
|
* services were passed to the worker by the user, worker will not attempt to shutdown those resources.
|
||||||
|
|
@ -785,6 +796,7 @@ public class Worker implements Runnable {
|
||||||
// Lost leases will force Worker to begin shutdown process for all shard consumers in
|
// Lost leases will force Worker to begin shutdown process for all shard consumers in
|
||||||
// Worker.run().
|
// Worker.run().
|
||||||
leaseCoordinator.stop();
|
leaseCoordinator.stop();
|
||||||
|
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -807,7 +819,7 @@ public class Worker implements Runnable {
|
||||||
/**
|
/**
|
||||||
* Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()}
|
* Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()}
|
||||||
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
|
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
|
||||||
*
|
*
|
||||||
* @return Whether worker should shutdown immediately.
|
* @return Whether worker should shutdown immediately.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|
@ -1012,7 +1024,7 @@ public class Worker implements Runnable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given configuration, returns appropriate metrics factory.
|
* Given configuration, returns appropriate metrics factory.
|
||||||
*
|
*
|
||||||
* @param cloudWatchClient
|
* @param cloudWatchClient
|
||||||
* Amazon CloudWatch client
|
* Amazon CloudWatch client
|
||||||
* @param config
|
* @param config
|
||||||
|
|
@ -1039,7 +1051,7 @@ public class Worker implements Runnable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns default executor service that should be used by the worker.
|
* Returns default executor service that should be used by the worker.
|
||||||
*
|
*
|
||||||
* @return Default executor service that should be used by the worker.
|
* @return Default executor service that should be used by the worker.
|
||||||
*/
|
*/
|
||||||
private static ExecutorService getExecutorService() {
|
private static ExecutorService getExecutorService() {
|
||||||
|
|
@ -1089,6 +1101,7 @@ public class Worker implements Runnable {
|
||||||
private ExecutorService execService;
|
private ExecutorService execService;
|
||||||
private ShardPrioritization shardPrioritization;
|
private ShardPrioritization shardPrioritization;
|
||||||
private IKinesisProxy kinesisProxy;
|
private IKinesisProxy kinesisProxy;
|
||||||
|
private WorkerStateChangeListener workerStateChangeListener;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default constructor.
|
* Default constructor.
|
||||||
|
|
@ -1209,10 +1222,10 @@ public class Worker implements Runnable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provides logic how to prioritize shard processing.
|
* Provides logic how to prioritize shard processing.
|
||||||
*
|
*
|
||||||
* @param shardPrioritization
|
* @param shardPrioritization
|
||||||
* shardPrioritization is responsible to order shards before processing
|
* shardPrioritization is responsible to order shards before processing
|
||||||
*
|
*
|
||||||
* @return A reference to this updated object so that method calls can be chained together.
|
* @return A reference to this updated object so that method calls can be chained together.
|
||||||
*/
|
*/
|
||||||
public Builder shardPrioritization(ShardPrioritization shardPrioritization) {
|
public Builder shardPrioritization(ShardPrioritization shardPrioritization) {
|
||||||
|
|
@ -1233,6 +1246,17 @@ public class Worker implements Runnable {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set WorkerStateChangeListener for the worker
|
||||||
|
* @param workerStateChangeListener
|
||||||
|
* Sets the WorkerStateChangeListener
|
||||||
|
* @return A reference to this updated object so that method calls can be chained together.
|
||||||
|
*/
|
||||||
|
public Builder workerStateChangeListener(WorkerStateChangeListener workerStateChangeListener) {
|
||||||
|
this.workerStateChangeListener = workerStateChangeListener;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Build the Worker instance.
|
* Build the Worker instance.
|
||||||
*
|
*
|
||||||
|
|
@ -1305,6 +1329,10 @@ public class Worker implements Runnable {
|
||||||
kinesisProxy = new KinesisProxy(config, kinesisClient);
|
kinesisProxy = new KinesisProxy(config, kinesisClient);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (workerStateChangeListener == null) {
|
||||||
|
workerStateChangeListener = DEFAULT_WORKER_STATE_CHANGE_LISTENER;
|
||||||
|
}
|
||||||
|
|
||||||
return new Worker(config.getApplicationName(),
|
return new Worker(config.getApplicationName(),
|
||||||
recordProcessorFactory,
|
recordProcessorFactory,
|
||||||
config,
|
config,
|
||||||
|
|
@ -1336,9 +1364,8 @@ public class Worker implements Runnable {
|
||||||
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
|
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
|
||||||
shardPrioritization,
|
shardPrioritization,
|
||||||
config.getRetryGetRecordsInSeconds(),
|
config.getRetryGetRecordsInSeconds(),
|
||||||
config.getMaxGetRecordsThreadPool());
|
config.getMaxGetRecordsThreadPool(),
|
||||||
|
workerStateChangeListener);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,16 @@
|
||||||
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A listener for callbacks on changes worker state
|
||||||
|
*/
|
||||||
|
@FunctionalInterface
|
||||||
|
public interface WorkerStateChangeListener {
|
||||||
|
enum WorkerState {
|
||||||
|
CREATED,
|
||||||
|
INITIALIZING,
|
||||||
|
STARTED,
|
||||||
|
SHUT_DOWN
|
||||||
|
}
|
||||||
|
|
||||||
|
void onWorkerStateChange(WorkerState newState);
|
||||||
|
}
|
||||||
|
|
@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyInt;
|
import static org.mockito.Matchers.anyInt;
|
||||||
|
import static org.mockito.Matchers.anyLong;
|
||||||
import static org.mockito.Matchers.anyString;
|
import static org.mockito.Matchers.anyString;
|
||||||
import static org.mockito.Matchers.argThat;
|
import static org.mockito.Matchers.argThat;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
|
|
@ -89,6 +90,7 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcess
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
|
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMetricsFactory;
|
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMetricsFactory;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor;
|
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor;
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.WorkerStateChangeListener.WorkerState;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
|
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy;
|
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy;
|
||||||
|
|
@ -142,7 +144,7 @@ public class WorkerTest {
|
||||||
|
|
||||||
private static final String KINESIS_SHARD_ID_FORMAT = "kinesis-0-0-%d";
|
private static final String KINESIS_SHARD_ID_FORMAT = "kinesis-0-0-%d";
|
||||||
private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d";
|
private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d";
|
||||||
|
|
||||||
private RecordsFetcherFactory recordsFetcherFactory;
|
private RecordsFetcherFactory recordsFetcherFactory;
|
||||||
private KinesisClientLibConfiguration config;
|
private KinesisClientLibConfiguration config;
|
||||||
|
|
||||||
|
|
@ -170,7 +172,9 @@ public class WorkerTest {
|
||||||
private Future<TaskResult> taskFuture;
|
private Future<TaskResult> taskFuture;
|
||||||
@Mock
|
@Mock
|
||||||
private TaskResult taskResult;
|
private TaskResult taskResult;
|
||||||
|
@Mock
|
||||||
|
private WorkerStateChangeListener workerStateChangeListener;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
config = spy(new KinesisClientLibConfiguration("app", null, null, null));
|
config = spy(new KinesisClientLibConfiguration("app", null, null, null));
|
||||||
|
|
@ -179,7 +183,7 @@ public class WorkerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
// CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES
|
// CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES
|
||||||
private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY =
|
private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY =
|
||||||
new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory() {
|
new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -212,8 +216,8 @@ public class WorkerTest {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 =
|
private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 =
|
||||||
new V1ToV2RecordProcessorFactoryAdapter(SAMPLE_RECORD_PROCESSOR_FACTORY);
|
new V1ToV2RecordProcessorFactoryAdapter(SAMPLE_RECORD_PROCESSOR_FACTORY);
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -619,7 +623,7 @@ public class WorkerTest {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}).when(v2RecordProcessor).processRecords(any(ProcessRecordsInput.class));
|
}).when(v2RecordProcessor).processRecords(any(ProcessRecordsInput.class));
|
||||||
|
|
||||||
RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class);
|
RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class);
|
||||||
GetRecordsCache getRecordsCache = mock(GetRecordsCache.class);
|
GetRecordsCache getRecordsCache = mock(GetRecordsCache.class);
|
||||||
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
|
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
|
||||||
|
|
@ -659,7 +663,7 @@ public class WorkerTest {
|
||||||
* This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of
|
* This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of
|
||||||
* {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads.
|
* {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads.
|
||||||
* This behavior makes the test a bit racy, since we need to ensure a specific order of events.
|
* This behavior makes the test a bit racy, since we need to ensure a specific order of events.
|
||||||
*
|
*
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -1356,7 +1360,7 @@ public class WorkerTest {
|
||||||
executorService,
|
executorService,
|
||||||
metricsFactory,
|
metricsFactory,
|
||||||
taskBackoffTimeMillis,
|
taskBackoffTimeMillis,
|
||||||
failoverTimeMillis,
|
failoverTimeMillis,
|
||||||
false,
|
false,
|
||||||
shardPrioritization);
|
shardPrioritization);
|
||||||
|
|
||||||
|
|
@ -1432,7 +1436,7 @@ public class WorkerTest {
|
||||||
config,
|
config,
|
||||||
streamConfig,
|
streamConfig,
|
||||||
INITIAL_POSITION_TRIM_HORIZON,
|
INITIAL_POSITION_TRIM_HORIZON,
|
||||||
parentShardPollIntervalMillis,
|
parentShardPollIntervalMillis,
|
||||||
shardSyncIntervalMillis,
|
shardSyncIntervalMillis,
|
||||||
cleanupLeasesUponShardCompletion,
|
cleanupLeasesUponShardCompletion,
|
||||||
leaseCoordinator,
|
leaseCoordinator,
|
||||||
|
|
@ -1500,6 +1504,105 @@ public class WorkerTest {
|
||||||
Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisLocalFileProxy);
|
Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisLocalFileProxy);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBuilderForWorkerStateListener() {
|
||||||
|
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
||||||
|
Worker worker = new Worker.Builder()
|
||||||
|
.recordProcessorFactory(recordProcessorFactory)
|
||||||
|
.config(config)
|
||||||
|
.build();
|
||||||
|
Assert.assertTrue(worker.getWorkerStateChangeListener() instanceof NoOpWorkerStateChangeListener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBuilderWhenWorkerStateListenerIsSet() {
|
||||||
|
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
||||||
|
Worker worker = new Worker.Builder()
|
||||||
|
.recordProcessorFactory(recordProcessorFactory)
|
||||||
|
.workerStateChangeListener(workerStateChangeListener)
|
||||||
|
.config(config)
|
||||||
|
.build();
|
||||||
|
Assert.assertSame(workerStateChangeListener, worker.getWorkerStateChangeListener());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWorkerStateListenerStatePassesThroughCreatedState() {
|
||||||
|
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
||||||
|
new Worker.Builder()
|
||||||
|
.recordProcessorFactory(recordProcessorFactory)
|
||||||
|
.workerStateChangeListener(workerStateChangeListener)
|
||||||
|
.config(config)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.CREATED));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWorkerStateChangeListenerGoesThroughStates() throws Exception {
|
||||||
|
|
||||||
|
final CountDownLatch workerInitialized = new CountDownLatch(1);
|
||||||
|
final CountDownLatch workerStarted = new CountDownLatch(1);
|
||||||
|
final IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
||||||
|
final IRecordProcessor processor = mock(IRecordProcessor.class);
|
||||||
|
|
||||||
|
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
|
||||||
|
KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint)
|
||||||
|
.withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L)
|
||||||
|
.withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self");
|
||||||
|
final List<KinesisClientLease> leases = new ArrayList<>();
|
||||||
|
KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build();
|
||||||
|
leases.add(lease);
|
||||||
|
|
||||||
|
doAnswer(new Answer<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
workerInitialized.countDown();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}).when(leaseManager).waitUntilLeaseTableExists(anyLong(), anyLong());
|
||||||
|
doAnswer(new Answer<IRecordProcessor>() {
|
||||||
|
@Override
|
||||||
|
public IRecordProcessor answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
workerStarted.countDown();
|
||||||
|
return processor;
|
||||||
|
}
|
||||||
|
}).when(recordProcessorFactory).createProcessor();
|
||||||
|
|
||||||
|
when(config.getWorkerIdentifier()).thenReturn("Self");
|
||||||
|
when(leaseManager.listLeases()).thenReturn(leases);
|
||||||
|
when(leaseManager.renewLease(leases.get(0))).thenReturn(true);
|
||||||
|
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
|
||||||
|
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
|
||||||
|
when(taskFuture.isDone()).thenReturn(true);
|
||||||
|
when(taskFuture.get()).thenReturn(taskResult);
|
||||||
|
when(taskResult.isShardEndReached()).thenReturn(true);
|
||||||
|
|
||||||
|
Worker worker = new Worker.Builder()
|
||||||
|
.recordProcessorFactory(recordProcessorFactory)
|
||||||
|
.config(config)
|
||||||
|
.leaseManager(leaseManager)
|
||||||
|
.kinesisProxy(kinesisProxy)
|
||||||
|
.execService(executorService)
|
||||||
|
.workerStateChangeListener(workerStateChangeListener)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.CREATED));
|
||||||
|
|
||||||
|
WorkerThread workerThread = new WorkerThread(worker);
|
||||||
|
workerThread.start();
|
||||||
|
|
||||||
|
workerInitialized.await();
|
||||||
|
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.INITIALIZING));
|
||||||
|
|
||||||
|
workerStarted.await();
|
||||||
|
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.STARTED));
|
||||||
|
|
||||||
|
boolean workerShutdown = worker.createGracefulShutdownCallable()
|
||||||
|
.call();
|
||||||
|
|
||||||
|
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.SHUT_DOWN));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBuilderWithDefaultLeaseManager() {
|
public void testBuilderWithDefaultLeaseManager() {
|
||||||
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
||||||
|
|
@ -1801,7 +1904,7 @@ public class WorkerTest {
|
||||||
TestStreamletFactory recordProcessorFactory = new TestStreamletFactory(recordCounter, shardSequenceVerifier);
|
TestStreamletFactory recordProcessorFactory = new TestStreamletFactory(recordCounter, shardSequenceVerifier);
|
||||||
|
|
||||||
ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
|
ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
|
||||||
|
|
||||||
WorkerThread workerThread = runWorker(
|
WorkerThread workerThread = runWorker(
|
||||||
shardList, initialLeases, callProcessRecordsForEmptyRecordList, failoverTimeMillis,
|
shardList, initialLeases, callProcessRecordsForEmptyRecordList, failoverTimeMillis,
|
||||||
numberOfRecordsPerShard, fileBasedProxy, recordProcessorFactory, executorService, nullMetricsFactory, clientConfig);
|
numberOfRecordsPerShard, fileBasedProxy, recordProcessorFactory, executorService, nullMetricsFactory, clientConfig);
|
||||||
|
|
@ -1857,7 +1960,7 @@ public class WorkerTest {
|
||||||
idleTimeInMilliseconds,
|
idleTimeInMilliseconds,
|
||||||
callProcessRecordsForEmptyRecordList,
|
callProcessRecordsForEmptyRecordList,
|
||||||
skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp));
|
skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp));
|
||||||
|
|
||||||
Worker worker =
|
Worker worker =
|
||||||
new Worker(stageName,
|
new Worker(stageName,
|
||||||
recordProcessorFactory,
|
recordProcessorFactory,
|
||||||
|
|
@ -1874,7 +1977,7 @@ public class WorkerTest {
|
||||||
failoverTimeMillis,
|
failoverTimeMillis,
|
||||||
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
|
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
|
||||||
shardPrioritization);
|
shardPrioritization);
|
||||||
|
|
||||||
WorkerThread workerThread = new WorkerThread(worker);
|
WorkerThread workerThread = new WorkerThread(worker);
|
||||||
workerThread.start();
|
workerThread.start();
|
||||||
return workerThread;
|
return workerThread;
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue