Merge remote-tracking branch 'upstream/master' into annotation-usage
This commit is contained in:
commit
06aeaf77aa
10 changed files with 291 additions and 27 deletions
6
.github/PULL_REQUEST_TEMPLATE.md
vendored
Normal file
6
.github/PULL_REQUEST_TEMPLATE.md
vendored
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
*Issue #, if available:*
|
||||
|
||||
*Description of changes:*
|
||||
|
||||
|
||||
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
|
||||
4
CODE_OF_CONDUCT.md
Normal file
4
CODE_OF_CONDUCT.md
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
## Code of Conduct
|
||||
This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct).
|
||||
For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact
|
||||
opensource-codeofconduct@amazon.com with any additional questions or comments.
|
||||
61
CONTRIBUTING.md
Normal file
61
CONTRIBUTING.md
Normal file
|
|
@ -0,0 +1,61 @@
|
|||
# Contributing Guidelines
|
||||
|
||||
Thank you for your interest in contributing to our project. Whether it's a bug report, new feature, correction, or additional
|
||||
documentation, we greatly value feedback and contributions from our community.
|
||||
|
||||
Please read through this document before submitting any issues or pull requests to ensure we have all the necessary
|
||||
information to effectively respond to your bug report or contribution.
|
||||
|
||||
|
||||
## Reporting Bugs/Feature Requests
|
||||
|
||||
We welcome you to use the GitHub issue tracker to report bugs or suggest features.
|
||||
|
||||
When filing an issue, please check [existing open](https://github.com/awslabs/amazon-kinesis-client/issues), or [recently closed](https://github.com/awslabs/amazon-kinesis-client/issues?utf8=%E2%9C%93&q=is%3Aissue%20is%3Aclosed%20), issues to make sure somebody else hasn't already
|
||||
reported the issue. Please try to include as much information as you can. Details like these are incredibly useful:
|
||||
|
||||
* A reproducible test case or series of steps
|
||||
* The version of our code being used
|
||||
* Any modifications you've made relevant to the bug
|
||||
* Anything unusual about your environment or deployment
|
||||
|
||||
|
||||
## Contributing via Pull Requests
|
||||
Contributions via pull requests are much appreciated. Before sending us a pull request, please ensure that:
|
||||
|
||||
1. You are working against the latest source on the *master* branch.
|
||||
2. You check existing open, and recently merged, pull requests to make sure someone else hasn't addressed the problem already.
|
||||
3. You open an issue to discuss any significant work - we would hate for your time to be wasted.
|
||||
|
||||
To send us a pull request, please:
|
||||
|
||||
1. Fork the repository.
|
||||
2. Modify the source; please focus on the specific change you are contributing. If you also reformat all the code, it will be hard for us to focus on your change.
|
||||
3. Ensure local tests pass.
|
||||
4. Commit to your fork using clear commit messages.
|
||||
5. Send us a pull request, answering any default questions in the pull request interface.
|
||||
6. Pay attention to any automated CI failures reported in the pull request, and stay involved in the conversation.
|
||||
|
||||
GitHub provides additional document on [forking a repository](https://help.github.com/articles/fork-a-repo/) and
|
||||
[creating a pull request](https://help.github.com/articles/creating-a-pull-request/).
|
||||
|
||||
|
||||
## Finding contributions to work on
|
||||
Looking at the existing issues is a great way to find something to contribute on. As our projects, by default, use the default GitHub issue labels ((enhancement/bug/duplicate/help wanted/invalid/question/wontfix), looking at any ['help wanted'](https://github.com/awslabs/amazon-kinesis-client/labels/help%20wanted) issues is a great place to start.
|
||||
|
||||
|
||||
## Code of Conduct
|
||||
This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct).
|
||||
For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact
|
||||
opensource-codeofconduct@amazon.com with any additional questions or comments.
|
||||
|
||||
|
||||
## Security issue notifications
|
||||
If you discover a potential security issue in this project we ask that you notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/). Please do **not** create a public github issue.
|
||||
|
||||
|
||||
## Licensing
|
||||
|
||||
See the [LICENSE](https://github.com/awslabs/amazon-kinesis-client/blob/master/LICENSE) file for our project's licensing. We will ask you to confirm the licensing of your contribution.
|
||||
|
||||
We may ask you to sign a [Contributor License Agreement (CLA)](http://en.wikipedia.org/wiki/Contributor_License_Agreement) for larger changes.
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
public class NoOpWorkerStateChangeListener implements WorkerStateChangeListener {
|
||||
|
||||
/**
|
||||
* Empty constructor for NoOp Worker State Change Listener
|
||||
*/
|
||||
public NoOpWorkerStateChangeListener() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWorkerStateChange(WorkerState newState) {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -70,12 +70,14 @@ public class SequenceNumberValidator {
|
|||
*/
|
||||
void validateSequenceNumber(String sequenceNumber)
|
||||
throws IllegalArgumentException, ThrottlingException, KinesisClientLibDependencyException {
|
||||
if (!isDigits(sequenceNumber)) {
|
||||
boolean atShardEnd = ExtendedSequenceNumber.SHARD_END.getSequenceNumber().equals(sequenceNumber);
|
||||
|
||||
if (!atShardEnd && !isDigits(sequenceNumber)) {
|
||||
LOG.info("Sequence number must be numeric, but was " + sequenceNumber);
|
||||
throw new IllegalArgumentException("Sequence number must be numeric, but was " + sequenceNumber);
|
||||
}
|
||||
try {
|
||||
if (validateWithGetIterator) {
|
||||
if (!atShardEnd &&validateWithGetIterator) {
|
||||
proxy.getIterator(shardId, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), sequenceNumber);
|
||||
LOG.info("Validated sequence number " + sequenceNumber + " with shard id " + shardId);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -81,6 +81,7 @@ public class Worker implements Runnable {
|
|||
private static final Log LOG = LogFactory.getLog(Worker.class);
|
||||
|
||||
private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
|
||||
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
|
||||
|
||||
private WorkerLog wlog = new WorkerLog();
|
||||
|
||||
|
|
@ -103,7 +104,6 @@ public class Worker implements Runnable {
|
|||
private final Optional<Integer> retryGetRecordsInSeconds;
|
||||
private final Optional<Integer> maxGetRecordsThreadPool;
|
||||
|
||||
// private final KinesisClientLeaseManager leaseManager;
|
||||
private final KinesisClientLibLeaseCoordinator leaseCoordinator;
|
||||
private final ShardSyncTaskManager controlServer;
|
||||
|
||||
|
|
@ -129,6 +129,8 @@ public class Worker implements Runnable {
|
|||
@VisibleForTesting
|
||||
protected GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator();
|
||||
|
||||
private WorkerStateChangeListener workerStateChangeListener;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
|
|
@ -400,7 +402,8 @@ public class Worker implements Runnable {
|
|||
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
|
||||
config.getShardPrioritizationStrategy(),
|
||||
config.getRetryGetRecordsInSeconds(),
|
||||
config.getMaxGetRecordsThreadPool());
|
||||
config.getMaxGetRecordsThreadPool(),
|
||||
DEFAULT_WORKER_STATE_CHANGE_LISTENER);
|
||||
|
||||
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
||||
if (config.getRegionName() != null) {
|
||||
|
|
@ -460,7 +463,7 @@ public class Worker implements Runnable {
|
|||
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
|
||||
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
|
||||
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||
shardPrioritization, Optional.empty(), Optional.empty());
|
||||
shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -507,7 +510,7 @@ public class Worker implements Runnable {
|
|||
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
|
||||
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
|
||||
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
|
||||
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) {
|
||||
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener) {
|
||||
this.applicationName = applicationName;
|
||||
this.recordProcessorFactory = recordProcessorFactory;
|
||||
this.config = config;
|
||||
|
|
@ -529,6 +532,8 @@ public class Worker implements Runnable {
|
|||
this.shardPrioritization = shardPrioritization;
|
||||
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
|
||||
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
|
||||
this.workerStateChangeListener = workerStateChangeListener;
|
||||
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -606,6 +611,7 @@ public class Worker implements Runnable {
|
|||
}
|
||||
|
||||
private void initialize() {
|
||||
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
|
||||
boolean isDone = false;
|
||||
Exception lastException = null;
|
||||
|
||||
|
|
@ -655,6 +661,7 @@ public class Worker implements Runnable {
|
|||
if (!isDone) {
|
||||
throw new RuntimeException(lastException);
|
||||
}
|
||||
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -705,10 +712,10 @@ public class Worker implements Runnable {
|
|||
|
||||
/**
|
||||
* Starts the requestedShutdown process, and returns a future that can be used to track the process.
|
||||
*
|
||||
*
|
||||
* This is deprecated in favor of {@link #startGracefulShutdown()}, which returns a more complete future, and
|
||||
* indicates the process behavior
|
||||
*
|
||||
*
|
||||
* @return a future that will be set once shutdown is completed.
|
||||
*/
|
||||
@Deprecated
|
||||
|
|
@ -752,7 +759,7 @@ public class Worker implements Runnable {
|
|||
* Requests a graceful shutdown of the worker, notifying record processors, that implement
|
||||
* {@link IShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to
|
||||
* checkpoint.
|
||||
*
|
||||
*
|
||||
* This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the
|
||||
* previous future.
|
||||
*
|
||||
|
|
@ -867,6 +874,10 @@ public class Worker implements Runnable {
|
|||
return shardInfoShardConsumerMap;
|
||||
}
|
||||
|
||||
WorkerStateChangeListener getWorkerStateChangeListener() {
|
||||
return workerStateChangeListener;
|
||||
}
|
||||
|
||||
/**
|
||||
* Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that if executor
|
||||
* services were passed to the worker by the user, worker will not attempt to shutdown those resources.
|
||||
|
|
@ -897,6 +908,7 @@ public class Worker implements Runnable {
|
|||
// Lost leases will force Worker to begin shutdown process for all shard consumers in
|
||||
// Worker.run().
|
||||
leaseCoordinator.stop();
|
||||
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -919,7 +931,7 @@ public class Worker implements Runnable {
|
|||
/**
|
||||
* Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()}
|
||||
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
|
||||
*
|
||||
*
|
||||
* @return Whether worker should shutdown immediately.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
|
|
@ -1050,7 +1062,7 @@ public class Worker implements Runnable {
|
|||
|
||||
/**
|
||||
* Given configuration, returns appropriate metrics factory.
|
||||
*
|
||||
*
|
||||
* @param cloudWatchClient
|
||||
* Amazon CloudWatch client
|
||||
* @param config
|
||||
|
|
@ -1075,7 +1087,7 @@ public class Worker implements Runnable {
|
|||
|
||||
/**
|
||||
* Returns default executor service that should be used by the worker.
|
||||
*
|
||||
*
|
||||
* @return Default executor service that should be used by the worker.
|
||||
*/
|
||||
private static ExecutorService getExecutorService() {
|
||||
|
|
@ -1151,6 +1163,7 @@ public class Worker implements Runnable {
|
|||
private ShardPrioritization shardPrioritization;
|
||||
@Setter @Accessors(fluent = true)
|
||||
private IKinesisProxy kinesisProxy;
|
||||
private WorkerStateChangeListener workerStateChangeListener;
|
||||
|
||||
@VisibleForTesting
|
||||
AmazonKinesis getKinesisClient() {
|
||||
|
|
@ -1261,6 +1274,10 @@ public class Worker implements Runnable {
|
|||
kinesisProxy = new KinesisProxy(config, kinesisClient);
|
||||
}
|
||||
|
||||
if (workerStateChangeListener == null) {
|
||||
workerStateChangeListener = DEFAULT_WORKER_STATE_CHANGE_LISTENER;
|
||||
}
|
||||
|
||||
return new Worker(config.getApplicationName(),
|
||||
recordProcessorFactory,
|
||||
config,
|
||||
|
|
@ -1292,8 +1309,8 @@ public class Worker implements Runnable {
|
|||
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
|
||||
shardPrioritization,
|
||||
config.getRetryGetRecordsInSeconds(),
|
||||
config.getMaxGetRecordsThreadPool());
|
||||
|
||||
config.getMaxGetRecordsThreadPool(),
|
||||
workerStateChangeListener);
|
||||
}
|
||||
|
||||
AmazonWebServiceClient createClient(@NonNull final AwsClientBuilder builder,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,16 @@
|
|||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
/**
|
||||
* A listener for callbacks on changes worker state
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface WorkerStateChangeListener {
|
||||
enum WorkerState {
|
||||
CREATED,
|
||||
INITIALIZING,
|
||||
STARTED,
|
||||
SHUT_DOWN
|
||||
}
|
||||
|
||||
void onWorkerStateChange(WorkerState newState);
|
||||
}
|
||||
|
|
@ -168,6 +168,21 @@ public class RecordProcessorCheckpointerTest {
|
|||
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test method for {@link RecordProcessorCheckpointer#checkpoint(String SHARD_END)}.
|
||||
*/
|
||||
@Test
|
||||
public final void testCheckpointAtShardEnd() throws Exception {
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
|
||||
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
|
||||
ExtendedSequenceNumber extendedSequenceNumber = ExtendedSequenceNumber.SHARD_END;
|
||||
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
|
||||
processingCheckpointer.checkpoint(ExtendedSequenceNumber.SHARD_END.getSequenceNumber());
|
||||
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test method for
|
||||
* {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#prepareCheckpoint()}.
|
||||
|
|
@ -299,6 +314,30 @@ public class RecordProcessorCheckpointerTest {
|
|||
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test method for {@link RecordProcessorCheckpointer#checkpoint(String SHARD_END)}.
|
||||
*/
|
||||
@Test
|
||||
public final void testPrepareCheckpointAtShardEnd() throws Exception {
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
|
||||
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
|
||||
ExtendedSequenceNumber extendedSequenceNumber = ExtendedSequenceNumber.SHARD_END;
|
||||
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
|
||||
IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint(ExtendedSequenceNumber.SHARD_END.getSequenceNumber());
|
||||
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId));
|
||||
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
|
||||
Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint());
|
||||
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
|
||||
|
||||
// Checkpoint using preparedCheckpoint
|
||||
preparedCheckpoint.checkpoint();
|
||||
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
|
||||
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
|
||||
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test that having multiple outstanding prepared checkpointers works if they are redeemed in the right order.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -87,7 +87,7 @@ public class SequenceNumberValidatorTest {
|
|||
boolean validateWithGetIterator) {
|
||||
|
||||
String[] nonNumericStrings = { null, "bogus-sequence-number", SentinelCheckpoint.LATEST.toString(),
|
||||
SentinelCheckpoint.SHARD_END.toString(), SentinelCheckpoint.TRIM_HORIZON.toString(),
|
||||
SentinelCheckpoint.TRIM_HORIZON.toString(),
|
||||
SentinelCheckpoint.AT_TIMESTAMP.toString() };
|
||||
|
||||
for (String nonNumericString : nonNumericStrings) {
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
|
|||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Matchers.eq;
|
||||
|
|
@ -100,6 +101,7 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcess
|
|||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.WorkerStateChangeListener.WorkerState;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
|
||||
|
|
@ -153,7 +155,7 @@ public class WorkerTest {
|
|||
|
||||
private static final String KINESIS_SHARD_ID_FORMAT = "kinesis-0-0-%d";
|
||||
private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d";
|
||||
|
||||
|
||||
private RecordsFetcherFactory recordsFetcherFactory;
|
||||
private KinesisClientLibConfiguration config;
|
||||
|
||||
|
|
@ -181,7 +183,9 @@ public class WorkerTest {
|
|||
private Future<TaskResult> taskFuture;
|
||||
@Mock
|
||||
private TaskResult taskResult;
|
||||
|
||||
@Mock
|
||||
private WorkerStateChangeListener workerStateChangeListener;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
config = spy(new KinesisClientLibConfiguration("app", null, null, null));
|
||||
|
|
@ -190,7 +194,7 @@ public class WorkerTest {
|
|||
}
|
||||
|
||||
// CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES
|
||||
private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY =
|
||||
private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY =
|
||||
new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory() {
|
||||
|
||||
@Override
|
||||
|
|
@ -223,8 +227,8 @@ public class WorkerTest {
|
|||
};
|
||||
}
|
||||
};
|
||||
|
||||
private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 =
|
||||
|
||||
private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 =
|
||||
new V1ToV2RecordProcessorFactoryAdapter(SAMPLE_RECORD_PROCESSOR_FACTORY);
|
||||
|
||||
|
||||
|
|
@ -630,7 +634,7 @@ public class WorkerTest {
|
|||
return null;
|
||||
}
|
||||
}).when(v2RecordProcessor).processRecords(any(ProcessRecordsInput.class));
|
||||
|
||||
|
||||
RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class);
|
||||
GetRecordsCache getRecordsCache = mock(GetRecordsCache.class);
|
||||
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
|
||||
|
|
@ -670,7 +674,7 @@ public class WorkerTest {
|
|||
* This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of
|
||||
* {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads.
|
||||
* This behavior makes the test a bit racy, since we need to ensure a specific order of events.
|
||||
*
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
|
|
@ -1367,7 +1371,7 @@ public class WorkerTest {
|
|||
executorService,
|
||||
metricsFactory,
|
||||
taskBackoffTimeMillis,
|
||||
failoverTimeMillis,
|
||||
failoverTimeMillis,
|
||||
false,
|
||||
shardPrioritization);
|
||||
|
||||
|
|
@ -1443,7 +1447,7 @@ public class WorkerTest {
|
|||
config,
|
||||
streamConfig,
|
||||
INITIAL_POSITION_TRIM_HORIZON,
|
||||
parentShardPollIntervalMillis,
|
||||
parentShardPollIntervalMillis,
|
||||
shardSyncIntervalMillis,
|
||||
cleanupLeasesUponShardCompletion,
|
||||
leaseCoordinator,
|
||||
|
|
@ -1511,6 +1515,105 @@ public class WorkerTest {
|
|||
Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisLocalFileProxy);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBuilderForWorkerStateListener() {
|
||||
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
||||
Worker worker = new Worker.Builder()
|
||||
.recordProcessorFactory(recordProcessorFactory)
|
||||
.config(config)
|
||||
.build();
|
||||
Assert.assertTrue(worker.getWorkerStateChangeListener() instanceof NoOpWorkerStateChangeListener);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBuilderWhenWorkerStateListenerIsSet() {
|
||||
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
||||
Worker worker = new Worker.Builder()
|
||||
.recordProcessorFactory(recordProcessorFactory)
|
||||
.workerStateChangeListener(workerStateChangeListener)
|
||||
.config(config)
|
||||
.build();
|
||||
Assert.assertSame(workerStateChangeListener, worker.getWorkerStateChangeListener());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWorkerStateListenerStatePassesThroughCreatedState() {
|
||||
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
||||
new Worker.Builder()
|
||||
.recordProcessorFactory(recordProcessorFactory)
|
||||
.workerStateChangeListener(workerStateChangeListener)
|
||||
.config(config)
|
||||
.build();
|
||||
|
||||
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.CREATED));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWorkerStateChangeListenerGoesThroughStates() throws Exception {
|
||||
|
||||
final CountDownLatch workerInitialized = new CountDownLatch(1);
|
||||
final CountDownLatch workerStarted = new CountDownLatch(1);
|
||||
final IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
||||
final IRecordProcessor processor = mock(IRecordProcessor.class);
|
||||
|
||||
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
|
||||
KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint)
|
||||
.withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L)
|
||||
.withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self");
|
||||
final List<KinesisClientLease> leases = new ArrayList<>();
|
||||
KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build();
|
||||
leases.add(lease);
|
||||
|
||||
doAnswer(new Answer<Boolean>() {
|
||||
@Override
|
||||
public Boolean answer(InvocationOnMock invocation) throws Throwable {
|
||||
workerInitialized.countDown();
|
||||
return true;
|
||||
}
|
||||
}).when(leaseManager).waitUntilLeaseTableExists(anyLong(), anyLong());
|
||||
doAnswer(new Answer<IRecordProcessor>() {
|
||||
@Override
|
||||
public IRecordProcessor answer(InvocationOnMock invocation) throws Throwable {
|
||||
workerStarted.countDown();
|
||||
return processor;
|
||||
}
|
||||
}).when(recordProcessorFactory).createProcessor();
|
||||
|
||||
when(config.getWorkerIdentifier()).thenReturn("Self");
|
||||
when(leaseManager.listLeases()).thenReturn(leases);
|
||||
when(leaseManager.renewLease(leases.get(0))).thenReturn(true);
|
||||
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
|
||||
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
|
||||
when(taskFuture.isDone()).thenReturn(true);
|
||||
when(taskFuture.get()).thenReturn(taskResult);
|
||||
when(taskResult.isShardEndReached()).thenReturn(true);
|
||||
|
||||
Worker worker = new Worker.Builder()
|
||||
.recordProcessorFactory(recordProcessorFactory)
|
||||
.config(config)
|
||||
.leaseManager(leaseManager)
|
||||
.kinesisProxy(kinesisProxy)
|
||||
.execService(executorService)
|
||||
.workerStateChangeListener(workerStateChangeListener)
|
||||
.build();
|
||||
|
||||
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.CREATED));
|
||||
|
||||
WorkerThread workerThread = new WorkerThread(worker);
|
||||
workerThread.start();
|
||||
|
||||
workerInitialized.await();
|
||||
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.INITIALIZING));
|
||||
|
||||
workerStarted.await();
|
||||
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.STARTED));
|
||||
|
||||
boolean workerShutdown = worker.createGracefulShutdownCallable()
|
||||
.call();
|
||||
|
||||
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.SHUT_DOWN));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBuilderWithDefaultLeaseManager() {
|
||||
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
||||
|
|
@ -1921,7 +2024,7 @@ public class WorkerTest {
|
|||
TestStreamletFactory recordProcessorFactory = new TestStreamletFactory(recordCounter, shardSequenceVerifier);
|
||||
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
|
||||
|
||||
|
||||
WorkerThread workerThread = runWorker(
|
||||
shardList, initialLeases, callProcessRecordsForEmptyRecordList, failoverTimeMillis,
|
||||
numberOfRecordsPerShard, fileBasedProxy, recordProcessorFactory, executorService, nullMetricsFactory, clientConfig);
|
||||
|
|
@ -1977,7 +2080,7 @@ public class WorkerTest {
|
|||
idleTimeInMilliseconds,
|
||||
callProcessRecordsForEmptyRecordList,
|
||||
skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp));
|
||||
|
||||
|
||||
Worker worker =
|
||||
new Worker(stageName,
|
||||
recordProcessorFactory,
|
||||
|
|
@ -1994,7 +2097,7 @@ public class WorkerTest {
|
|||
failoverTimeMillis,
|
||||
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
|
||||
shardPrioritization);
|
||||
|
||||
|
||||
WorkerThread workerThread = new WorkerThread(worker);
|
||||
workerThread.start();
|
||||
return workerThread;
|
||||
|
|
|
|||
Loading…
Reference in a new issue