From 07a03b0d0ee2c8cc7b6b3c18fb74377a41087a78 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Fri, 23 Mar 2018 12:59:35 -0700 Subject: [PATCH] Removing RecordProcessor V1, and replacing it wth v2 --- .../kinesis/multilang/MultiLangDaemon.java | 2 +- .../multilang/MultiLangRecordProcessor.java | 4 +- .../MultiLangRecordProcessorFactory.java | 4 +- .../StreamingRecordProcessorFactoryTest.java | 2 +- .../KinesisClientLibConfiguration.java | 2 +- .../amazon/kinesis/coordinator/Scheduler.java | 1 - .../amazon/kinesis/coordinator/Worker.java | 70 +++++------ .../lifecycle/InitializationInput.java | 2 +- .../kinesis/lifecycle/InitializeTask.java | 2 +- .../lifecycle/ProcessRecordsInput.java | 2 +- .../amazon/kinesis/lifecycle/ProcessTask.java | 2 +- .../kinesis/lifecycle/ShardConsumer.java | 2 +- .../ShardConsumerShutdownNotification.java | 2 +- .../kinesis/lifecycle/ShutdownInput.java | 2 +- .../lifecycle/ShutdownNotification.java | 3 +- .../lifecycle/ShutdownNotificationTask.java | 4 +- .../kinesis/lifecycle/ShutdownReason.java | 2 +- .../kinesis/lifecycle/ShutdownTask.java | 2 +- .../kinesis/processor/IRecordProcessor.java | 42 +++---- .../processor/IRecordProcessorFactory.java | 1 + .../{v2 => }/IShutdownNotificationAware.java | 2 +- .../kinesis/processor/ProcessorFactory.java | 2 - .../V1ToV2RecordProcessorAdapter.java | 51 -------- .../V1ToV2RecordProcessorFactoryAdapter.java | 38 ------ .../processor/v2/IRecordProcessor.java | 62 --------- .../processor/v2/IRecordProcessorFactory.java | 31 ----- .../kinesis/coordinator/WorkerTest.java | 118 +++++++++--------- .../kinesis/lifecycle/ConsumerStatesTest.java | 2 +- .../kinesis/lifecycle/ProcessTaskTest.java | 2 +- .../kinesis/lifecycle/ShardConsumerTest.java | 2 +- .../kinesis/lifecycle/ShutdownTaskTest.java | 2 +- .../amazon/kinesis/utils/TestStreamlet.java | 4 +- .../kinesis/utils/TestStreamletFactory.java | 4 +- 33 files changed, 137 insertions(+), 336 deletions(-) rename amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/{v2 => }/IShutdownNotificationAware.java (96%) delete mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/V1ToV2RecordProcessorAdapter.java delete mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/V1ToV2RecordProcessorFactoryAdapter.java delete mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/v2/IRecordProcessor.java delete mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/v2/IRecordProcessorFactory.java diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java index d7591f24..87427d26 100644 --- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java +++ b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java @@ -23,7 +23,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import software.amazon.kinesis.processor.v2.IRecordProcessorFactory; +import software.amazon.kinesis.processor.IRecordProcessorFactory; import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; import software.amazon.kinesis.coordinator.Worker; diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java index 22717017..e8545e26 100644 --- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java +++ b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java @@ -21,8 +21,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; -import software.amazon.kinesis.processor.v2.IRecordProcessor; -import software.amazon.kinesis.processor.v2.IShutdownNotificationAware; +import software.amazon.kinesis.processor.IRecordProcessor; +import software.amazon.kinesis.processor.IShutdownNotificationAware; import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; import software.amazon.kinesis.lifecycle.InitializationInput; import software.amazon.kinesis.lifecycle.ProcessRecordsInput; diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java index ec6c5b62..da423c3d 100644 --- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java +++ b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java @@ -16,8 +16,8 @@ package com.amazonaws.services.kinesis.multilang; import java.util.concurrent.ExecutorService; -import software.amazon.kinesis.processor.v2.IRecordProcessor; -import software.amazon.kinesis.processor.v2.IRecordProcessorFactory; +import software.amazon.kinesis.processor.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessorFactory; import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java b/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java index b565c4e7..3b7d447d 100644 --- a/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java @@ -18,7 +18,7 @@ import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; import org.junit.Assert; import org.junit.Test; -import software.amazon.kinesis.processor.v2.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessor; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java index 24963341..bc38e345 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java @@ -36,7 +36,7 @@ import software.amazon.kinesis.metrics.MetricsLevel; import com.google.common.collect.ImmutableSet; import lombok.Getter; -import software.amazon.kinesis.processor.v2.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessor; import software.amazon.kinesis.retrieval.DataFetchingStrategy; import software.amazon.kinesis.retrieval.KinesisProxy; import software.amazon.kinesis.retrieval.RecordsFetcherFactory; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 8588d450..46d86192 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -50,7 +50,6 @@ import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ProcessorFactory; -import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.retrieval.IKinesisProxy; import software.amazon.kinesis.retrieval.RetrievalConfig; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Worker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Worker.java index 8c147fcd..c532b70b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Worker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Worker.java @@ -41,39 +41,38 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; -import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator; -import software.amazon.kinesis.leases.ParentsFirstShardPrioritization; -import software.amazon.kinesis.leases.ShardInfo; -import software.amazon.kinesis.leases.ShardPrioritization; -import software.amazon.kinesis.leases.ShardSyncTask; -import software.amazon.kinesis.leases.ShardSyncTaskManager; -import software.amazon.kinesis.lifecycle.ShardConsumer; -import software.amazon.kinesis.lifecycle.ShardConsumerShutdownNotification; -import software.amazon.kinesis.lifecycle.ShutdownNotification; -import software.amazon.kinesis.lifecycle.ShutdownReason; -import software.amazon.kinesis.lifecycle.TaskResult; -import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; -import software.amazon.kinesis.processor.ICheckpoint; -import software.amazon.kinesis.processor.V1ToV2RecordProcessorFactoryAdapter; -import software.amazon.kinesis.processor.v2.IRecordProcessor; -import software.amazon.kinesis.processor.v2.IRecordProcessorFactory; -import software.amazon.kinesis.processor.v2.IShutdownNotificationAware; -import software.amazon.kinesis.retrieval.IKinesisProxy; -import software.amazon.kinesis.retrieval.KinesisProxy; -import software.amazon.kinesis.leases.exceptions.LeasingException; -import software.amazon.kinesis.leases.KinesisClientLease; -import software.amazon.kinesis.leases.KinesisClientLeaseManager; -import software.amazon.kinesis.leases.ILeaseManager; -import software.amazon.kinesis.metrics.CWMetricsFactory; -import software.amazon.kinesis.metrics.NullMetricsFactory; -import software.amazon.kinesis.metrics.IMetricsFactory; -import software.amazon.kinesis.metrics.MetricsLevel; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.Setter; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.leases.ILeaseManager; +import software.amazon.kinesis.leases.KinesisClientLease; +import software.amazon.kinesis.leases.KinesisClientLeaseManager; +import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator; +import software.amazon.kinesis.leases.ParentsFirstShardPrioritization; +import software.amazon.kinesis.leases.ShardInfo; +import software.amazon.kinesis.leases.ShardPrioritization; +import software.amazon.kinesis.leases.ShardSyncTask; +import software.amazon.kinesis.leases.ShardSyncTaskManager; +import software.amazon.kinesis.leases.exceptions.LeasingException; +import software.amazon.kinesis.lifecycle.ShardConsumer; +import software.amazon.kinesis.lifecycle.ShardConsumerShutdownNotification; +import software.amazon.kinesis.lifecycle.ShutdownNotification; +import software.amazon.kinesis.lifecycle.ShutdownReason; +import software.amazon.kinesis.lifecycle.TaskResult; +import software.amazon.kinesis.metrics.CWMetricsFactory; +import software.amazon.kinesis.metrics.IMetricsFactory; +import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; +import software.amazon.kinesis.metrics.MetricsLevel; +import software.amazon.kinesis.metrics.NullMetricsFactory; +import software.amazon.kinesis.processor.ICheckpoint; +import software.amazon.kinesis.processor.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessorFactory; +import software.amazon.kinesis.processor.IShutdownNotificationAware; +import software.amazon.kinesis.retrieval.IKinesisProxy; +import software.amazon.kinesis.retrieval.KinesisProxy; /** * Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees @@ -376,7 +375,8 @@ public class Worker implements Runnable { software.amazon.kinesis.processor.IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) { - this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), + this(config.getApplicationName(), + recordProcessorFactory, config, new StreamConfig( new KinesisProxy(config, kinesisClient), @@ -1164,20 +1164,6 @@ public class Worker implements Runnable { @Setter @Accessors(fluent = true) private WorkerStateChangeListener workerStateChangeListener; - /** - * Provide a V1 {@link software.amazon.kinesis.processor.IRecordProcessor - * IRecordProcessor}. - * - * @param recordProcessorFactory - * Used to get record processor instances for processing data from shards - * @return A reference to this updated object so that method calls can be chained together. - */ - public Builder recordProcessorFactory( - software.amazon.kinesis.processor.IRecordProcessorFactory recordProcessorFactory) { - this.recordProcessorFactory = new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory); - return this; - } - /** * Provide a V2 {@link IRecordProcessor * IRecordProcessor}. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializationInput.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializationInput.java index e6b95632..90d4a7f9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializationInput.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializationInput.java @@ -14,7 +14,7 @@ */ package software.amazon.kinesis.lifecycle; -import software.amazon.kinesis.processor.v2.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessor; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java index 31367f13..9673cc24 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java @@ -18,7 +18,7 @@ import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.coordinator.StreamConfig; import software.amazon.kinesis.processor.ICheckpoint; -import software.amazon.kinesis.processor.v2.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessor; import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.KinesisDataFetcher; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessRecordsInput.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessRecordsInput.java index b2bd2fb8..5bb47cd1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessRecordsInput.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessRecordsInput.java @@ -22,7 +22,7 @@ import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.model.Record; import lombok.Getter; -import software.amazon.kinesis.processor.v2.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessor; /** * Container for the parameters to the IRecordProcessor's diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index 5bd917e8..5076dc6f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -23,7 +23,7 @@ import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.coordinator.StreamConfig; import software.amazon.kinesis.retrieval.ThrottlingReporter; -import software.amazon.kinesis.processor.v2.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessor; import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.IKinesisProxy; import software.amazon.kinesis.retrieval.IKinesisProxyExtended; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index 1352f2b6..d5e30b76 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -28,7 +28,7 @@ import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.coordinator.StreamConfig; import software.amazon.kinesis.processor.ICheckpoint; -import software.amazon.kinesis.processor.v2.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessor; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.metrics.IMetricsFactory; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerShutdownNotification.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerShutdownNotification.java index dd18d291..1dbfd504 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerShutdownNotification.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerShutdownNotification.java @@ -16,7 +16,7 @@ package software.amazon.kinesis.lifecycle; import java.util.concurrent.CountDownLatch; -import software.amazon.kinesis.processor.v2.IShutdownNotificationAware; +import software.amazon.kinesis.processor.IShutdownNotificationAware; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.LeaseCoordinator; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownInput.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownInput.java index 41cefdbd..b74cc526 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownInput.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownInput.java @@ -15,7 +15,7 @@ package software.amazon.kinesis.lifecycle; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; -import software.amazon.kinesis.processor.v2.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessor; /** * Container for the parameters to the IRecordProcessor's diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownNotification.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownNotification.java index 4f2b9263..e562c068 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownNotification.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownNotification.java @@ -14,8 +14,7 @@ */ package software.amazon.kinesis.lifecycle; -import software.amazon.kinesis.lifecycle.ShutdownInput; -import software.amazon.kinesis.processor.v2.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessor; /** * A shutdown request to the ShardConsumer diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownNotificationTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownNotificationTask.java index 480fcbb4..dfffd9b0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownNotificationTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownNotificationTask.java @@ -16,8 +16,8 @@ package software.amazon.kinesis.lifecycle; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; -import software.amazon.kinesis.processor.v2.IRecordProcessor; -import software.amazon.kinesis.processor.v2.IShutdownNotificationAware; +import software.amazon.kinesis.processor.IRecordProcessor; +import software.amazon.kinesis.processor.IShutdownNotificationAware; /** * Notifies record processor of incoming shutdown request, and gives them a chance to checkpoint. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownReason.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownReason.java index 0381ebab..3db4151f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownReason.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownReason.java @@ -14,7 +14,7 @@ */ package software.amazon.kinesis.lifecycle; -import software.amazon.kinesis.processor.v2.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessor; import static software.amazon.kinesis.lifecycle.ConsumerStates.ConsumerState; import static software.amazon.kinesis.lifecycle.ConsumerStates.ShardConsumerState; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 04026de2..0e553c39 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -18,7 +18,7 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardSyncer; -import software.amazon.kinesis.processor.v2.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessor; import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.IKinesisProxy; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/IRecordProcessor.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/IRecordProcessor.java index a74941f1..abd38478 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/IRecordProcessor.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/IRecordProcessor.java @@ -14,9 +14,9 @@ */ package software.amazon.kinesis.processor; -import java.util.List; - -import com.amazonaws.services.kinesis.model.Record; +import software.amazon.kinesis.lifecycle.InitializationInput; +import software.amazon.kinesis.lifecycle.ProcessRecordsInput; +import software.amazon.kinesis.lifecycle.ShutdownInput; import software.amazon.kinesis.lifecycle.ShutdownReason; /** @@ -28,35 +28,35 @@ public interface IRecordProcessor { /** * Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance * (via processRecords). - * - * @param shardId The record processor will be responsible for processing records of this shard. + * + * @param initializationInput Provides information related to initialization */ - void initialize(String shardId); + void initialize(InitializationInput initializationInput); /** * Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the * application. * Upon fail over, the new instance will get records with sequence number > checkpoint position * for each partition key. - * - * @param records Data records to be processed - * @param checkpointer RecordProcessor should use this instance to checkpoint their progress. + * + * @param processRecordsInput Provides the records to be processed as well as information and capabilities related + * to them (eg checkpointing). */ - void processRecords(List records, IRecordProcessorCheckpointer checkpointer); + void processRecords(ProcessRecordsInput processRecordsInput); /** * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this - * RecordProcessor instance. The reason parameter indicates: - * a/ ShutdownReason.TERMINATE - The shard has been closed and there will not be any more records to process. The - * record processor should checkpoint (after doing any housekeeping) to acknowledge that it has successfully - * completed processing all records in this shard. - * b/ ShutdownReason.ZOMBIE: A fail over has occurred and a different record processor is (or will be) responsible - * for processing records. - * - * @param checkpointer RecordProcessor should use this instance to checkpoint. - * @param reason Reason for the shutdown (ShutdownReason.TERMINATE indicates the shard is closed and there are no - * more records to process. Shutdown.ZOMBIE indicates a fail over has occurred). + * RecordProcessor instance. + * + *

Warning

+ * + * When the value of {@link ShutdownInput#getShutdownReason()} is + * {@link ShutdownReason#TERMINATE} it is required that you + * checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress. + * + * @param shutdownInput + * Provides information and capabilities (eg checkpointing) related to shutdown of this record processor. */ - void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason); + void shutdown(ShutdownInput shutdownInput); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/IRecordProcessorFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/IRecordProcessorFactory.java index 8186fa13..cacf0b5b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/IRecordProcessorFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/IRecordProcessorFactory.java @@ -14,6 +14,7 @@ */ package software.amazon.kinesis.processor; + /** * The Amazon Kinesis Client Library will use this to instantiate a record processor per shard. * Clients may choose to create separate instantiations, or re-use instantiations. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/v2/IShutdownNotificationAware.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/IShutdownNotificationAware.java similarity index 96% rename from amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/v2/IShutdownNotificationAware.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/IShutdownNotificationAware.java index 82200235..a3e780ae 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/v2/IShutdownNotificationAware.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/IShutdownNotificationAware.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package software.amazon.kinesis.processor.v2; +package software.amazon.kinesis.processor; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ProcessorFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ProcessorFactory.java index 7ded5e36..0408a2b8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ProcessorFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ProcessorFactory.java @@ -15,8 +15,6 @@ package software.amazon.kinesis.processor; -import software.amazon.kinesis.processor.v2.IRecordProcessor; - /** * */ diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/V1ToV2RecordProcessorAdapter.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/V1ToV2RecordProcessorAdapter.java deleted file mode 100644 index 7ebbd9d1..00000000 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/V1ToV2RecordProcessorAdapter.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.kinesis.processor; - -import software.amazon.kinesis.processor.v2.IRecordProcessor; -import software.amazon.kinesis.lifecycle.InitializationInput; -import software.amazon.kinesis.lifecycle.ProcessRecordsInput; -import software.amazon.kinesis.lifecycle.ShutdownInput; - -/** - * Adapts a V1 {@link software.amazon.kinesis.processor.IRecordProcessor IRecordProcessor} - * to V2 {@link IRecordProcessor IRecordProcessor}. - */ -class V1ToV2RecordProcessorAdapter implements IRecordProcessor { - - private software.amazon.kinesis.processor.IRecordProcessor recordProcessor; - - V1ToV2RecordProcessorAdapter( - software.amazon.kinesis.processor.IRecordProcessor recordProcessor) { - this.recordProcessor = recordProcessor; - } - - @Override - public void initialize(InitializationInput initializationInput) { - recordProcessor.initialize(initializationInput.getShardId()); - } - - @Override - public void processRecords(ProcessRecordsInput processRecordsInput) { - recordProcessor.processRecords(processRecordsInput.getRecords(), processRecordsInput.getCheckpointer()); - - } - - @Override - public void shutdown(ShutdownInput shutdownInput) { - recordProcessor.shutdown(shutdownInput.getCheckpointer(), shutdownInput.getShutdownReason()); - } - -} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/V1ToV2RecordProcessorFactoryAdapter.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/V1ToV2RecordProcessorFactoryAdapter.java deleted file mode 100644 index 99a4df85..00000000 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/V1ToV2RecordProcessorFactoryAdapter.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.kinesis.processor; - -import software.amazon.kinesis.processor.v2.IRecordProcessorFactory; -import software.amazon.kinesis.processor.v2.IRecordProcessor; - -/** - * Adapts a V1 {@link software.amazon.kinesis.processor.IRecordProcessorFactory - * IRecordProcessorFactory} to V2 - * {@link IRecordProcessorFactory IRecordProcessorFactory}. - */ -public class V1ToV2RecordProcessorFactoryAdapter implements IRecordProcessorFactory { - - private software.amazon.kinesis.processor.IRecordProcessorFactory factory; - - public V1ToV2RecordProcessorFactoryAdapter( - software.amazon.kinesis.processor.IRecordProcessorFactory factory) { - this.factory = factory; - } - - @Override - public IRecordProcessor createProcessor() { - return new V1ToV2RecordProcessorAdapter(factory.createProcessor()); - } -} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/v2/IRecordProcessor.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/v2/IRecordProcessor.java deleted file mode 100644 index ecb1ad36..00000000 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/v2/IRecordProcessor.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.kinesis.processor.v2; - -import software.amazon.kinesis.lifecycle.InitializationInput; -import software.amazon.kinesis.lifecycle.ProcessRecordsInput; -import software.amazon.kinesis.lifecycle.ShutdownInput; -import software.amazon.kinesis.lifecycle.ShutdownReason; - -/** - * The Amazon Kinesis Client Library will instantiate record processors to process data records fetched from Amazon - * Kinesis. - */ -public interface IRecordProcessor { - - /** - * Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance - * (via processRecords). - * - * @param initializationInput Provides information related to initialization - */ - void initialize(InitializationInput initializationInput); - - /** - * Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the - * application. - * Upon fail over, the new instance will get records with sequence number > checkpoint position - * for each partition key. - * - * @param processRecordsInput Provides the records to be processed as well as information and capabilities related - * to them (eg checkpointing). - */ - void processRecords(ProcessRecordsInput processRecordsInput); - - /** - * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this - * RecordProcessor instance. - * - *

Warning

- * - * When the value of {@link ShutdownInput#getShutdownReason()} is - * {@link ShutdownReason#TERMINATE} it is required that you - * checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress. - * - * @param shutdownInput - * Provides information and capabilities (eg checkpointing) related to shutdown of this record processor. - */ - void shutdown(ShutdownInput shutdownInput); - -} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/v2/IRecordProcessorFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/v2/IRecordProcessorFactory.java deleted file mode 100644 index fd4ca7b2..00000000 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/v2/IRecordProcessorFactory.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.kinesis.processor.v2; - - -/** - * The Amazon Kinesis Client Library will use this to instantiate a record processor per shard. - * Clients may choose to create separate instantiations, or re-use instantiations. - */ -public interface IRecordProcessorFactory { - - /** - * Returns a record processor to be used for processing data records for a (assigned) shard. - * - * @return Returns a processor object. - */ - IRecordProcessor createProcessor(); - -} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/WorkerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/WorkerTest.java index 25cb3ad2..ec7c13aa 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/WorkerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/WorkerTest.java @@ -64,8 +64,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import org.hamcrest.Condition; import org.hamcrest.Description; import org.hamcrest.Matcher; @@ -84,51 +82,10 @@ import org.mockito.stubbing.Answer; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException; -import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator; -import software.amazon.kinesis.leases.NoOpShardPrioritization; -import software.amazon.kinesis.leases.ShardInfo; -import software.amazon.kinesis.leases.ShardObjectHelper; -import software.amazon.kinesis.leases.ShardPrioritization; -import software.amazon.kinesis.leases.ShardSequenceVerifier; -import software.amazon.kinesis.leases.ShardSyncer; -import software.amazon.kinesis.lifecycle.BlockOnParentShardTask; -import software.amazon.kinesis.lifecycle.ITask; -import software.amazon.kinesis.lifecycle.InitializeTask; -import software.amazon.kinesis.lifecycle.ShardConsumer; -import software.amazon.kinesis.lifecycle.ShutdownNotificationTask; -import software.amazon.kinesis.lifecycle.ShutdownReason; -import software.amazon.kinesis.lifecycle.ShutdownTask; -import software.amazon.kinesis.lifecycle.TaskResult; -import software.amazon.kinesis.lifecycle.TaskType; -import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; -import software.amazon.kinesis.processor.ICheckpoint; -import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; -import software.amazon.kinesis.processor.V1ToV2RecordProcessorFactoryAdapter; -import software.amazon.kinesis.processor.v2.IRecordProcessor; -import software.amazon.kinesis.processor.v2.IRecordProcessorFactory; -import software.amazon.kinesis.coordinator.Worker.WorkerCWMetricsFactory; -import software.amazon.kinesis.coordinator.Worker.WorkerThreadPoolExecutor; -import software.amazon.kinesis.coordinator.WorkerStateChangeListener.WorkerState; -import software.amazon.kinesis.retrieval.GetRecordsCache; -import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; -import software.amazon.kinesis.retrieval.IKinesisProxy; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy; -import software.amazon.kinesis.retrieval.KinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator; -import software.amazon.kinesis.retrieval.RecordsFetcherFactory; -import software.amazon.kinesis.retrieval.SimpleRecordsFetcherFactory; -import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import software.amazon.kinesis.lifecycle.InitializationInput; -import software.amazon.kinesis.lifecycle.ProcessRecordsInput; -import software.amazon.kinesis.lifecycle.ShutdownInput; -import software.amazon.kinesis.leases.KinesisClientLease; -import software.amazon.kinesis.leases.KinesisClientLeaseBuilder; -import software.amazon.kinesis.leases.KinesisClientLeaseManager; -import software.amazon.kinesis.leases.LeaseManager; -import software.amazon.kinesis.leases.ILeaseManager; -import software.amazon.kinesis.metrics.CWMetricsFactory; -import software.amazon.kinesis.metrics.NullMetricsFactory; -import software.amazon.kinesis.metrics.IMetricsFactory; import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.SequenceNumberRange; @@ -138,6 +95,48 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.coordinator.Worker.WorkerCWMetricsFactory; +import software.amazon.kinesis.coordinator.Worker.WorkerThreadPoolExecutor; +import software.amazon.kinesis.coordinator.WorkerStateChangeListener.WorkerState; +import software.amazon.kinesis.leases.ILeaseManager; +import software.amazon.kinesis.leases.KinesisClientLease; +import software.amazon.kinesis.leases.KinesisClientLeaseBuilder; +import software.amazon.kinesis.leases.KinesisClientLeaseManager; +import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator; +import software.amazon.kinesis.leases.LeaseManager; +import software.amazon.kinesis.leases.NoOpShardPrioritization; +import software.amazon.kinesis.leases.ShardInfo; +import software.amazon.kinesis.leases.ShardObjectHelper; +import software.amazon.kinesis.leases.ShardPrioritization; +import software.amazon.kinesis.leases.ShardSequenceVerifier; +import software.amazon.kinesis.leases.ShardSyncer; +import software.amazon.kinesis.lifecycle.BlockOnParentShardTask; +import software.amazon.kinesis.lifecycle.ITask; +import software.amazon.kinesis.lifecycle.InitializationInput; +import software.amazon.kinesis.lifecycle.InitializeTask; +import software.amazon.kinesis.lifecycle.ProcessRecordsInput; +import software.amazon.kinesis.lifecycle.ShardConsumer; +import software.amazon.kinesis.lifecycle.ShutdownInput; +import software.amazon.kinesis.lifecycle.ShutdownNotificationTask; +import software.amazon.kinesis.lifecycle.ShutdownReason; +import software.amazon.kinesis.lifecycle.ShutdownTask; +import software.amazon.kinesis.lifecycle.TaskResult; +import software.amazon.kinesis.lifecycle.TaskType; +import software.amazon.kinesis.metrics.CWMetricsFactory; +import software.amazon.kinesis.metrics.IMetricsFactory; +import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; +import software.amazon.kinesis.metrics.NullMetricsFactory; +import software.amazon.kinesis.processor.ICheckpoint; +import software.amazon.kinesis.processor.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; +import software.amazon.kinesis.processor.IRecordProcessorFactory; +import software.amazon.kinesis.retrieval.GetRecordsCache; +import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; +import software.amazon.kinesis.retrieval.IKinesisProxy; +import software.amazon.kinesis.retrieval.KinesisProxy; +import software.amazon.kinesis.retrieval.RecordsFetcherFactory; +import software.amazon.kinesis.retrieval.SimpleRecordsFetcherFactory; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.utils.TestStreamlet; import software.amazon.kinesis.utils.TestStreamletFactory; @@ -211,37 +210,38 @@ public class WorkerTest { @Override public software.amazon.kinesis.processor.IRecordProcessor createProcessor() { - return new software.amazon.kinesis.processor.IRecordProcessor() { + return new IRecordProcessor() { @Override - public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { - if (reason == ShutdownReason.TERMINATE) { - try { - checkpointer.checkpoint(); - } catch (KinesisClientLibNonRetryableException e) { - throw new RuntimeException(e); - } - } + public void initialize(final InitializationInput initializationInput) { + } @Override - public void processRecords(List dataRecords, IRecordProcessorCheckpointer checkpointer) { + public void processRecords(final ProcessRecordsInput processRecordsInput) { try { - checkpointer.checkpoint(); + processRecordsInput.getCheckpointer().checkpoint(); } catch (KinesisClientLibNonRetryableException e) { throw new RuntimeException(e); } } @Override - public void initialize(String shardId) { + public void shutdown(final ShutdownInput shutdownInput) { + if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { + try { + shutdownInput.getCheckpointer().checkpoint(); + } catch (KinesisClientLibNonRetryableException e) { + throw new RuntimeException(e); + } + } } + }; } }; - private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 = - new V1ToV2RecordProcessorFactoryAdapter(SAMPLE_RECORD_PROCESSOR_FACTORY); + private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 = SAMPLE_RECORD_PROCESSOR_FACTORY; /** diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index f0f8e247..8807d366 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -45,7 +45,7 @@ import org.mockito.runners.MockitoJUnitRunner; import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; -import software.amazon.kinesis.processor.v2.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessor; import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.IKinesisProxy; import software.amazon.kinesis.leases.KinesisClientLease; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java index 98cbee6a..4a97d347 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java @@ -50,7 +50,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import software.amazon.kinesis.processor.v2.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessor; import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.KinesisDataFetcher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index 39c711ec..89bcdb9e 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -67,7 +67,7 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import software.amazon.kinesis.processor.ICheckpoint; -import software.amazon.kinesis.processor.v2.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessor; import software.amazon.kinesis.checkpoint.Checkpoint; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheckpointImpl; import software.amazon.kinesis.retrieval.AsynchronousGetRecordsRetrievalStrategy; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 5503c50d..b5ea7ece 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -35,7 +35,7 @@ import org.junit.BeforeClass; import org.junit.Test; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException; -import software.amazon.kinesis.processor.v2.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessor; import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.IKinesisProxy; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestStreamlet.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestStreamlet.java index 36e2d0da..a6e1ba97 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestStreamlet.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestStreamlet.java @@ -29,8 +29,8 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingExcepti import software.amazon.kinesis.leases.ShardSequenceVerifier; import software.amazon.kinesis.lifecycle.ShutdownReason; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; -import software.amazon.kinesis.processor.v2.IRecordProcessor; -import software.amazon.kinesis.processor.v2.IShutdownNotificationAware; +import software.amazon.kinesis.processor.IRecordProcessor; +import software.amazon.kinesis.processor.IShutdownNotificationAware; import software.amazon.kinesis.lifecycle.InitializationInput; import software.amazon.kinesis.lifecycle.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.ShutdownInput; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestStreamletFactory.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestStreamletFactory.java index e81a853a..c65fa85b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestStreamletFactory.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestStreamletFactory.java @@ -19,8 +19,8 @@ import java.util.List; import java.util.concurrent.Semaphore; import software.amazon.kinesis.leases.ShardSequenceVerifier; -import software.amazon.kinesis.processor.v2.IRecordProcessor; -import software.amazon.kinesis.processor.v2.IRecordProcessorFactory; +import software.amazon.kinesis.processor.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessorFactory; /** * Factory for TestStreamlet record processors.