Removing RecordProcessor V1, and replacing it wth v2

This commit is contained in:
Sahil Palvia 2018-03-23 12:59:35 -07:00
parent dffe9194f2
commit 07a03b0d0e
33 changed files with 137 additions and 336 deletions

View file

@ -23,7 +23,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import software.amazon.kinesis.processor.v2.IRecordProcessorFactory;
import software.amazon.kinesis.processor.IRecordProcessorFactory;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
import software.amazon.kinesis.coordinator.Worker;

View file

@ -21,8 +21,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.v2.IShutdownNotificationAware;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.processor.IShutdownNotificationAware;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
import software.amazon.kinesis.lifecycle.InitializationInput;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput;

View file

@ -16,8 +16,8 @@ package com.amazonaws.services.kinesis.multilang;
import java.util.concurrent.ExecutorService;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.v2.IRecordProcessorFactory;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessorFactory;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
import com.fasterxml.jackson.databind.ObjectMapper;

View file

@ -18,7 +18,7 @@ import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
import org.junit.Assert;
import org.junit.Test;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessor;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

View file

@ -36,7 +36,7 @@ import software.amazon.kinesis.metrics.MetricsLevel;
import com.google.common.collect.ImmutableSet;
import lombok.Getter;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.retrieval.DataFetchingStrategy;
import software.amazon.kinesis.retrieval.KinesisProxy;
import software.amazon.kinesis.retrieval.RecordsFetcherFactory;

View file

@ -50,7 +50,6 @@ import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ProcessorFactory;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.retrieval.RetrievalConfig;

View file

@ -41,39 +41,38 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator;
import software.amazon.kinesis.leases.ParentsFirstShardPrioritization;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSyncTask;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShardConsumerShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.V1ToV2RecordProcessorFactoryAdapter;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.v2.IRecordProcessorFactory;
import software.amazon.kinesis.processor.v2.IShutdownNotificationAware;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.retrieval.KinesisProxy;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.KinesisClientLeaseManager;
import software.amazon.kinesis.leases.ILeaseManager;
import software.amazon.kinesis.metrics.CWMetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.metrics.IMetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.leases.ILeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.KinesisClientLeaseManager;
import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator;
import software.amazon.kinesis.leases.ParentsFirstShardPrioritization;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSyncTask;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShardConsumerShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.CWMetricsFactory;
import software.amazon.kinesis.metrics.IMetricsFactory;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessorFactory;
import software.amazon.kinesis.processor.IShutdownNotificationAware;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.retrieval.KinesisProxy;
/**
* Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees
@ -376,7 +375,8 @@ public class Worker implements Runnable {
software.amazon.kinesis.processor.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient,
IMetricsFactory metricsFactory, ExecutorService execService) {
this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
this(config.getApplicationName(),
recordProcessorFactory,
config,
new StreamConfig(
new KinesisProxy(config, kinesisClient),
@ -1164,20 +1164,6 @@ public class Worker implements Runnable {
@Setter @Accessors(fluent = true)
private WorkerStateChangeListener workerStateChangeListener;
/**
* Provide a V1 {@link software.amazon.kinesis.processor.IRecordProcessor
* IRecordProcessor}.
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder recordProcessorFactory(
software.amazon.kinesis.processor.IRecordProcessorFactory recordProcessorFactory) {
this.recordProcessorFactory = new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory);
return this;
}
/**
* Provide a V2 {@link IRecordProcessor
* IRecordProcessor}.

View file

@ -14,7 +14,7 @@
*/
package software.amazon.kinesis.lifecycle;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/**

View file

@ -18,7 +18,7 @@ import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.coordinator.StreamConfig;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.KinesisDataFetcher;

View file

@ -22,7 +22,7 @@ import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.model.Record;
import lombok.Getter;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessor;
/**
* Container for the parameters to the IRecordProcessor's

View file

@ -23,7 +23,7 @@ import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.coordinator.StreamConfig;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.retrieval.IKinesisProxyExtended;

View file

@ -28,7 +28,7 @@ import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.coordinator.StreamConfig;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.ILeaseManager;
import software.amazon.kinesis.metrics.IMetricsFactory;

View file

@ -16,7 +16,7 @@ package software.amazon.kinesis.lifecycle;
import java.util.concurrent.CountDownLatch;
import software.amazon.kinesis.processor.v2.IShutdownNotificationAware;
import software.amazon.kinesis.processor.IShutdownNotificationAware;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.LeaseCoordinator;

View file

@ -15,7 +15,7 @@
package software.amazon.kinesis.lifecycle;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessor;
/**
* Container for the parameters to the IRecordProcessor's

View file

@ -14,8 +14,7 @@
*/
package software.amazon.kinesis.lifecycle;
import software.amazon.kinesis.lifecycle.ShutdownInput;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessor;
/**
* A shutdown request to the ShardConsumer

View file

@ -16,8 +16,8 @@ package software.amazon.kinesis.lifecycle;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.v2.IShutdownNotificationAware;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.processor.IShutdownNotificationAware;
/**
* Notifies record processor of incoming shutdown request, and gives them a chance to checkpoint.

View file

@ -14,7 +14,7 @@
*/
package software.amazon.kinesis.lifecycle;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessor;
import static software.amazon.kinesis.lifecycle.ConsumerStates.ConsumerState;
import static software.amazon.kinesis.lifecycle.ConsumerStates.ShardConsumerState;

View file

@ -18,7 +18,7 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn
import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardSyncer;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

View file

@ -14,9 +14,9 @@
*/
package software.amazon.kinesis.processor;
import java.util.List;
import com.amazonaws.services.kinesis.model.Record;
import software.amazon.kinesis.lifecycle.InitializationInput;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.ShutdownInput;
import software.amazon.kinesis.lifecycle.ShutdownReason;
/**
@ -28,35 +28,35 @@ public interface IRecordProcessor {
/**
* Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance
* (via processRecords).
*
* @param shardId The record processor will be responsible for processing records of this shard.
*
* @param initializationInput Provides information related to initialization
*/
void initialize(String shardId);
void initialize(InitializationInput initializationInput);
/**
* Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the
* application.
* Upon fail over, the new instance will get records with sequence number > checkpoint position
* for each partition key.
*
* @param records Data records to be processed
* @param checkpointer RecordProcessor should use this instance to checkpoint their progress.
*
* @param processRecordsInput Provides the records to be processed as well as information and capabilities related
* to them (eg checkpointing).
*/
void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer);
void processRecords(ProcessRecordsInput processRecordsInput);
/**
* Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
* RecordProcessor instance. The reason parameter indicates:
* a/ ShutdownReason.TERMINATE - The shard has been closed and there will not be any more records to process. The
* record processor should checkpoint (after doing any housekeeping) to acknowledge that it has successfully
* completed processing all records in this shard.
* b/ ShutdownReason.ZOMBIE: A fail over has occurred and a different record processor is (or will be) responsible
* for processing records.
*
* @param checkpointer RecordProcessor should use this instance to checkpoint.
* @param reason Reason for the shutdown (ShutdownReason.TERMINATE indicates the shard is closed and there are no
* more records to process. Shutdown.ZOMBIE indicates a fail over has occurred).
* RecordProcessor instance.
*
* <h2><b>Warning</b></h2>
*
* When the value of {@link ShutdownInput#getShutdownReason()} is
* {@link ShutdownReason#TERMINATE} it is required that you
* checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress.
*
* @param shutdownInput
* Provides information and capabilities (eg checkpointing) related to shutdown of this record processor.
*/
void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason);
void shutdown(ShutdownInput shutdownInput);
}

View file

@ -14,6 +14,7 @@
*/
package software.amazon.kinesis.processor;
/**
* The Amazon Kinesis Client Library will use this to instantiate a record processor per shard.
* Clients may choose to create separate instantiations, or re-use instantiations.

View file

@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.processor.v2;
package software.amazon.kinesis.processor;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;

View file

@ -15,8 +15,6 @@
package software.amazon.kinesis.processor;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
/**
*
*/

View file

@ -1,51 +0,0 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.processor;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.lifecycle.InitializationInput;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.ShutdownInput;
/**
* Adapts a V1 {@link software.amazon.kinesis.processor.IRecordProcessor IRecordProcessor}
* to V2 {@link IRecordProcessor IRecordProcessor}.
*/
class V1ToV2RecordProcessorAdapter implements IRecordProcessor {
private software.amazon.kinesis.processor.IRecordProcessor recordProcessor;
V1ToV2RecordProcessorAdapter(
software.amazon.kinesis.processor.IRecordProcessor recordProcessor) {
this.recordProcessor = recordProcessor;
}
@Override
public void initialize(InitializationInput initializationInput) {
recordProcessor.initialize(initializationInput.getShardId());
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
recordProcessor.processRecords(processRecordsInput.getRecords(), processRecordsInput.getCheckpointer());
}
@Override
public void shutdown(ShutdownInput shutdownInput) {
recordProcessor.shutdown(shutdownInput.getCheckpointer(), shutdownInput.getShutdownReason());
}
}

View file

@ -1,38 +0,0 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.processor;
import software.amazon.kinesis.processor.v2.IRecordProcessorFactory;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
/**
* Adapts a V1 {@link software.amazon.kinesis.processor.IRecordProcessorFactory
* IRecordProcessorFactory} to V2
* {@link IRecordProcessorFactory IRecordProcessorFactory}.
*/
public class V1ToV2RecordProcessorFactoryAdapter implements IRecordProcessorFactory {
private software.amazon.kinesis.processor.IRecordProcessorFactory factory;
public V1ToV2RecordProcessorFactoryAdapter(
software.amazon.kinesis.processor.IRecordProcessorFactory factory) {
this.factory = factory;
}
@Override
public IRecordProcessor createProcessor() {
return new V1ToV2RecordProcessorAdapter(factory.createProcessor());
}
}

View file

@ -1,62 +0,0 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.processor.v2;
import software.amazon.kinesis.lifecycle.InitializationInput;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.ShutdownInput;
import software.amazon.kinesis.lifecycle.ShutdownReason;
/**
* The Amazon Kinesis Client Library will instantiate record processors to process data records fetched from Amazon
* Kinesis.
*/
public interface IRecordProcessor {
/**
* Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance
* (via processRecords).
*
* @param initializationInput Provides information related to initialization
*/
void initialize(InitializationInput initializationInput);
/**
* Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the
* application.
* Upon fail over, the new instance will get records with sequence number > checkpoint position
* for each partition key.
*
* @param processRecordsInput Provides the records to be processed as well as information and capabilities related
* to them (eg checkpointing).
*/
void processRecords(ProcessRecordsInput processRecordsInput);
/**
* Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
* RecordProcessor instance.
*
* <h2><b>Warning</b></h2>
*
* When the value of {@link ShutdownInput#getShutdownReason()} is
* {@link ShutdownReason#TERMINATE} it is required that you
* checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress.
*
* @param shutdownInput
* Provides information and capabilities (eg checkpointing) related to shutdown of this record processor.
*/
void shutdown(ShutdownInput shutdownInput);
}

View file

@ -1,31 +0,0 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.processor.v2;
/**
* The Amazon Kinesis Client Library will use this to instantiate a record processor per shard.
* Clients may choose to create separate instantiations, or re-use instantiations.
*/
public interface IRecordProcessorFactory {
/**
* Returns a record processor to be used for processing data records for a (assigned) shard.
*
* @return Returns a processor object.
*/
IRecordProcessor createProcessor();
}

View file

@ -64,8 +64,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import org.hamcrest.Condition;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
@ -84,51 +82,10 @@ import org.mockito.stubbing.Answer;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException;
import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator;
import software.amazon.kinesis.leases.NoOpShardPrioritization;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardObjectHelper;
import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSequenceVerifier;
import software.amazon.kinesis.leases.ShardSyncer;
import software.amazon.kinesis.lifecycle.BlockOnParentShardTask;
import software.amazon.kinesis.lifecycle.ITask;
import software.amazon.kinesis.lifecycle.InitializeTask;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShutdownNotificationTask;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.ShutdownTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import software.amazon.kinesis.processor.V1ToV2RecordProcessorFactoryAdapter;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.v2.IRecordProcessorFactory;
import software.amazon.kinesis.coordinator.Worker.WorkerCWMetricsFactory;
import software.amazon.kinesis.coordinator.Worker.WorkerThreadPoolExecutor;
import software.amazon.kinesis.coordinator.WorkerStateChangeListener.WorkerState;
import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy;
import software.amazon.kinesis.retrieval.KinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator;
import software.amazon.kinesis.retrieval.RecordsFetcherFactory;
import software.amazon.kinesis.retrieval.SimpleRecordsFetcherFactory;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.lifecycle.InitializationInput;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.ShutdownInput;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.KinesisClientLeaseBuilder;
import software.amazon.kinesis.leases.KinesisClientLeaseManager;
import software.amazon.kinesis.leases.LeaseManager;
import software.amazon.kinesis.leases.ILeaseManager;
import software.amazon.kinesis.metrics.CWMetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.metrics.IMetricsFactory;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@ -138,6 +95,48 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.coordinator.Worker.WorkerCWMetricsFactory;
import software.amazon.kinesis.coordinator.Worker.WorkerThreadPoolExecutor;
import software.amazon.kinesis.coordinator.WorkerStateChangeListener.WorkerState;
import software.amazon.kinesis.leases.ILeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.KinesisClientLeaseBuilder;
import software.amazon.kinesis.leases.KinesisClientLeaseManager;
import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManager;
import software.amazon.kinesis.leases.NoOpShardPrioritization;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardObjectHelper;
import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSequenceVerifier;
import software.amazon.kinesis.leases.ShardSyncer;
import software.amazon.kinesis.lifecycle.BlockOnParentShardTask;
import software.amazon.kinesis.lifecycle.ITask;
import software.amazon.kinesis.lifecycle.InitializationInput;
import software.amazon.kinesis.lifecycle.InitializeTask;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShutdownInput;
import software.amazon.kinesis.lifecycle.ShutdownNotificationTask;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.ShutdownTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.metrics.CWMetricsFactory;
import software.amazon.kinesis.metrics.IMetricsFactory;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import software.amazon.kinesis.processor.IRecordProcessorFactory;
import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.retrieval.KinesisProxy;
import software.amazon.kinesis.retrieval.RecordsFetcherFactory;
import software.amazon.kinesis.retrieval.SimpleRecordsFetcherFactory;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.utils.TestStreamlet;
import software.amazon.kinesis.utils.TestStreamletFactory;
@ -211,37 +210,38 @@ public class WorkerTest {
@Override
public software.amazon.kinesis.processor.IRecordProcessor createProcessor() {
return new software.amazon.kinesis.processor.IRecordProcessor() {
return new IRecordProcessor() {
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
if (reason == ShutdownReason.TERMINATE) {
try {
checkpointer.checkpoint();
} catch (KinesisClientLibNonRetryableException e) {
throw new RuntimeException(e);
}
}
public void initialize(final InitializationInput initializationInput) {
}
@Override
public void processRecords(List<Record> dataRecords, IRecordProcessorCheckpointer checkpointer) {
public void processRecords(final ProcessRecordsInput processRecordsInput) {
try {
checkpointer.checkpoint();
processRecordsInput.getCheckpointer().checkpoint();
} catch (KinesisClientLibNonRetryableException e) {
throw new RuntimeException(e);
}
}
@Override
public void initialize(String shardId) {
public void shutdown(final ShutdownInput shutdownInput) {
if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
try {
shutdownInput.getCheckpointer().checkpoint();
} catch (KinesisClientLibNonRetryableException e) {
throw new RuntimeException(e);
}
}
}
};
}
};
private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 =
new V1ToV2RecordProcessorFactoryAdapter(SAMPLE_RECORD_PROCESSOR_FACTORY);
private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 = SAMPLE_RECORD_PROCESSOR_FACTORY;
/**

View file

@ -45,7 +45,7 @@ import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.leases.KinesisClientLease;

View file

@ -50,7 +50,7 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.KinesisDataFetcher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

View file

@ -67,7 +67,7 @@ import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.checkpoint.Checkpoint;
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheckpointImpl;
import software.amazon.kinesis.retrieval.AsynchronousGetRecordsRetrievalStrategy;

View file

@ -35,7 +35,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

View file

@ -29,8 +29,8 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingExcepti
import software.amazon.kinesis.leases.ShardSequenceVerifier;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.v2.IShutdownNotificationAware;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.processor.IShutdownNotificationAware;
import software.amazon.kinesis.lifecycle.InitializationInput;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.ShutdownInput;

View file

@ -19,8 +19,8 @@ import java.util.List;
import java.util.concurrent.Semaphore;
import software.amazon.kinesis.leases.ShardSequenceVerifier;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.v2.IRecordProcessorFactory;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessorFactory;
/**
* Factory for TestStreamlet record processors.