From dbbcae9db6f49b2abf4129af530fe5be23dfac78 Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Wed, 14 Mar 2018 10:58:07 -0700 Subject: [PATCH] Broke out the lifecycle-ish classes --- .../kinesis/multilang/MessageWriter.java | 2 +- .../kinesis/multilang/MultiLangProtocol.java | 2 +- .../multilang/messages/ShutdownMessage.java | 2 +- .../kinesis/multilang/MessageWriterTest.java | 2 +- .../multilang/MultiLangProtocolTest.java | 2 +- .../StreamingRecordProcessorTest.java | 2 +- .../multilang/messages/MessageTest.java | 2 +- .../clientlibrary/lib/worker/ITask.java | 38 -------------- .../worker/KinesisClientLibConfiguration.java | 4 +- .../MetricsCollectingTaskDecorator.java | 5 +- .../worker/RecordProcessorCheckpointer.java | 20 ++++---- .../lib/worker/SequenceNumberValidator.java | 2 +- .../clientlibrary/lib/worker/ShardInfo.java | 2 +- .../lib/worker/ShardSyncTask.java | 3 ++ .../lib/worker/ShardSyncTaskManager.java | 2 + .../clientlibrary/lib/worker/ShardSyncer.java | 4 +- .../lib/worker/StreamConfig.java | 26 +++++----- .../lib/worker/ThrottlingReporter.java | 6 +-- .../clientlibrary/lib/worker/Worker.java | 3 ++ .../lifecycle}/BlockOnParentShardTask.java | 23 +++++---- .../kinesis/lifecycle}/ConsumerStates.java | 2 +- .../amazon/kinesis/lifecycle/ITask.java | 41 +++++++++++++++ .../kinesis/lifecycle}/InitializeTask.java | 8 +-- .../kinesis/lifecycle}/ProcessTask.java | 9 ++-- .../kinesis/lifecycle}/ShardConsumer.java | 51 +++++++++++-------- .../kinesis/lifecycle/ShutdownInput.java | 1 - .../lifecycle}/ShutdownNotificationTask.java | 6 ++- .../kinesis/lifecycle}/ShutdownReason.java | 7 ++- .../kinesis/lifecycle}/ShutdownTask.java | 11 ++-- .../amazon/kinesis/lifecycle}/TaskResult.java | 26 +++++----- .../amazon/kinesis/lifecycle}/TaskType.java | 20 ++++---- .../kinesis/processor/IRecordProcessor.java | 2 +- .../processor/v2/IRecordProcessor.java | 3 +- .../GracefulShutdownCoordinatorTest.java | 1 + .../lib/worker/ShardSequenceVerifier.java | 1 + .../lib/worker/TestStreamlet.java | 7 +-- .../clientlibrary/lib/worker/WorkerTest.java | 9 ++++ .../types/ShutdownReasonTest.java | 2 +- .../BlockOnParentShardTaskTest.java | 24 +++++---- .../lifecycle}/ConsumerStatesTest.java | 23 +++++++-- .../kinesis/lifecycle}/ProcessTaskTest.java | 14 ++++- .../kinesis/lifecycle}/ShardConsumerTest.java | 17 ++++++- .../kinesis/lifecycle}/ShutdownTaskTest.java | 11 +++- 43 files changed, 274 insertions(+), 174 deletions(-) delete mode 100644 amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ITask.java rename amazon-kinesis-client/src/main/java/{com/amazonaws/services/kinesis/clientlibrary/lib/worker => software/amazon/kinesis/lifecycle}/BlockOnParentShardTask.java (84%) rename amazon-kinesis-client/src/main/java/{com/amazonaws/services/kinesis/clientlibrary/lib/worker => software/amazon/kinesis/lifecycle}/ConsumerStates.java (99%) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ITask.java rename amazon-kinesis-client/src/main/java/{com/amazonaws/services/kinesis/clientlibrary/lib/worker => software/amazon/kinesis/lifecycle}/InitializeTask.java (94%) rename amazon-kinesis-client/src/main/java/{com/amazonaws/services/kinesis/clientlibrary/lib/worker => software/amazon/kinesis/lifecycle}/ProcessTask.java (97%) rename amazon-kinesis-client/src/main/java/{com/amazonaws/services/kinesis/clientlibrary/lib/worker => software/amazon/kinesis/lifecycle}/ShardConsumer.java (91%) rename amazon-kinesis-client/src/main/java/{com/amazonaws/services/kinesis/clientlibrary/lib/worker => software/amazon/kinesis/lifecycle}/ShutdownNotificationTask.java (89%) rename amazon-kinesis-client/src/main/java/{com/amazonaws/services/kinesis/clientlibrary/lib/worker => software/amazon/kinesis/lifecycle}/ShutdownReason.java (90%) rename amazon-kinesis-client/src/main/java/{com/amazonaws/services/kinesis/clientlibrary/lib/worker => software/amazon/kinesis/lifecycle}/ShutdownTask.java (93%) rename amazon-kinesis-client/src/main/java/{com/amazonaws/services/kinesis/clientlibrary/lib/worker => software/amazon/kinesis/lifecycle}/TaskResult.java (67%) rename amazon-kinesis-client/src/main/java/{com/amazonaws/services/kinesis/clientlibrary/lib/worker => software/amazon/kinesis/lifecycle}/TaskType.java (57%) rename amazon-kinesis-client/src/test/java/{com/amazonaws/services/kinesis/clientlibrary/lib/worker => software/amazon/kinesis/lifecycle}/BlockOnParentShardTaskTest.java (90%) rename amazon-kinesis-client/src/test/java/{com/amazonaws/services/kinesis/clientlibrary/lib/worker => software/amazon/kinesis/lifecycle}/ConsumerStatesTest.java (94%) rename amazon-kinesis-client/src/test/java/{com/amazonaws/services/kinesis/clientlibrary/lib/worker => software/amazon/kinesis/lifecycle}/ProcessTaskTest.java (95%) rename amazon-kinesis-client/src/test/java/{com/amazonaws/services/kinesis/clientlibrary/lib/worker => software/amazon/kinesis/lifecycle}/ShardConsumerTest.java (97%) rename amazon-kinesis-client/src/test/java/{com/amazonaws/services/kinesis/clientlibrary/lib/worker => software/amazon/kinesis/lifecycle}/ShutdownTaskTest.java (89%) diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java index 5d6be701..4604957f 100644 --- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java +++ b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java @@ -22,7 +22,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; +import software.amazon.kinesis.lifecycle.ShutdownReason; import software.amazon.kinesis.lifecycle.InitializationInput; import software.amazon.kinesis.lifecycle.ProcessRecordsInput; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java index b777cb71..42867c79 100644 --- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java +++ b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java @@ -23,7 +23,7 @@ import java.util.concurrent.TimeoutException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; +import software.amazon.kinesis.lifecycle.ShutdownReason; import software.amazon.kinesis.lifecycle.InitializationInput; import software.amazon.kinesis.lifecycle.ProcessRecordsInput; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownMessage.java b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownMessage.java index 82ed5458..082229e1 100644 --- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownMessage.java +++ b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownMessage.java @@ -14,7 +14,7 @@ */ package com.amazonaws.services.kinesis.multilang.messages; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; +import software.amazon.kinesis.lifecycle.ShutdownReason; /** * A message to indicate to the client's process that it should shutdown and then terminate. diff --git a/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java b/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java index fa1a841d..bf6a2c37 100644 --- a/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java @@ -30,7 +30,7 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; +import software.amazon.kinesis.lifecycle.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.multilang.messages.Message; import com.fasterxml.jackson.core.JsonProcessingException; diff --git a/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java b/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java index 9c725753..fb6ce3af 100644 --- a/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java @@ -20,7 +20,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; +import software.amazon.kinesis.lifecycle.ShutdownReason; import software.amazon.kinesis.lifecycle.InitializationInput; import software.amazon.kinesis.lifecycle.ProcessRecordsInput; import com.amazonaws.services.kinesis.model.Record; diff --git a/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java b/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java index 99aea7ac..57031e1b 100644 --- a/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java @@ -21,7 +21,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingExcepti import software.amazon.kinesis.processor.IPreparedCheckpointer; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; +import software.amazon.kinesis.lifecycle.ShutdownReason; import software.amazon.kinesis.lifecycle.InitializationInput; import software.amazon.kinesis.lifecycle.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.ShutdownInput; diff --git a/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java b/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java index 69db6761..e8414b1a 100644 --- a/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java @@ -22,7 +22,7 @@ import software.amazon.kinesis.lifecycle.ProcessRecordsInput; import org.junit.Assert; import org.junit.Test; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; +import software.amazon.kinesis.lifecycle.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ITask.java b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ITask.java deleted file mode 100644 index d19166a1..00000000 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ITask.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; - -import java.util.concurrent.Callable; - -/** - * Interface for shard processing tasks. - * A task may execute an application callback (e.g. initialize, process, shutdown). - */ -interface ITask extends Callable { - - /** - * Perform task logic. - * E.g. perform set up (e.g. fetch records) and invoke a callback (e.g. processRecords() API). - * - * @return TaskResult (captures any exceptions encountered during execution of the task) - */ - TaskResult call(); - - /** - * @return TaskType - */ - TaskType getTaskType(); - -} diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 5044e0e0..072c0e4f 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -24,6 +24,8 @@ import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.regions.RegionUtils; import software.amazon.kinesis.lifecycle.ProcessRecordsInput; +import software.amazon.kinesis.lifecycle.ProcessTask; +import software.amazon.kinesis.lifecycle.ShardConsumer; import software.amazon.kinesis.metrics.MetricsHelper; import software.amazon.kinesis.metrics.IMetricsScope; import software.amazon.kinesis.metrics.MetricsLevel; @@ -998,7 +1000,7 @@ public class KinesisClientLibConfiguration { * Controls how long the KCL will sleep if no records are returned from Kinesis * *

- * This value is only used when no records are returned; if records are returned, the {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask} will + * This value is only used when no records are returned; if records are returned, the {@link ProcessTask} will * immediately retrieve the next set of records after the call to * {@link IRecordProcessor#processRecords(ProcessRecordsInput)} * has returned. Setting this value to high may result in the KCL being unable to catch up. If you are changing this diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java index 68fff081..9a171e55 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java +++ b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java @@ -14,6 +14,9 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import software.amazon.kinesis.lifecycle.ITask; +import software.amazon.kinesis.lifecycle.TaskResult; +import software.amazon.kinesis.lifecycle.TaskType; import software.amazon.kinesis.metrics.MetricsHelper; import software.amazon.kinesis.metrics.IMetricsFactory; import software.amazon.kinesis.metrics.MetricsLevel; @@ -21,7 +24,7 @@ import software.amazon.kinesis.metrics.MetricsLevel; /** * Decorates an ITask and reports metrics about its timing and success/failure. */ -class MetricsCollectingTaskDecorator implements ITask { +public class MetricsCollectingTaskDecorator implements ITask { private final ITask other; private final IMetricsFactory factory; diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java index 6e8640ef..4df3e0ff 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java @@ -37,7 +37,7 @@ import lombok.extern.slf4j.Slf4j; * RecordProcessor instance. Amazon Kinesis Client Library will create one instance per shard assignment. */ @Slf4j -class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { +public class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { private ICheckpoint checkpoint; private ExtendedSequenceNumber largestPermittedCheckpointValue; @@ -59,10 +59,10 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { * @param checkpoint Used to checkpoint progress of a RecordProcessor * @param validator Used for validating sequence numbers */ - RecordProcessorCheckpointer(ShardInfo shardInfo, - ICheckpoint checkpoint, - SequenceNumberValidator validator, - IMetricsFactory metricsFactory) { + public RecordProcessorCheckpointer(ShardInfo shardInfo, + ICheckpoint checkpoint, + SequenceNumberValidator validator, + IMetricsFactory metricsFactory) { this.shardInfo = shardInfo; this.checkpoint = checkpoint; this.sequenceNumberValidator = validator; @@ -227,11 +227,11 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { /** * @return the lastCheckpointValue */ - ExtendedSequenceNumber getLastCheckpointValue() { + public ExtendedSequenceNumber getLastCheckpointValue() { return lastCheckpointValue; } - synchronized void setInitialCheckpointValue(ExtendedSequenceNumber initialCheckpoint) { + public synchronized void setInitialCheckpointValue(ExtendedSequenceNumber initialCheckpoint) { lastCheckpointValue = initialCheckpoint; } @@ -240,14 +240,14 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { * * @return the largest permitted checkpoint */ - synchronized ExtendedSequenceNumber getLargestPermittedCheckpointValue() { + public synchronized ExtendedSequenceNumber getLargestPermittedCheckpointValue() { return largestPermittedCheckpointValue; } /** * @param largestPermittedCheckpointValue the largest permitted checkpoint */ - synchronized void setLargestPermittedCheckpointValue(ExtendedSequenceNumber largestPermittedCheckpointValue) { + public synchronized void setLargestPermittedCheckpointValue(ExtendedSequenceNumber largestPermittedCheckpointValue) { this.largestPermittedCheckpointValue = largestPermittedCheckpointValue; } @@ -258,7 +258,7 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { * * @param extendedSequenceNumber */ - synchronized void setSequenceNumberAtShardEnd(ExtendedSequenceNumber extendedSequenceNumber) { + public synchronized void setSequenceNumberAtShardEnd(ExtendedSequenceNumber extendedSequenceNumber) { this.sequenceNumberAtShardEnd = extendedSequenceNumber; } diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java index d6d24f67..6e680696 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java +++ b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java @@ -48,7 +48,7 @@ public class SequenceNumberValidator { * @param validateWithGetIterator Whether to attempt to get an iterator for this shard id and the sequence numbers * being validated */ - SequenceNumberValidator(IKinesisProxy proxy, String shardId, boolean validateWithGetIterator) { + public SequenceNumberValidator(IKinesisProxy proxy, String shardId, boolean validateWithGetIterator) { this.proxy = proxy; this.shardId = shardId; this.validateWithGetIterator = validateWithGetIterator; diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java index cd742b13..e15b920a 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java +++ b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java @@ -86,7 +86,7 @@ public class ShardInfo { * * @return a list of shardId's that are parents of this shard, or empty if the shard has no parents. */ - protected List getParentShardIds() { + public List getParentShardIds() { return new LinkedList(parentShardIds); } diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java index a8bb926b..e8de2aa5 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java @@ -14,6 +14,9 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import software.amazon.kinesis.lifecycle.ITask; +import software.amazon.kinesis.lifecycle.TaskResult; +import software.amazon.kinesis.lifecycle.TaskType; import software.amazon.kinesis.retrieval.IKinesisProxy; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.ILeaseManager; diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java index a79444b2..ccdb02ee 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java +++ b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java @@ -19,6 +19,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import software.amazon.kinesis.lifecycle.ITask; +import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.retrieval.IKinesisProxy; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.ILeaseManager; diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java index b992b55d..899830b3 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java @@ -49,7 +49,7 @@ import lombok.extern.slf4j.Slf4j; * and begun processing it's child shards. */ @Slf4j -class ShardSyncer { +public class ShardSyncer { /** * Note constructor is private: We use static synchronized methods - this is a utility class. @@ -80,7 +80,7 @@ class ShardSyncer { * @throws ProvisionedThroughputException * @throws KinesisClientLibIOException */ - static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, + public static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards, diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java index 45db90bc..615d7dc9 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java +++ b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java @@ -19,7 +19,7 @@ import software.amazon.kinesis.retrieval.IKinesisProxy; /** * Used to capture stream configuration and pass it along. */ -class StreamConfig { +public class StreamConfig { private final IKinesisProxy streamProxy; private final int maxRecords; @@ -37,12 +37,12 @@ class StreamConfig { * @param validateSequenceNumberBeforeCheckpointing Whether to call Amazon Kinesis to validate sequence numbers * @param initialPositionInStream Initial position in stream */ - StreamConfig(IKinesisProxy proxy, - int maxRecords, - long idleTimeInMilliseconds, - boolean callProcessRecordsEvenForEmptyRecordList, - boolean validateSequenceNumberBeforeCheckpointing, - InitialPositionInStreamExtended initialPositionInStream) { + public StreamConfig(IKinesisProxy proxy, + int maxRecords, + long idleTimeInMilliseconds, + boolean callProcessRecordsEvenForEmptyRecordList, + boolean validateSequenceNumberBeforeCheckpointing, + InitialPositionInStreamExtended initialPositionInStream) { this.streamProxy = proxy; this.maxRecords = maxRecords; this.idleTimeInMilliseconds = idleTimeInMilliseconds; @@ -54,42 +54,42 @@ class StreamConfig { /** * @return the streamProxy */ - IKinesisProxy getStreamProxy() { + public IKinesisProxy getStreamProxy() { return streamProxy; } /** * @return the maxRecords */ - int getMaxRecords() { + public int getMaxRecords() { return maxRecords; } /** * @return the idleTimeInMilliseconds */ - long getIdleTimeInMilliseconds() { + public long getIdleTimeInMilliseconds() { return idleTimeInMilliseconds; } /** * @return the callProcessRecordsEvenForEmptyRecordList */ - boolean shouldCallProcessRecordsEvenForEmptyRecordList() { + public boolean shouldCallProcessRecordsEvenForEmptyRecordList() { return callProcessRecordsEvenForEmptyRecordList; } /** * @return the initialPositionInStream */ - InitialPositionInStreamExtended getInitialPositionInStream() { + public InitialPositionInStreamExtended getInitialPositionInStream() { return initialPositionInStream; } /** * @return validateSequenceNumberBeforeCheckpointing */ - boolean shouldValidateSequenceNumberBeforeCheckpointing() { + public boolean shouldValidateSequenceNumberBeforeCheckpointing() { return validateSequenceNumberBeforeCheckpointing; } } diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java index a9845439..0753e005 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java +++ b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java @@ -21,14 +21,14 @@ import lombok.extern.slf4j.Slf4j; @RequiredArgsConstructor @Slf4j -class ThrottlingReporter { +public class ThrottlingReporter { private final int maxConsecutiveWarnThrottles; private final String shardId; private int consecutiveThrottles = 0; - void throttled() { + public void throttled() { consecutiveThrottles++; String message = "Shard '" + shardId + "' has been throttled " + consecutiveThrottles + " consecutively"; @@ -41,7 +41,7 @@ class ThrottlingReporter { } - void success() { + public void success() { consecutiveThrottles = 0; } diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 440ec46e..dbd999d3 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -40,6 +40,9 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClient; +import software.amazon.kinesis.lifecycle.ShardConsumer; +import software.amazon.kinesis.lifecycle.ShutdownReason; +import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IRecordProcessorFactory; diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java similarity index 84% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTask.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java index c4ad765a..56ed6699 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java @@ -1,20 +1,21 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.lifecycle; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.ILeaseManager; @@ -30,7 +31,7 @@ import lombok.extern.slf4j.Slf4j; * proceed with processing data from the shard. */ @Slf4j -class BlockOnParentShardTask implements ITask { +public class BlockOnParentShardTask implements ITask { private final ShardInfo shardInfo; private final ILeaseManager leaseManager; diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java similarity index 99% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java index c0bdc060..e192a505 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.lifecycle; /** * Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ITask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ITask.java new file mode 100644 index 00000000..ed58de83 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ITask.java @@ -0,0 +1,41 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.lifecycle; + +import software.amazon.kinesis.lifecycle.TaskResult; +import software.amazon.kinesis.lifecycle.TaskType; + +import java.util.concurrent.Callable; + +/** + * Interface for shard processing tasks. + * A task may execute an application callback (e.g. initialize, process, shutdown). + */ +public interface ITask extends Callable { + + /** + * Perform task logic. + * E.g. perform set up (e.g. fetch records) and invoke a callback (e.g. processRecords() API). + * + * @return TaskResult (captures any exceptions encountered during execution of the task) + */ + TaskResult call(); + + /** + * @return TaskType + */ + TaskType getTaskType(); + +} diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java similarity index 94% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java index 66f9e3b2..8bb45739 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java @@ -12,15 +12,17 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.lifecycle; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig; import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.KinesisDataFetcher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import software.amazon.kinesis.lifecycle.InitializationInput; import software.amazon.kinesis.metrics.MetricsHelper; import software.amazon.kinesis.metrics.MetricsLevel; @@ -30,7 +32,7 @@ import lombok.extern.slf4j.Slf4j; * Task for initializing shard position and invoking the RecordProcessor initialize() API. */ @Slf4j -class InitializeTask implements ITask { +public class InitializeTask implements ITask { private static final String RECORD_PROCESSOR_INITIALIZE_METRIC = "RecordProcessor.initialize"; private final ShardInfo shardInfo; diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java similarity index 97% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index 95ade7e3..2661b831 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -12,20 +12,23 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.lifecycle; import java.math.BigInteger; import java.util.List; import java.util.ListIterator; import com.amazonaws.services.cloudwatch.model.StandardUnit; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ThrottlingReporter; import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.IKinesisProxy; import software.amazon.kinesis.retrieval.IKinesisProxyExtended; import software.amazon.kinesis.retrieval.KinesisDataFetcher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import software.amazon.kinesis.lifecycle.ProcessRecordsInput; import software.amazon.kinesis.retrieval.kpl.UserRecord; import software.amazon.kinesis.metrics.MetricsHelper; import software.amazon.kinesis.metrics.IMetricsScope; @@ -41,7 +44,7 @@ import lombok.extern.slf4j.Slf4j; * Task for fetching data records and invoking processRecords() on the record processor instance. */ @Slf4j -class ProcessTask implements ITask { +public class ProcessTask implements ITask { private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator"; private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed"; private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed"; diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java similarity index 91% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index 4a46264a..f629783f 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.lifecycle; import java.util.Optional; @@ -21,6 +21,13 @@ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.SequenceNumberValidator; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownNotification; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig; import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.leases.KinesisClientLease; @@ -42,7 +49,7 @@ import software.amazon.kinesis.retrieval.SynchronousGetRecordsRetrievalStrategy; * A new instance should be created if the primary responsibility is reassigned back to this process. */ @Slf4j -class ShardConsumer { +public class ShardConsumer { private final StreamConfig streamConfig; private final IRecordProcessor recordProcessor; private final KinesisClientLibConfiguration config; @@ -145,20 +152,20 @@ class ShardConsumer { * @param config Kinesis library configuration */ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES - ShardConsumer(ShardInfo shardInfo, - StreamConfig streamConfig, - ICheckpoint checkpoint, - IRecordProcessor recordProcessor, - ILeaseManager leaseManager, - long parentShardPollIntervalMillis, - boolean cleanupLeasesOfCompletedShards, - ExecutorService executorService, - IMetricsFactory metricsFactory, - long backoffTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - Optional retryGetRecordsInSeconds, - Optional maxGetRecordsThreadPool, - KinesisClientLibConfiguration config) { + public ShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + ILeaseManager leaseManager, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config) { this( shardInfo, @@ -246,7 +253,7 @@ class ShardConsumer { * * @return true if a new process task was submitted, false otherwise */ - synchronized boolean consumeShard() { + public synchronized boolean consumeShard() { return checkAndSubmitNextTask(); } @@ -345,7 +352,7 @@ class ShardConsumer { * * @param shutdownNotification used to signal that the record processor has been given the chance to shutdown. */ - void notifyShutdownRequested(ShutdownNotification shutdownNotification) { + public void notifyShutdownRequested(ShutdownNotification shutdownNotification) { this.shutdownNotification = shutdownNotification; markForShutdown(ShutdownReason.REQUESTED); } @@ -356,7 +363,7 @@ class ShardConsumer { * * @return true if shutdown is complete (false if shutdown is still in progress) */ - synchronized boolean beginShutdown() { + public synchronized boolean beginShutdown() { markForShutdown(ShutdownReason.ZOMBIE); checkAndSubmitNextTask(); @@ -376,14 +383,14 @@ class ShardConsumer { * * @return true if shutdown is complete */ - boolean isShutdown() { + public boolean isShutdown() { return currentState.isTerminal(); } /** * @return the shutdownReason */ - ShutdownReason getShutdownReason() { + public ShutdownReason getShutdownReason() { return shutdownReason; } @@ -430,7 +437,7 @@ class ShardConsumer { } @VisibleForTesting - boolean isShutdownRequested() { + public boolean isShutdownRequested() { return shutdownReason != null; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownInput.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownInput.java index 464f2dd0..41cefdbd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownInput.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownInput.java @@ -15,7 +15,6 @@ package software.amazon.kinesis.lifecycle; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import software.amazon.kinesis.processor.v2.IRecordProcessor; /** diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownNotificationTask.java similarity index 89% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownNotificationTask.java index 198c6fa6..7379687a 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownNotificationTask.java @@ -12,8 +12,10 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.lifecycle; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownNotification; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IShutdownNotificationAware; @@ -21,7 +23,7 @@ import software.amazon.kinesis.processor.v2.IShutdownNotificationAware; /** * Notifies record processor of incoming shutdown request, and gives them a chance to checkpoint. */ -class ShutdownNotificationTask implements ITask { +public class ShutdownNotificationTask implements ITask { private final IRecordProcessor recordProcessor; private final IRecordProcessorCheckpointer recordProcessorCheckpointer; diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownReason.java similarity index 90% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownReason.java index 8b6a93a4..0381ebab 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownReason.java @@ -12,13 +12,12 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.lifecycle; -import software.amazon.kinesis.lifecycle.ShutdownInput; import software.amazon.kinesis.processor.v2.IRecordProcessor; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState; +import static software.amazon.kinesis.lifecycle.ConsumerStates.ConsumerState; +import static software.amazon.kinesis.lifecycle.ConsumerStates.ShardConsumerState; /** diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java similarity index 93% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 205caa18..ed7d6956 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -12,13 +12,16 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.lifecycle; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncer; import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.IKinesisProxy; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import software.amazon.kinesis.lifecycle.ShutdownInput; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.metrics.MetricsHelper; @@ -31,7 +34,7 @@ import lombok.extern.slf4j.Slf4j; * Task for invoking the RecordProcessor shutdown() callback. */ @Slf4j -class ShutdownTask implements ITask { +public class ShutdownTask implements ITask { private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown"; private final ShardInfo shardInfo; @@ -163,7 +166,7 @@ class ShutdownTask implements ITask { } @VisibleForTesting - ShutdownReason getReason() { + public ShutdownReason getReason() { return reason; } diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskResult.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskResult.java similarity index 67% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskResult.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskResult.java index cede1167..8762f07d 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskResult.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskResult.java @@ -1,24 +1,24 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.lifecycle; /** * Used to capture information from a task that we want to communicate back to the higher layer. * E.g. exception thrown when executing the task, if we reach end of a shard. */ -class TaskResult { +public class TaskResult { // Did we reach the end of the shard while processing this task. private boolean shardEndReached; @@ -29,7 +29,7 @@ class TaskResult { /** * @return the shardEndReached */ - protected boolean isShardEndReached() { + public boolean isShardEndReached() { return shardEndReached; } @@ -50,7 +50,7 @@ class TaskResult { /** * @param e Any exception encountered when running the process task. */ - TaskResult(Exception e) { + public TaskResult(Exception e) { this(e, false); } diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskType.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskType.java similarity index 57% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskType.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskType.java index 32fd1cd2..3e568210 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskType.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskType.java @@ -1,18 +1,18 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.lifecycle; /** * Enumerates types of tasks executed as part of processing a shard. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/IRecordProcessor.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/IRecordProcessor.java index 19f27680..a74941f1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/IRecordProcessor.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/IRecordProcessor.java @@ -17,7 +17,7 @@ package software.amazon.kinesis.processor; import java.util.List; import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; +import software.amazon.kinesis.lifecycle.ShutdownReason; /** * The Amazon Kinesis Client Library will instantiate record processors to process data records fetched from Amazon diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/v2/IRecordProcessor.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/v2/IRecordProcessor.java index fd3bbca5..ecb1ad36 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/v2/IRecordProcessor.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/v2/IRecordProcessor.java @@ -17,6 +17,7 @@ package software.amazon.kinesis.processor.v2; import software.amazon.kinesis.lifecycle.InitializationInput; import software.amazon.kinesis.lifecycle.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.ShutdownInput; +import software.amazon.kinesis.lifecycle.ShutdownReason; /** * The Amazon Kinesis Client Library will instantiate record processors to process data records fetched from Amazon @@ -50,7 +51,7 @@ public interface IRecordProcessor { *

Warning

* * When the value of {@link ShutdownInput#getShutdownReason()} is - * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason#TERMINATE} it is required that you + * {@link ShutdownReason#TERMINATE} it is required that you * checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress. * * @param shutdownInput diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java index c032bf0c..daee9980 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java +++ b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java @@ -36,6 +36,7 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import org.mockito.verification.VerificationMode; +import software.amazon.kinesis.lifecycle.ShardConsumer; @RunWith(MockitoJUnitRunner.class) public class GracefulShutdownCoordinatorTest { diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSequenceVerifier.java b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSequenceVerifier.java index 7e24aff0..4176575c 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSequenceVerifier.java +++ b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSequenceVerifier.java @@ -25,6 +25,7 @@ import com.amazonaws.services.kinesis.model.Shard; import junit.framework.Assert; import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.lifecycle.ShutdownReason; /** * Helper class to verify shard lineage in unit tests that use TestStreamlet. diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamlet.java b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamlet.java index 560289fa..60f60ed7 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamlet.java +++ b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamlet.java @@ -26,6 +26,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; +import software.amazon.kinesis.lifecycle.ShutdownReason; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IShutdownNotificationAware; @@ -40,7 +41,7 @@ import lombok.extern.slf4j.Slf4j; * Streamlet that tracks records it's seen - useful for testing. */ @Slf4j -class TestStreamlet implements IRecordProcessor, IShutdownNotificationAware { +public class TestStreamlet implements IRecordProcessor, IShutdownNotificationAware { private List records = new ArrayList(); private Set processedSeqNums = new HashSet(); // used for deduping @@ -143,7 +144,7 @@ class TestStreamlet implements IRecordProcessor, IShutdownNotificationAware { /** * @return the shutdownReason */ - ShutdownReason getShutdownReason() { + public ShutdownReason getShutdownReason() { return shutdownReason; } @@ -154,7 +155,7 @@ class TestStreamlet implements IRecordProcessor, IShutdownNotificationAware { return numProcessRecordsCallsWithEmptyRecordList; } - boolean isShutdownNotificationCalled() { + public boolean isShutdownNotificationCalled() { return shutdownNotificationCalled; } diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index c52eb7aa..7cf1d7b0 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -82,6 +82,15 @@ import org.mockito.stubbing.Answer; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException; +import software.amazon.kinesis.lifecycle.BlockOnParentShardTask; +import software.amazon.kinesis.lifecycle.ITask; +import software.amazon.kinesis.lifecycle.InitializeTask; +import software.amazon.kinesis.lifecycle.ShardConsumer; +import software.amazon.kinesis.lifecycle.ShutdownNotificationTask; +import software.amazon.kinesis.lifecycle.ShutdownReason; +import software.amazon.kinesis.lifecycle.ShutdownTask; +import software.amazon.kinesis.lifecycle.TaskResult; +import software.amazon.kinesis.lifecycle.TaskType; import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import software.amazon.kinesis.processor.v2.IRecordProcessor; diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java index 0b9a72f1..5444588b 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java +++ b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java @@ -17,7 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.types; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; +import software.amazon.kinesis.lifecycle.ShutdownReason; import org.junit.Test; /** diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java similarity index 90% rename from amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTaskTest.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java index ccca4017..fb2bbf7f 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java @@ -1,18 +1,18 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.lifecycle; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -20,6 +20,7 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.List; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -27,6 +28,9 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import software.amazon.kinesis.lifecycle.BlockOnParentShardTask; +import software.amazon.kinesis.lifecycle.TaskResult; +import software.amazon.kinesis.lifecycle.TaskType; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java similarity index 94% rename from amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index 2abf6601..988569d8 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -12,10 +12,10 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.lifecycle; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState; +import static software.amazon.kinesis.lifecycle.ConsumerStates.ConsumerState; +import static software.amazon.kinesis.lifecycle.ConsumerStates.ShardConsumerState; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; @@ -28,6 +28,12 @@ import java.lang.reflect.Field; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownNotification; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig; import org.hamcrest.Condition; import org.hamcrest.Description; import org.hamcrest.Matcher; @@ -38,6 +44,17 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.kinesis.lifecycle.BlockOnParentShardTask; +import software.amazon.kinesis.lifecycle.ConsumerStates; +import software.amazon.kinesis.lifecycle.ITask; +import software.amazon.kinesis.lifecycle.InitializeTask; +import software.amazon.kinesis.lifecycle.ProcessTask; +import software.amazon.kinesis.lifecycle.ShardConsumer; +import software.amazon.kinesis.lifecycle.ShutdownNotificationTask; +import software.amazon.kinesis.lifecycle.ShutdownReason; +import software.amazon.kinesis.lifecycle.ShutdownTask; +import software.amazon.kinesis.lifecycle.TaskResult; +import software.amazon.kinesis.lifecycle.TaskType; import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import software.amazon.kinesis.processor.v2.IRecordProcessor; diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java similarity index 95% rename from amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java index e6ed92ac..e3ff7e46 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.lifecycle; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -37,12 +37,21 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ThrottlingReporter; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import software.amazon.kinesis.lifecycle.ProcessTask; +import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.KinesisDataFetcher; @@ -75,7 +84,8 @@ public class ProcessTaskTest { private @Mock KinesisDataFetcher mockDataFetcher; private @Mock IRecordProcessor mockRecordProcessor; - private @Mock RecordProcessorCheckpointer mockCheckpointer; + private @Mock + RecordProcessorCheckpointer mockCheckpointer; @Mock private ThrottlingReporter throttlingReporter; @Mock diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java similarity index 97% rename from amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index 11f7085e..2c8ab0f1 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.lifecycle; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -50,6 +50,15 @@ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.SequenceNumberValidator; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownNotification; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.TestStreamlet; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; @@ -59,6 +68,12 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.kinesis.lifecycle.ConsumerStates; +import software.amazon.kinesis.lifecycle.ITask; +import software.amazon.kinesis.lifecycle.InitializeTask; +import software.amazon.kinesis.lifecycle.ShardConsumer; +import software.amazon.kinesis.lifecycle.ShutdownReason; +import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.checkpoint.Checkpoint; diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java similarity index 89% rename from amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index d84488de..315a6f56 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.lifecycle; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -22,6 +22,11 @@ import static org.mockito.Mockito.when; import java.util.HashSet; import java.util.Set; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.TestStreamlet; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -30,6 +35,10 @@ import org.junit.BeforeClass; import org.junit.Test; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException; +import software.amazon.kinesis.lifecycle.ShutdownReason; +import software.amazon.kinesis.lifecycle.ShutdownTask; +import software.amazon.kinesis.lifecycle.TaskResult; +import software.amazon.kinesis.lifecycle.TaskType; import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.IKinesisProxy;