Merge remote-tracking branch 'upstream/master' into immutable_client_fix

This commit is contained in:
Sahil Palvia 2018-03-02 13:39:54 -08:00
commit 5f7cbdc02f
10 changed files with 291 additions and 27 deletions

6
.github/PULL_REQUEST_TEMPLATE.md vendored Normal file
View file

@ -0,0 +1,6 @@
*Issue #, if available:*
*Description of changes:*
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

4
CODE_OF_CONDUCT.md Normal file
View file

@ -0,0 +1,4 @@
## Code of Conduct
This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct).
For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact
opensource-codeofconduct@amazon.com with any additional questions or comments.

61
CONTRIBUTING.md Normal file
View file

@ -0,0 +1,61 @@
# Contributing Guidelines
Thank you for your interest in contributing to our project. Whether it's a bug report, new feature, correction, or additional
documentation, we greatly value feedback and contributions from our community.
Please read through this document before submitting any issues or pull requests to ensure we have all the necessary
information to effectively respond to your bug report or contribution.
## Reporting Bugs/Feature Requests
We welcome you to use the GitHub issue tracker to report bugs or suggest features.
When filing an issue, please check [existing open](https://github.com/awslabs/amazon-kinesis-client/issues), or [recently closed](https://github.com/awslabs/amazon-kinesis-client/issues?utf8=%E2%9C%93&q=is%3Aissue%20is%3Aclosed%20), issues to make sure somebody else hasn't already
reported the issue. Please try to include as much information as you can. Details like these are incredibly useful:
* A reproducible test case or series of steps
* The version of our code being used
* Any modifications you've made relevant to the bug
* Anything unusual about your environment or deployment
## Contributing via Pull Requests
Contributions via pull requests are much appreciated. Before sending us a pull request, please ensure that:
1. You are working against the latest source on the *master* branch.
2. You check existing open, and recently merged, pull requests to make sure someone else hasn't addressed the problem already.
3. You open an issue to discuss any significant work - we would hate for your time to be wasted.
To send us a pull request, please:
1. Fork the repository.
2. Modify the source; please focus on the specific change you are contributing. If you also reformat all the code, it will be hard for us to focus on your change.
3. Ensure local tests pass.
4. Commit to your fork using clear commit messages.
5. Send us a pull request, answering any default questions in the pull request interface.
6. Pay attention to any automated CI failures reported in the pull request, and stay involved in the conversation.
GitHub provides additional document on [forking a repository](https://help.github.com/articles/fork-a-repo/) and
[creating a pull request](https://help.github.com/articles/creating-a-pull-request/).
## Finding contributions to work on
Looking at the existing issues is a great way to find something to contribute on. As our projects, by default, use the default GitHub issue labels ((enhancement/bug/duplicate/help wanted/invalid/question/wontfix), looking at any ['help wanted'](https://github.com/awslabs/amazon-kinesis-client/labels/help%20wanted) issues is a great place to start.
## Code of Conduct
This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct).
For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact
opensource-codeofconduct@amazon.com with any additional questions or comments.
## Security issue notifications
If you discover a potential security issue in this project we ask that you notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/). Please do **not** create a public github issue.
## Licensing
See the [LICENSE](https://github.com/awslabs/amazon-kinesis-client/blob/master/LICENSE) file for our project's licensing. We will ask you to confirm the licensing of your contribution.
We may ask you to sign a [Contributor License Agreement (CLA)](http://en.wikipedia.org/wiki/Contributor_License_Agreement) for larger changes.

View file

@ -0,0 +1,16 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
public class NoOpWorkerStateChangeListener implements WorkerStateChangeListener {
/**
* Empty constructor for NoOp Worker State Change Listener
*/
public NoOpWorkerStateChangeListener() {
}
@Override
public void onWorkerStateChange(WorkerState newState) {
}
}

View file

@ -70,12 +70,14 @@ public class SequenceNumberValidator {
*/
void validateSequenceNumber(String sequenceNumber)
throws IllegalArgumentException, ThrottlingException, KinesisClientLibDependencyException {
if (!isDigits(sequenceNumber)) {
boolean atShardEnd = ExtendedSequenceNumber.SHARD_END.getSequenceNumber().equals(sequenceNumber);
if (!atShardEnd && !isDigits(sequenceNumber)) {
LOG.info("Sequence number must be numeric, but was " + sequenceNumber);
throw new IllegalArgumentException("Sequence number must be numeric, but was " + sequenceNumber);
}
try {
if (validateWithGetIterator) {
if (!atShardEnd &&validateWithGetIterator) {
proxy.getIterator(shardId, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), sequenceNumber);
LOG.info("Validated sequence number " + sequenceNumber + " with shard id " + shardId);
}

View file

@ -81,6 +81,7 @@ public class Worker implements Runnable {
private static final Log LOG = LogFactory.getLog(Worker.class);
private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
private WorkerLog wlog = new WorkerLog();
@ -103,7 +104,6 @@ public class Worker implements Runnable {
private final Optional<Integer> retryGetRecordsInSeconds;
private final Optional<Integer> maxGetRecordsThreadPool;
// private final KinesisClientLeaseManager leaseManager;
private final KinesisClientLibLeaseCoordinator leaseCoordinator;
private final ShardSyncTaskManager controlServer;
@ -129,6 +129,8 @@ public class Worker implements Runnable {
@VisibleForTesting
protected GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator();
private WorkerStateChangeListener workerStateChangeListener;
/**
* Constructor.
*
@ -400,7 +402,8 @@ public class Worker implements Runnable {
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
config.getShardPrioritizationStrategy(),
config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool());
config.getMaxGetRecordsThreadPool(),
DEFAULT_WORKER_STATE_CHANGE_LISTENER);
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) {
@ -460,7 +463,7 @@ public class Worker implements Runnable {
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
shardPrioritization, Optional.empty(), Optional.empty());
shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER);
}
/**
@ -507,7 +510,7 @@ public class Worker implements Runnable {
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) {
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener) {
this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory;
this.config = config;
@ -529,6 +532,8 @@ public class Worker implements Runnable {
this.shardPrioritization = shardPrioritization;
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
this.workerStateChangeListener = workerStateChangeListener;
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
}
/**
@ -606,6 +611,7 @@ public class Worker implements Runnable {
}
private void initialize() {
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
boolean isDone = false;
Exception lastException = null;
@ -655,6 +661,7 @@ public class Worker implements Runnable {
if (!isDone) {
throw new RuntimeException(lastException);
}
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED);
}
/**
@ -867,6 +874,10 @@ public class Worker implements Runnable {
return shardInfoShardConsumerMap;
}
WorkerStateChangeListener getWorkerStateChangeListener() {
return workerStateChangeListener;
}
/**
* Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that if executor
* services were passed to the worker by the user, worker will not attempt to shutdown those resources.
@ -897,6 +908,7 @@ public class Worker implements Runnable {
// Lost leases will force Worker to begin shutdown process for all shard consumers in
// Worker.run().
leaseCoordinator.stop();
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
}
/**
@ -1151,6 +1163,7 @@ public class Worker implements Runnable {
private ShardPrioritization shardPrioritization;
@Setter @Accessors(fluent = true)
private IKinesisProxy kinesisProxy;
private WorkerStateChangeListener workerStateChangeListener;
@VisibleForTesting
AmazonKinesis getKinesisClient() {
@ -1261,6 +1274,10 @@ public class Worker implements Runnable {
kinesisProxy = new KinesisProxy(config, kinesisClient);
}
if (workerStateChangeListener == null) {
workerStateChangeListener = DEFAULT_WORKER_STATE_CHANGE_LISTENER;
}
return new Worker(config.getApplicationName(),
recordProcessorFactory,
config,
@ -1292,8 +1309,8 @@ public class Worker implements Runnable {
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
shardPrioritization,
config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool());
config.getMaxGetRecordsThreadPool(),
workerStateChangeListener);
}
AmazonWebServiceClient createClient(@NonNull final AwsClientBuilder builder,

View file

@ -0,0 +1,16 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
/**
* A listener for callbacks on changes worker state
*/
@FunctionalInterface
public interface WorkerStateChangeListener {
enum WorkerState {
CREATED,
INITIALIZING,
STARTED,
SHUT_DOWN
}
void onWorkerStateChange(WorkerState newState);
}

View file

@ -168,6 +168,21 @@ public class RecordProcessorCheckpointerTest {
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
}
/**
* Test method for {@link RecordProcessorCheckpointer#checkpoint(String SHARD_END)}.
*/
@Test
public final void testCheckpointAtShardEnd() throws Exception {
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = ExtendedSequenceNumber.SHARD_END;
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
processingCheckpointer.checkpoint(ExtendedSequenceNumber.SHARD_END.getSequenceNumber());
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
}
/**
* Test method for
* {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#prepareCheckpoint()}.
@ -299,6 +314,30 @@ public class RecordProcessorCheckpointerTest {
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
}
/**
* Test method for {@link RecordProcessorCheckpointer#checkpoint(String SHARD_END)}.
*/
@Test
public final void testPrepareCheckpointAtShardEnd() throws Exception {
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = ExtendedSequenceNumber.SHARD_END;
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint(ExtendedSequenceNumber.SHARD_END.getSequenceNumber());
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// Checkpoint using preparedCheckpoint
preparedCheckpoint.checkpoint();
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
}
/**
* Test that having multiple outstanding prepared checkpointers works if they are redeemed in the right order.
*/

View file

@ -87,7 +87,7 @@ public class SequenceNumberValidatorTest {
boolean validateWithGetIterator) {
String[] nonNumericStrings = { null, "bogus-sequence-number", SentinelCheckpoint.LATEST.toString(),
SentinelCheckpoint.SHARD_END.toString(), SentinelCheckpoint.TRIM_HORIZON.toString(),
SentinelCheckpoint.TRIM_HORIZON.toString(),
SentinelCheckpoint.AT_TIMESTAMP.toString() };
for (String nonNumericString : nonNumericStrings) {

View file

@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
@ -100,6 +101,7 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcess
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMetricsFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.WorkerStateChangeListener.WorkerState;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
@ -181,6 +183,8 @@ public class WorkerTest {
private Future<TaskResult> taskFuture;
@Mock
private TaskResult taskResult;
@Mock
private WorkerStateChangeListener workerStateChangeListener;
@Before
public void setup() {
@ -1511,6 +1515,105 @@ public class WorkerTest {
Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisLocalFileProxy);
}
@Test
public void testBuilderForWorkerStateListener() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
Worker worker = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(config)
.build();
Assert.assertTrue(worker.getWorkerStateChangeListener() instanceof NoOpWorkerStateChangeListener);
}
@Test
public void testBuilderWhenWorkerStateListenerIsSet() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
Worker worker = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.workerStateChangeListener(workerStateChangeListener)
.config(config)
.build();
Assert.assertSame(workerStateChangeListener, worker.getWorkerStateChangeListener());
}
@Test
public void testWorkerStateListenerStatePassesThroughCreatedState() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.workerStateChangeListener(workerStateChangeListener)
.config(config)
.build();
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.CREATED));
}
@Test
public void testWorkerStateChangeListenerGoesThroughStates() throws Exception {
final CountDownLatch workerInitialized = new CountDownLatch(1);
final CountDownLatch workerStarted = new CountDownLatch(1);
final IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final IRecordProcessor processor = mock(IRecordProcessor.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint)
.withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L)
.withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self");
final List<KinesisClientLease> leases = new ArrayList<>();
KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build();
leases.add(lease);
doAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
workerInitialized.countDown();
return true;
}
}).when(leaseManager).waitUntilLeaseTableExists(anyLong(), anyLong());
doAnswer(new Answer<IRecordProcessor>() {
@Override
public IRecordProcessor answer(InvocationOnMock invocation) throws Throwable {
workerStarted.countDown();
return processor;
}
}).when(recordProcessorFactory).createProcessor();
when(config.getWorkerIdentifier()).thenReturn("Self");
when(leaseManager.listLeases()).thenReturn(leases);
when(leaseManager.renewLease(leases.get(0))).thenReturn(true);
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
when(taskFuture.isDone()).thenReturn(true);
when(taskFuture.get()).thenReturn(taskResult);
when(taskResult.isShardEndReached()).thenReturn(true);
Worker worker = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(config)
.leaseManager(leaseManager)
.kinesisProxy(kinesisProxy)
.execService(executorService)
.workerStateChangeListener(workerStateChangeListener)
.build();
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.CREATED));
WorkerThread workerThread = new WorkerThread(worker);
workerThread.start();
workerInitialized.await();
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.INITIALIZING));
workerStarted.await();
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.STARTED));
boolean workerShutdown = worker.createGracefulShutdownCallable()
.call();
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.SHUT_DOWN));
}
@Test
public void testBuilderWithDefaultLeaseManager() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);