From 9e3399405ba6ee03f4ab9da67bbef62e18bff831 Mon Sep 17 00:00:00 2001 From: Henri Yandell Date: Mon, 26 Feb 2018 10:31:55 -0800 Subject: [PATCH 1/3] Adding standard files (#302) * Create PULL_REQUEST_TEMPLATE.md * Create CONTRIBUTING.md * Create CODE_OF_CONDUCT.md * Update CONTRIBUTING.md --- .github/PULL_REQUEST_TEMPLATE.md | 6 ++++ CODE_OF_CONDUCT.md | 4 +++ CONTRIBUTING.md | 61 ++++++++++++++++++++++++++++++++ 3 files changed, 71 insertions(+) create mode 100644 .github/PULL_REQUEST_TEMPLATE.md create mode 100644 CODE_OF_CONDUCT.md create mode 100644 CONTRIBUTING.md diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 00000000..6bdaa999 --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,6 @@ +*Issue #, if available:* + +*Description of changes:* + + +By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md new file mode 100644 index 00000000..3b644668 --- /dev/null +++ b/CODE_OF_CONDUCT.md @@ -0,0 +1,4 @@ +## Code of Conduct +This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct). +For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact +opensource-codeofconduct@amazon.com with any additional questions or comments. diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 00000000..ed9eb3e2 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,61 @@ +# Contributing Guidelines + +Thank you for your interest in contributing to our project. Whether it's a bug report, new feature, correction, or additional +documentation, we greatly value feedback and contributions from our community. + +Please read through this document before submitting any issues or pull requests to ensure we have all the necessary +information to effectively respond to your bug report or contribution. + + +## Reporting Bugs/Feature Requests + +We welcome you to use the GitHub issue tracker to report bugs or suggest features. + +When filing an issue, please check [existing open](https://github.com/awslabs/amazon-kinesis-client/issues), or [recently closed](https://github.com/awslabs/amazon-kinesis-client/issues?utf8=%E2%9C%93&q=is%3Aissue%20is%3Aclosed%20), issues to make sure somebody else hasn't already +reported the issue. Please try to include as much information as you can. Details like these are incredibly useful: + +* A reproducible test case or series of steps +* The version of our code being used +* Any modifications you've made relevant to the bug +* Anything unusual about your environment or deployment + + +## Contributing via Pull Requests +Contributions via pull requests are much appreciated. Before sending us a pull request, please ensure that: + +1. You are working against the latest source on the *master* branch. +2. You check existing open, and recently merged, pull requests to make sure someone else hasn't addressed the problem already. +3. You open an issue to discuss any significant work - we would hate for your time to be wasted. + +To send us a pull request, please: + +1. Fork the repository. +2. Modify the source; please focus on the specific change you are contributing. If you also reformat all the code, it will be hard for us to focus on your change. +3. Ensure local tests pass. +4. Commit to your fork using clear commit messages. +5. Send us a pull request, answering any default questions in the pull request interface. +6. Pay attention to any automated CI failures reported in the pull request, and stay involved in the conversation. + +GitHub provides additional document on [forking a repository](https://help.github.com/articles/fork-a-repo/) and +[creating a pull request](https://help.github.com/articles/creating-a-pull-request/). + + +## Finding contributions to work on +Looking at the existing issues is a great way to find something to contribute on. As our projects, by default, use the default GitHub issue labels ((enhancement/bug/duplicate/help wanted/invalid/question/wontfix), looking at any ['help wanted'](https://github.com/awslabs/amazon-kinesis-client/labels/help%20wanted) issues is a great place to start. + + +## Code of Conduct +This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct). +For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact +opensource-codeofconduct@amazon.com with any additional questions or comments. + + +## Security issue notifications +If you discover a potential security issue in this project we ask that you notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/). Please do **not** create a public github issue. + + +## Licensing + +See the [LICENSE](https://github.com/awslabs/amazon-kinesis-client/blob/master/LICENSE) file for our project's licensing. We will ask you to confirm the licensing of your contribution. + +We may ask you to sign a [Contributor License Agreement (CLA)](http://en.wikipedia.org/wiki/Contributor_License_Agreement) for larger changes. From 24916ba552ae6834ed8bc71af56ce268f0287309 Mon Sep 17 00:00:00 2001 From: nyo Date: Tue, 27 Feb 2018 15:54:16 +0100 Subject: [PATCH 2/3] Created listener for worker state change (#291) * Created listener for worker state change #275 --- .../worker/NoOpWorkerStateChangeListener.java | 16 +++ .../clientlibrary/lib/worker/Worker.java | 57 +++++--- .../lib/worker/WorkerStateChangeListener.java | 16 +++ .../clientlibrary/lib/worker/WorkerTest.java | 127 ++++++++++++++++-- 4 files changed, 189 insertions(+), 27 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NoOpWorkerStateChangeListener.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerStateChangeListener.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NoOpWorkerStateChangeListener.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NoOpWorkerStateChangeListener.java new file mode 100644 index 00000000..152a43af --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NoOpWorkerStateChangeListener.java @@ -0,0 +1,16 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +public class NoOpWorkerStateChangeListener implements WorkerStateChangeListener { + + /** + * Empty constructor for NoOp Worker State Change Listener + */ + public NoOpWorkerStateChangeListener() { + + } + + @Override + public void onWorkerStateChange(WorkerState newState) { + + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index daf89c28..644f4225 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -71,6 +71,7 @@ public class Worker implements Runnable { private static final Log LOG = LogFactory.getLog(Worker.class); private static final int MAX_INITIALIZATION_ATTEMPTS = 20; + private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private WorkerLog wlog = new WorkerLog(); @@ -93,7 +94,6 @@ public class Worker implements Runnable { private final Optional retryGetRecordsInSeconds; private final Optional maxGetRecordsThreadPool; - // private final KinesisClientLeaseManager leaseManager; private final KinesisClientLibLeaseCoordinator leaseCoordinator; private final ShardSyncTaskManager controlServer; @@ -119,6 +119,8 @@ public class Worker implements Runnable { @VisibleForTesting protected GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator(); + private WorkerStateChangeListener workerStateChangeListener; + /** * Constructor. * @@ -276,7 +278,8 @@ public class Worker implements Runnable { config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), config.getShardPrioritizationStrategy(), config.getRetryGetRecordsInSeconds(), - config.getMaxGetRecordsThreadPool()); + config.getMaxGetRecordsThreadPool(), + DEFAULT_WORKER_STATE_CHANGE_LISTENER); // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { @@ -348,7 +351,7 @@ public class Worker implements Runnable { this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, - shardPrioritization, Optional.empty(), Optional.empty()); + shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER); } /** @@ -395,7 +398,7 @@ public class Worker implements Runnable { KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, - Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool) { + Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; this.config = config; @@ -417,6 +420,8 @@ public class Worker implements Runnable { this.shardPrioritization = shardPrioritization; this.retryGetRecordsInSeconds = retryGetRecordsInSeconds; this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; + this.workerStateChangeListener = workerStateChangeListener; + workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED); } /** @@ -494,6 +499,7 @@ public class Worker implements Runnable { } private void initialize() { + workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); boolean isDone = false; Exception lastException = null; @@ -543,6 +549,7 @@ public class Worker implements Runnable { if (!isDone) { throw new RuntimeException(lastException); } + workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED); } /** @@ -593,10 +600,10 @@ public class Worker implements Runnable { /** * Starts the requestedShutdown process, and returns a future that can be used to track the process. - * + * * This is deprecated in favor of {@link #startGracefulShutdown()}, which returns a more complete future, and * indicates the process behavior - * + * * @return a future that will be set once shutdown is completed. */ @Deprecated @@ -640,7 +647,7 @@ public class Worker implements Runnable { * Requests a graceful shutdown of the worker, notifying record processors, that implement * {@link IShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to * checkpoint. - * + * * This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the * previous future. * @@ -755,6 +762,10 @@ public class Worker implements Runnable { return shardInfoShardConsumerMap; } + WorkerStateChangeListener getWorkerStateChangeListener() { + return workerStateChangeListener; + } + /** * Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that if executor * services were passed to the worker by the user, worker will not attempt to shutdown those resources. @@ -785,6 +796,7 @@ public class Worker implements Runnable { // Lost leases will force Worker to begin shutdown process for all shard consumers in // Worker.run(). leaseCoordinator.stop(); + workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN); } /** @@ -807,7 +819,7 @@ public class Worker implements Runnable { /** * Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()} * method before every loop run, so method must do minimum amount of work to not impact shard processing timings. - * + * * @return Whether worker should shutdown immediately. */ @VisibleForTesting @@ -1012,7 +1024,7 @@ public class Worker implements Runnable { /** * Given configuration, returns appropriate metrics factory. - * + * * @param cloudWatchClient * Amazon CloudWatch client * @param config @@ -1039,7 +1051,7 @@ public class Worker implements Runnable { /** * Returns default executor service that should be used by the worker. - * + * * @return Default executor service that should be used by the worker. */ private static ExecutorService getExecutorService() { @@ -1089,6 +1101,7 @@ public class Worker implements Runnable { private ExecutorService execService; private ShardPrioritization shardPrioritization; private IKinesisProxy kinesisProxy; + private WorkerStateChangeListener workerStateChangeListener; /** * Default constructor. @@ -1209,10 +1222,10 @@ public class Worker implements Runnable { /** * Provides logic how to prioritize shard processing. - * + * * @param shardPrioritization * shardPrioritization is responsible to order shards before processing - * + * * @return A reference to this updated object so that method calls can be chained together. */ public Builder shardPrioritization(ShardPrioritization shardPrioritization) { @@ -1233,6 +1246,17 @@ public class Worker implements Runnable { return this; } + /** + * Set WorkerStateChangeListener for the worker + * @param workerStateChangeListener + * Sets the WorkerStateChangeListener + * @return A reference to this updated object so that method calls can be chained together. + */ + public Builder workerStateChangeListener(WorkerStateChangeListener workerStateChangeListener) { + this.workerStateChangeListener = workerStateChangeListener; + return this; + } + /** * Build the Worker instance. * @@ -1305,6 +1329,10 @@ public class Worker implements Runnable { kinesisProxy = new KinesisProxy(config, kinesisClient); } + if (workerStateChangeListener == null) { + workerStateChangeListener = DEFAULT_WORKER_STATE_CHANGE_LISTENER; + } + return new Worker(config.getApplicationName(), recordProcessorFactory, config, @@ -1336,9 +1364,8 @@ public class Worker implements Runnable { config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), shardPrioritization, config.getRetryGetRecordsInSeconds(), - config.getMaxGetRecordsThreadPool()); - + config.getMaxGetRecordsThreadPool(), + workerStateChangeListener); } - } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerStateChangeListener.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerStateChangeListener.java new file mode 100644 index 00000000..36ee39f0 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerStateChangeListener.java @@ -0,0 +1,16 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * A listener for callbacks on changes worker state + */ +@FunctionalInterface +public interface WorkerStateChangeListener { + enum WorkerState { + CREATED, + INITIALIZING, + STARTED, + SHUT_DOWN + } + + void onWorkerStateChange(WorkerState newState); +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 037a54b2..21aaa8ac 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; @@ -89,6 +90,7 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcess import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMetricsFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.WorkerStateChangeListener.WorkerState; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy; @@ -142,7 +144,7 @@ public class WorkerTest { private static final String KINESIS_SHARD_ID_FORMAT = "kinesis-0-0-%d"; private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d"; - + private RecordsFetcherFactory recordsFetcherFactory; private KinesisClientLibConfiguration config; @@ -170,7 +172,9 @@ public class WorkerTest { private Future taskFuture; @Mock private TaskResult taskResult; - + @Mock + private WorkerStateChangeListener workerStateChangeListener; + @Before public void setup() { config = spy(new KinesisClientLibConfiguration("app", null, null, null)); @@ -179,7 +183,7 @@ public class WorkerTest { } // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES - private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = + private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory() { @Override @@ -212,8 +216,8 @@ public class WorkerTest { }; } }; - - private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 = + + private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 = new V1ToV2RecordProcessorFactoryAdapter(SAMPLE_RECORD_PROCESSOR_FACTORY); @@ -619,7 +623,7 @@ public class WorkerTest { return null; } }).when(v2RecordProcessor).processRecords(any(ProcessRecordsInput.class)); - + RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class); GetRecordsCache getRecordsCache = mock(GetRecordsCache.class); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); @@ -659,7 +663,7 @@ public class WorkerTest { * This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of * {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads. * This behavior makes the test a bit racy, since we need to ensure a specific order of events. - * + * * @throws Exception */ @Test @@ -1356,7 +1360,7 @@ public class WorkerTest { executorService, metricsFactory, taskBackoffTimeMillis, - failoverTimeMillis, + failoverTimeMillis, false, shardPrioritization); @@ -1432,7 +1436,7 @@ public class WorkerTest { config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, - parentShardPollIntervalMillis, + parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, @@ -1500,6 +1504,105 @@ public class WorkerTest { Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisLocalFileProxy); } + @Test + public void testBuilderForWorkerStateListener() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + Worker worker = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .config(config) + .build(); + Assert.assertTrue(worker.getWorkerStateChangeListener() instanceof NoOpWorkerStateChangeListener); + } + + @Test + public void testBuilderWhenWorkerStateListenerIsSet() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + Worker worker = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .workerStateChangeListener(workerStateChangeListener) + .config(config) + .build(); + Assert.assertSame(workerStateChangeListener, worker.getWorkerStateChangeListener()); + } + + @Test + public void testWorkerStateListenerStatePassesThroughCreatedState() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .workerStateChangeListener(workerStateChangeListener) + .config(config) + .build(); + + verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.CREATED)); + } + + @Test + public void testWorkerStateChangeListenerGoesThroughStates() throws Exception { + + final CountDownLatch workerInitialized = new CountDownLatch(1); + final CountDownLatch workerStarted = new CountDownLatch(1); + final IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final IRecordProcessor processor = mock(IRecordProcessor.class); + + ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); + KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint) + .withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L) + .withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self"); + final List leases = new ArrayList<>(); + KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build(); + leases.add(lease); + + doAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable { + workerInitialized.countDown(); + return true; + } + }).when(leaseManager).waitUntilLeaseTableExists(anyLong(), anyLong()); + doAnswer(new Answer() { + @Override + public IRecordProcessor answer(InvocationOnMock invocation) throws Throwable { + workerStarted.countDown(); + return processor; + } + }).when(recordProcessorFactory).createProcessor(); + + when(config.getWorkerIdentifier()).thenReturn("Self"); + when(leaseManager.listLeases()).thenReturn(leases); + when(leaseManager.renewLease(leases.get(0))).thenReturn(true); + when(executorService.submit(Matchers.> any())) + .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); + when(taskFuture.isDone()).thenReturn(true); + when(taskFuture.get()).thenReturn(taskResult); + when(taskResult.isShardEndReached()).thenReturn(true); + + Worker worker = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .config(config) + .leaseManager(leaseManager) + .kinesisProxy(kinesisProxy) + .execService(executorService) + .workerStateChangeListener(workerStateChangeListener) + .build(); + + verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.CREATED)); + + WorkerThread workerThread = new WorkerThread(worker); + workerThread.start(); + + workerInitialized.await(); + verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.INITIALIZING)); + + workerStarted.await(); + verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.STARTED)); + + boolean workerShutdown = worker.createGracefulShutdownCallable() + .call(); + + verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.SHUT_DOWN)); + } + @Test public void testBuilderWithDefaultLeaseManager() { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); @@ -1801,7 +1904,7 @@ public class WorkerTest { TestStreamletFactory recordProcessorFactory = new TestStreamletFactory(recordCounter, shardSequenceVerifier); ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize); - + WorkerThread workerThread = runWorker( shardList, initialLeases, callProcessRecordsForEmptyRecordList, failoverTimeMillis, numberOfRecordsPerShard, fileBasedProxy, recordProcessorFactory, executorService, nullMetricsFactory, clientConfig); @@ -1857,7 +1960,7 @@ public class WorkerTest { idleTimeInMilliseconds, callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp)); - + Worker worker = new Worker(stageName, recordProcessorFactory, @@ -1874,7 +1977,7 @@ public class WorkerTest { failoverTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); - + WorkerThread workerThread = new WorkerThread(worker); workerThread.start(); return workerThread; From 523cc0e2cc70bf2f7fdcf472391d67ef42203b3e Mon Sep 17 00:00:00 2001 From: Walid Baruni Date: Tue, 27 Feb 2018 08:49:20 -0800 Subject: [PATCH 3/3] Fix preparing a checkpoint at SHARD_END (#301) Fix IllegalArgumentException: Sequence number must be numeric, when preparing a checkpoint at SHARD_END --- .../lib/worker/SequenceNumberValidator.java | 6 ++- .../RecordProcessorCheckpointerTest.java | 39 +++++++++++++++++++ .../worker/SequenceNumberValidatorTest.java | 2 +- 3 files changed, 44 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java index 96af5f7c..8cebbf33 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java @@ -70,12 +70,14 @@ public class SequenceNumberValidator { */ void validateSequenceNumber(String sequenceNumber) throws IllegalArgumentException, ThrottlingException, KinesisClientLibDependencyException { - if (!isDigits(sequenceNumber)) { + boolean atShardEnd = ExtendedSequenceNumber.SHARD_END.getSequenceNumber().equals(sequenceNumber); + + if (!atShardEnd && !isDigits(sequenceNumber)) { LOG.info("Sequence number must be numeric, but was " + sequenceNumber); throw new IllegalArgumentException("Sequence number must be numeric, but was " + sequenceNumber); } try { - if (validateWithGetIterator) { + if (!atShardEnd &&validateWithGetIterator) { proxy.getIterator(shardId, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), sequenceNumber); LOG.info("Validated sequence number " + sequenceNumber + " with shard id " + shardId); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java index a3153aec..67c36d20 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java @@ -168,6 +168,21 @@ public class RecordProcessorCheckpointerTest { Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); } + /** + * Test method for {@link RecordProcessorCheckpointer#checkpoint(String SHARD_END)}. + */ + @Test + public final void testCheckpointAtShardEnd() throws Exception { + RecordProcessorCheckpointer processingCheckpointer = + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); + processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); + ExtendedSequenceNumber extendedSequenceNumber = ExtendedSequenceNumber.SHARD_END; + processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); + processingCheckpointer.checkpoint(ExtendedSequenceNumber.SHARD_END.getSequenceNumber()); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); + } + + /** * Test method for * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#prepareCheckpoint()}. @@ -299,6 +314,30 @@ public class RecordProcessorCheckpointerTest { Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); } + /** + * Test method for {@link RecordProcessorCheckpointer#checkpoint(String SHARD_END)}. + */ + @Test + public final void testPrepareCheckpointAtShardEnd() throws Exception { + RecordProcessorCheckpointer processingCheckpointer = + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); + processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); + ExtendedSequenceNumber extendedSequenceNumber = ExtendedSequenceNumber.SHARD_END; + processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); + IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint(ExtendedSequenceNumber.SHARD_END.getSequenceNumber()); + Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint()); + Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint()); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + + // Checkpoint using preparedCheckpoint + preparedCheckpoint.checkpoint(); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + } + + /** * Test that having multiple outstanding prepared checkpointers works if they are redeemed in the right order. */ diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidatorTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidatorTest.java index aae93f29..51d1376d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidatorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidatorTest.java @@ -87,7 +87,7 @@ public class SequenceNumberValidatorTest { boolean validateWithGetIterator) { String[] nonNumericStrings = { null, "bogus-sequence-number", SentinelCheckpoint.LATEST.toString(), - SentinelCheckpoint.SHARD_END.toString(), SentinelCheckpoint.TRIM_HORIZON.toString(), + SentinelCheckpoint.TRIM_HORIZON.toString(), SentinelCheckpoint.AT_TIMESTAMP.toString() }; for (String nonNumericString : nonNumericStrings) {