Broke out the lifecycle-ish classes

This commit is contained in:
Pfifer, Justin 2018-03-14 10:58:07 -07:00
parent d9143ce5f5
commit dbbcae9db6
43 changed files with 274 additions and 174 deletions

View file

@ -22,7 +22,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.InitializationInput; import software.amazon.kinesis.lifecycle.InitializationInput;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;

View file

@ -23,7 +23,7 @@ import java.util.concurrent.TimeoutException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.InitializationInput; import software.amazon.kinesis.lifecycle.InitializationInput;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;

View file

@ -14,7 +14,7 @@
*/ */
package com.amazonaws.services.kinesis.multilang.messages; package com.amazonaws.services.kinesis.multilang.messages;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import software.amazon.kinesis.lifecycle.ShutdownReason;
/** /**
* A message to indicate to the client's process that it should shutdown and then terminate. * A message to indicate to the client's process that it should shutdown and then terminate.

View file

@ -30,7 +30,7 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import software.amazon.kinesis.lifecycle.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.Message;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;

View file

@ -20,7 +20,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.InitializationInput; import software.amazon.kinesis.lifecycle.InitializationInput;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;

View file

@ -21,7 +21,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingExcepti
import software.amazon.kinesis.processor.IPreparedCheckpointer; import software.amazon.kinesis.processor.IPreparedCheckpointer;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.InitializationInput; import software.amazon.kinesis.lifecycle.InitializationInput;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.ShutdownInput; import software.amazon.kinesis.lifecycle.ShutdownInput;

View file

@ -22,7 +22,7 @@ import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import software.amazon.kinesis.lifecycle.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;

View file

@ -1,38 +0,0 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.concurrent.Callable;
/**
* Interface for shard processing tasks.
* A task may execute an application callback (e.g. initialize, process, shutdown).
*/
interface ITask extends Callable<TaskResult> {
/**
* Perform task logic.
* E.g. perform set up (e.g. fetch records) and invoke a callback (e.g. processRecords() API).
*
* @return TaskResult (captures any exceptions encountered during execution of the task)
*/
TaskResult call();
/**
* @return TaskType
*/
TaskType getTaskType();
}

View file

@ -24,6 +24,8 @@ import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.RegionUtils; import com.amazonaws.regions.RegionUtils;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.ProcessTask;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.metrics.MetricsHelper; import software.amazon.kinesis.metrics.MetricsHelper;
import software.amazon.kinesis.metrics.IMetricsScope; import software.amazon.kinesis.metrics.IMetricsScope;
import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsLevel;
@ -998,7 +1000,7 @@ public class KinesisClientLibConfiguration {
* Controls how long the KCL will sleep if no records are returned from Kinesis * Controls how long the KCL will sleep if no records are returned from Kinesis
* *
* <p> * <p>
* This value is only used when no records are returned; if records are returned, the {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask} will * This value is only used when no records are returned; if records are returned, the {@link ProcessTask} will
* immediately retrieve the next set of records after the call to * immediately retrieve the next set of records after the call to
* {@link IRecordProcessor#processRecords(ProcessRecordsInput)} * {@link IRecordProcessor#processRecords(ProcessRecordsInput)}
* has returned. Setting this value to high may result in the KCL being unable to catch up. If you are changing this * has returned. Setting this value to high may result in the KCL being unable to catch up. If you are changing this

View file

@ -14,6 +14,9 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import software.amazon.kinesis.lifecycle.ITask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.metrics.MetricsHelper; import software.amazon.kinesis.metrics.MetricsHelper;
import software.amazon.kinesis.metrics.IMetricsFactory; import software.amazon.kinesis.metrics.IMetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsLevel;
@ -21,7 +24,7 @@ import software.amazon.kinesis.metrics.MetricsLevel;
/** /**
* Decorates an ITask and reports metrics about its timing and success/failure. * Decorates an ITask and reports metrics about its timing and success/failure.
*/ */
class MetricsCollectingTaskDecorator implements ITask { public class MetricsCollectingTaskDecorator implements ITask {
private final ITask other; private final ITask other;
private final IMetricsFactory factory; private final IMetricsFactory factory;

View file

@ -37,7 +37,7 @@ import lombok.extern.slf4j.Slf4j;
* RecordProcessor instance. Amazon Kinesis Client Library will create one instance per shard assignment. * RecordProcessor instance. Amazon Kinesis Client Library will create one instance per shard assignment.
*/ */
@Slf4j @Slf4j
class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { public class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
private ICheckpoint checkpoint; private ICheckpoint checkpoint;
private ExtendedSequenceNumber largestPermittedCheckpointValue; private ExtendedSequenceNumber largestPermittedCheckpointValue;
@ -59,7 +59,7 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
* @param checkpoint Used to checkpoint progress of a RecordProcessor * @param checkpoint Used to checkpoint progress of a RecordProcessor
* @param validator Used for validating sequence numbers * @param validator Used for validating sequence numbers
*/ */
RecordProcessorCheckpointer(ShardInfo shardInfo, public RecordProcessorCheckpointer(ShardInfo shardInfo,
ICheckpoint checkpoint, ICheckpoint checkpoint,
SequenceNumberValidator validator, SequenceNumberValidator validator,
IMetricsFactory metricsFactory) { IMetricsFactory metricsFactory) {
@ -227,11 +227,11 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
/** /**
* @return the lastCheckpointValue * @return the lastCheckpointValue
*/ */
ExtendedSequenceNumber getLastCheckpointValue() { public ExtendedSequenceNumber getLastCheckpointValue() {
return lastCheckpointValue; return lastCheckpointValue;
} }
synchronized void setInitialCheckpointValue(ExtendedSequenceNumber initialCheckpoint) { public synchronized void setInitialCheckpointValue(ExtendedSequenceNumber initialCheckpoint) {
lastCheckpointValue = initialCheckpoint; lastCheckpointValue = initialCheckpoint;
} }
@ -240,14 +240,14 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
* *
* @return the largest permitted checkpoint * @return the largest permitted checkpoint
*/ */
synchronized ExtendedSequenceNumber getLargestPermittedCheckpointValue() { public synchronized ExtendedSequenceNumber getLargestPermittedCheckpointValue() {
return largestPermittedCheckpointValue; return largestPermittedCheckpointValue;
} }
/** /**
* @param largestPermittedCheckpointValue the largest permitted checkpoint * @param largestPermittedCheckpointValue the largest permitted checkpoint
*/ */
synchronized void setLargestPermittedCheckpointValue(ExtendedSequenceNumber largestPermittedCheckpointValue) { public synchronized void setLargestPermittedCheckpointValue(ExtendedSequenceNumber largestPermittedCheckpointValue) {
this.largestPermittedCheckpointValue = largestPermittedCheckpointValue; this.largestPermittedCheckpointValue = largestPermittedCheckpointValue;
} }
@ -258,7 +258,7 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
* *
* @param extendedSequenceNumber * @param extendedSequenceNumber
*/ */
synchronized void setSequenceNumberAtShardEnd(ExtendedSequenceNumber extendedSequenceNumber) { public synchronized void setSequenceNumberAtShardEnd(ExtendedSequenceNumber extendedSequenceNumber) {
this.sequenceNumberAtShardEnd = extendedSequenceNumber; this.sequenceNumberAtShardEnd = extendedSequenceNumber;
} }

View file

@ -48,7 +48,7 @@ public class SequenceNumberValidator {
* @param validateWithGetIterator Whether to attempt to get an iterator for this shard id and the sequence numbers * @param validateWithGetIterator Whether to attempt to get an iterator for this shard id and the sequence numbers
* being validated * being validated
*/ */
SequenceNumberValidator(IKinesisProxy proxy, String shardId, boolean validateWithGetIterator) { public SequenceNumberValidator(IKinesisProxy proxy, String shardId, boolean validateWithGetIterator) {
this.proxy = proxy; this.proxy = proxy;
this.shardId = shardId; this.shardId = shardId;
this.validateWithGetIterator = validateWithGetIterator; this.validateWithGetIterator = validateWithGetIterator;

View file

@ -86,7 +86,7 @@ public class ShardInfo {
* *
* @return a list of shardId's that are parents of this shard, or empty if the shard has no parents. * @return a list of shardId's that are parents of this shard, or empty if the shard has no parents.
*/ */
protected List<String> getParentShardIds() { public List<String> getParentShardIds() {
return new LinkedList<String>(parentShardIds); return new LinkedList<String>(parentShardIds);
} }

View file

@ -14,6 +14,9 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import software.amazon.kinesis.lifecycle.ITask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.retrieval.IKinesisProxy; import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.leases.ILeaseManager;

View file

@ -19,6 +19,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import software.amazon.kinesis.lifecycle.ITask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.retrieval.IKinesisProxy; import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.leases.ILeaseManager;

View file

@ -49,7 +49,7 @@ import lombok.extern.slf4j.Slf4j;
* and begun processing it's child shards. * and begun processing it's child shards.
*/ */
@Slf4j @Slf4j
class ShardSyncer { public class ShardSyncer {
/** /**
* Note constructor is private: We use static synchronized methods - this is a utility class. * Note constructor is private: We use static synchronized methods - this is a utility class.
@ -80,7 +80,7 @@ class ShardSyncer {
* @throws ProvisionedThroughputException * @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException * @throws KinesisClientLibIOException
*/ */
static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, public static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager, ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream, InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards, boolean cleanupLeasesOfCompletedShards,

View file

@ -19,7 +19,7 @@ import software.amazon.kinesis.retrieval.IKinesisProxy;
/** /**
* Used to capture stream configuration and pass it along. * Used to capture stream configuration and pass it along.
*/ */
class StreamConfig { public class StreamConfig {
private final IKinesisProxy streamProxy; private final IKinesisProxy streamProxy;
private final int maxRecords; private final int maxRecords;
@ -37,7 +37,7 @@ class StreamConfig {
* @param validateSequenceNumberBeforeCheckpointing Whether to call Amazon Kinesis to validate sequence numbers * @param validateSequenceNumberBeforeCheckpointing Whether to call Amazon Kinesis to validate sequence numbers
* @param initialPositionInStream Initial position in stream * @param initialPositionInStream Initial position in stream
*/ */
StreamConfig(IKinesisProxy proxy, public StreamConfig(IKinesisProxy proxy,
int maxRecords, int maxRecords,
long idleTimeInMilliseconds, long idleTimeInMilliseconds,
boolean callProcessRecordsEvenForEmptyRecordList, boolean callProcessRecordsEvenForEmptyRecordList,
@ -54,42 +54,42 @@ class StreamConfig {
/** /**
* @return the streamProxy * @return the streamProxy
*/ */
IKinesisProxy getStreamProxy() { public IKinesisProxy getStreamProxy() {
return streamProxy; return streamProxy;
} }
/** /**
* @return the maxRecords * @return the maxRecords
*/ */
int getMaxRecords() { public int getMaxRecords() {
return maxRecords; return maxRecords;
} }
/** /**
* @return the idleTimeInMilliseconds * @return the idleTimeInMilliseconds
*/ */
long getIdleTimeInMilliseconds() { public long getIdleTimeInMilliseconds() {
return idleTimeInMilliseconds; return idleTimeInMilliseconds;
} }
/** /**
* @return the callProcessRecordsEvenForEmptyRecordList * @return the callProcessRecordsEvenForEmptyRecordList
*/ */
boolean shouldCallProcessRecordsEvenForEmptyRecordList() { public boolean shouldCallProcessRecordsEvenForEmptyRecordList() {
return callProcessRecordsEvenForEmptyRecordList; return callProcessRecordsEvenForEmptyRecordList;
} }
/** /**
* @return the initialPositionInStream * @return the initialPositionInStream
*/ */
InitialPositionInStreamExtended getInitialPositionInStream() { public InitialPositionInStreamExtended getInitialPositionInStream() {
return initialPositionInStream; return initialPositionInStream;
} }
/** /**
* @return validateSequenceNumberBeforeCheckpointing * @return validateSequenceNumberBeforeCheckpointing
*/ */
boolean shouldValidateSequenceNumberBeforeCheckpointing() { public boolean shouldValidateSequenceNumberBeforeCheckpointing() {
return validateSequenceNumberBeforeCheckpointing; return validateSequenceNumberBeforeCheckpointing;
} }
} }

View file

@ -21,14 +21,14 @@ import lombok.extern.slf4j.Slf4j;
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j @Slf4j
class ThrottlingReporter { public class ThrottlingReporter {
private final int maxConsecutiveWarnThrottles; private final int maxConsecutiveWarnThrottles;
private final String shardId; private final String shardId;
private int consecutiveThrottles = 0; private int consecutiveThrottles = 0;
void throttled() { public void throttled() {
consecutiveThrottles++; consecutiveThrottles++;
String message = "Shard '" + shardId + "' has been throttled " String message = "Shard '" + shardId + "' has been throttled "
+ consecutiveThrottles + " consecutively"; + consecutiveThrottles + " consecutively";
@ -41,7 +41,7 @@ class ThrottlingReporter {
} }
void success() { public void success() {
consecutiveThrottles = 0; consecutiveThrottles = 0;
} }

View file

@ -40,6 +40,9 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.AmazonKinesisClient;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.v2.IRecordProcessorFactory;

View file

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* *
* Licensed under the Amazon Software License (the "License"). * Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License. * You may not use this file except in compliance with the License.
@ -12,9 +12,10 @@
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package software.amazon.kinesis.lifecycle;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.leases.ILeaseManager;
@ -30,7 +31,7 @@ import lombok.extern.slf4j.Slf4j;
* proceed with processing data from the shard. * proceed with processing data from the shard.
*/ */
@Slf4j @Slf4j
class BlockOnParentShardTask implements ITask { public class BlockOnParentShardTask implements ITask {
private final ShardInfo shardInfo; private final ShardInfo shardInfo;
private final ILeaseManager<KinesisClientLease> leaseManager; private final ILeaseManager<KinesisClientLease> leaseManager;

View file

@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package software.amazon.kinesis.lifecycle;
/** /**
* Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks, * Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks,

View file

@ -0,0 +1,41 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import java.util.concurrent.Callable;
/**
* Interface for shard processing tasks.
* A task may execute an application callback (e.g. initialize, process, shutdown).
*/
public interface ITask extends Callable<TaskResult> {
/**
* Perform task logic.
* E.g. perform set up (e.g. fetch records) and invoke a callback (e.g. processRecords() API).
*
* @return TaskResult (captures any exceptions encountered during execution of the task)
*/
TaskResult call();
/**
* @return TaskType
*/
TaskType getTaskType();
}

View file

@ -12,15 +12,17 @@
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package software.amazon.kinesis.lifecycle;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig;
import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.KinesisDataFetcher; import software.amazon.kinesis.retrieval.KinesisDataFetcher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.lifecycle.InitializationInput;
import software.amazon.kinesis.metrics.MetricsHelper; import software.amazon.kinesis.metrics.MetricsHelper;
import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsLevel;
@ -30,7 +32,7 @@ import lombok.extern.slf4j.Slf4j;
* Task for initializing shard position and invoking the RecordProcessor initialize() API. * Task for initializing shard position and invoking the RecordProcessor initialize() API.
*/ */
@Slf4j @Slf4j
class InitializeTask implements ITask { public class InitializeTask implements ITask {
private static final String RECORD_PROCESSOR_INITIALIZE_METRIC = "RecordProcessor.initialize"; private static final String RECORD_PROCESSOR_INITIALIZE_METRIC = "RecordProcessor.initialize";
private final ShardInfo shardInfo; private final ShardInfo shardInfo;

View file

@ -12,20 +12,23 @@
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package software.amazon.kinesis.lifecycle;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import com.amazonaws.services.cloudwatch.model.StandardUnit; import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ThrottlingReporter;
import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.IKinesisProxy; import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.retrieval.IKinesisProxyExtended; import software.amazon.kinesis.retrieval.IKinesisProxyExtended;
import software.amazon.kinesis.retrieval.KinesisDataFetcher; import software.amazon.kinesis.retrieval.KinesisDataFetcher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.kpl.UserRecord; import software.amazon.kinesis.retrieval.kpl.UserRecord;
import software.amazon.kinesis.metrics.MetricsHelper; import software.amazon.kinesis.metrics.MetricsHelper;
import software.amazon.kinesis.metrics.IMetricsScope; import software.amazon.kinesis.metrics.IMetricsScope;
@ -41,7 +44,7 @@ import lombok.extern.slf4j.Slf4j;
* Task for fetching data records and invoking processRecords() on the record processor instance. * Task for fetching data records and invoking processRecords() on the record processor instance.
*/ */
@Slf4j @Slf4j
class ProcessTask implements ITask { public class ProcessTask implements ITask {
private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator"; private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed"; private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed";
private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed"; private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed";

View file

@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package software.amazon.kinesis.lifecycle;
import java.util.Optional; import java.util.Optional;
@ -21,6 +21,13 @@ import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.SequenceNumberValidator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownNotification;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig;
import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLease;
@ -42,7 +49,7 @@ import software.amazon.kinesis.retrieval.SynchronousGetRecordsRetrievalStrategy;
* A new instance should be created if the primary responsibility is reassigned back to this process. * A new instance should be created if the primary responsibility is reassigned back to this process.
*/ */
@Slf4j @Slf4j
class ShardConsumer { public class ShardConsumer {
private final StreamConfig streamConfig; private final StreamConfig streamConfig;
private final IRecordProcessor recordProcessor; private final IRecordProcessor recordProcessor;
private final KinesisClientLibConfiguration config; private final KinesisClientLibConfiguration config;
@ -145,7 +152,7 @@ class ShardConsumer {
* @param config Kinesis library configuration * @param config Kinesis library configuration
*/ */
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
ShardConsumer(ShardInfo shardInfo, public ShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig, StreamConfig streamConfig,
ICheckpoint checkpoint, ICheckpoint checkpoint,
IRecordProcessor recordProcessor, IRecordProcessor recordProcessor,
@ -246,7 +253,7 @@ class ShardConsumer {
* *
* @return true if a new process task was submitted, false otherwise * @return true if a new process task was submitted, false otherwise
*/ */
synchronized boolean consumeShard() { public synchronized boolean consumeShard() {
return checkAndSubmitNextTask(); return checkAndSubmitNextTask();
} }
@ -345,7 +352,7 @@ class ShardConsumer {
* *
* @param shutdownNotification used to signal that the record processor has been given the chance to shutdown. * @param shutdownNotification used to signal that the record processor has been given the chance to shutdown.
*/ */
void notifyShutdownRequested(ShutdownNotification shutdownNotification) { public void notifyShutdownRequested(ShutdownNotification shutdownNotification) {
this.shutdownNotification = shutdownNotification; this.shutdownNotification = shutdownNotification;
markForShutdown(ShutdownReason.REQUESTED); markForShutdown(ShutdownReason.REQUESTED);
} }
@ -356,7 +363,7 @@ class ShardConsumer {
* *
* @return true if shutdown is complete (false if shutdown is still in progress) * @return true if shutdown is complete (false if shutdown is still in progress)
*/ */
synchronized boolean beginShutdown() { public synchronized boolean beginShutdown() {
markForShutdown(ShutdownReason.ZOMBIE); markForShutdown(ShutdownReason.ZOMBIE);
checkAndSubmitNextTask(); checkAndSubmitNextTask();
@ -376,14 +383,14 @@ class ShardConsumer {
* *
* @return true if shutdown is complete * @return true if shutdown is complete
*/ */
boolean isShutdown() { public boolean isShutdown() {
return currentState.isTerminal(); return currentState.isTerminal();
} }
/** /**
* @return the shutdownReason * @return the shutdownReason
*/ */
ShutdownReason getShutdownReason() { public ShutdownReason getShutdownReason() {
return shutdownReason; return shutdownReason;
} }
@ -430,7 +437,7 @@ class ShardConsumer {
} }
@VisibleForTesting @VisibleForTesting
boolean isShutdownRequested() { public boolean isShutdownRequested() {
return shutdownReason != null; return shutdownReason != null;
} }

View file

@ -15,7 +15,6 @@
package software.amazon.kinesis.lifecycle; package software.amazon.kinesis.lifecycle;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IRecordProcessor;
/** /**

View file

@ -12,8 +12,10 @@
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package software.amazon.kinesis.lifecycle;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownNotification;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.v2.IShutdownNotificationAware;
@ -21,7 +23,7 @@ import software.amazon.kinesis.processor.v2.IShutdownNotificationAware;
/** /**
* Notifies record processor of incoming shutdown request, and gives them a chance to checkpoint. * Notifies record processor of incoming shutdown request, and gives them a chance to checkpoint.
*/ */
class ShutdownNotificationTask implements ITask { public class ShutdownNotificationTask implements ITask {
private final IRecordProcessor recordProcessor; private final IRecordProcessor recordProcessor;
private final IRecordProcessorCheckpointer recordProcessorCheckpointer; private final IRecordProcessorCheckpointer recordProcessorCheckpointer;

View file

@ -12,13 +12,12 @@
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package software.amazon.kinesis.lifecycle;
import software.amazon.kinesis.lifecycle.ShutdownInput;
import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IRecordProcessor;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; import static software.amazon.kinesis.lifecycle.ConsumerStates.ConsumerState;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState; import static software.amazon.kinesis.lifecycle.ConsumerStates.ShardConsumerState;
/** /**

View file

@ -12,13 +12,16 @@
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package software.amazon.kinesis.lifecycle;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncer;
import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.IKinesisProxy; import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.lifecycle.ShutdownInput;
import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.leases.ILeaseManager;
import software.amazon.kinesis.metrics.MetricsHelper; import software.amazon.kinesis.metrics.MetricsHelper;
@ -31,7 +34,7 @@ import lombok.extern.slf4j.Slf4j;
* Task for invoking the RecordProcessor shutdown() callback. * Task for invoking the RecordProcessor shutdown() callback.
*/ */
@Slf4j @Slf4j
class ShutdownTask implements ITask { public class ShutdownTask implements ITask {
private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown"; private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown";
private final ShardInfo shardInfo; private final ShardInfo shardInfo;
@ -163,7 +166,7 @@ class ShutdownTask implements ITask {
} }
@VisibleForTesting @VisibleForTesting
ShutdownReason getReason() { public ShutdownReason getReason() {
return reason; return reason;
} }

View file

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* *
* Licensed under the Amazon Software License (the "License"). * Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License. * You may not use this file except in compliance with the License.
@ -12,13 +12,13 @@
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package software.amazon.kinesis.lifecycle;
/** /**
* Used to capture information from a task that we want to communicate back to the higher layer. * Used to capture information from a task that we want to communicate back to the higher layer.
* E.g. exception thrown when executing the task, if we reach end of a shard. * E.g. exception thrown when executing the task, if we reach end of a shard.
*/ */
class TaskResult { public class TaskResult {
// Did we reach the end of the shard while processing this task. // Did we reach the end of the shard while processing this task.
private boolean shardEndReached; private boolean shardEndReached;
@ -29,7 +29,7 @@ class TaskResult {
/** /**
* @return the shardEndReached * @return the shardEndReached
*/ */
protected boolean isShardEndReached() { public boolean isShardEndReached() {
return shardEndReached; return shardEndReached;
} }
@ -50,7 +50,7 @@ class TaskResult {
/** /**
* @param e Any exception encountered when running the process task. * @param e Any exception encountered when running the process task.
*/ */
TaskResult(Exception e) { public TaskResult(Exception e) {
this(e, false); this(e, false);
} }

View file

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* *
* Licensed under the Amazon Software License (the "License"). * Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License. * You may not use this file except in compliance with the License.
@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package software.amazon.kinesis.lifecycle;
/** /**
* Enumerates types of tasks executed as part of processing a shard. * Enumerates types of tasks executed as part of processing a shard.

View file

@ -17,7 +17,7 @@ package software.amazon.kinesis.processor;
import java.util.List; import java.util.List;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import software.amazon.kinesis.lifecycle.ShutdownReason;
/** /**
* The Amazon Kinesis Client Library will instantiate record processors to process data records fetched from Amazon * The Amazon Kinesis Client Library will instantiate record processors to process data records fetched from Amazon

View file

@ -17,6 +17,7 @@ package software.amazon.kinesis.processor.v2;
import software.amazon.kinesis.lifecycle.InitializationInput; import software.amazon.kinesis.lifecycle.InitializationInput;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.ShutdownInput; import software.amazon.kinesis.lifecycle.ShutdownInput;
import software.amazon.kinesis.lifecycle.ShutdownReason;
/** /**
* The Amazon Kinesis Client Library will instantiate record processors to process data records fetched from Amazon * The Amazon Kinesis Client Library will instantiate record processors to process data records fetched from Amazon
@ -50,7 +51,7 @@ public interface IRecordProcessor {
* <h2><b>Warning</b></h2> * <h2><b>Warning</b></h2>
* *
* When the value of {@link ShutdownInput#getShutdownReason()} is * When the value of {@link ShutdownInput#getShutdownReason()} is
* {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason#TERMINATE} it is required that you * {@link ShutdownReason#TERMINATE} it is required that you
* checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress. * checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress.
* *
* @param shutdownInput * @param shutdownInput

View file

@ -36,6 +36,7 @@ import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.verification.VerificationMode; import org.mockito.verification.VerificationMode;
import software.amazon.kinesis.lifecycle.ShardConsumer;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class GracefulShutdownCoordinatorTest { public class GracefulShutdownCoordinatorTest {

View file

@ -25,6 +25,7 @@ import com.amazonaws.services.kinesis.model.Shard;
import junit.framework.Assert; import junit.framework.Assert;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.lifecycle.ShutdownReason;
/** /**
* Helper class to verify shard lineage in unit tests that use TestStreamlet. * Helper class to verify shard lineage in unit tests that use TestStreamlet.

View file

@ -26,6 +26,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.v2.IShutdownNotificationAware;
@ -40,7 +41,7 @@ import lombok.extern.slf4j.Slf4j;
* Streamlet that tracks records it's seen - useful for testing. * Streamlet that tracks records it's seen - useful for testing.
*/ */
@Slf4j @Slf4j
class TestStreamlet implements IRecordProcessor, IShutdownNotificationAware { public class TestStreamlet implements IRecordProcessor, IShutdownNotificationAware {
private List<Record> records = new ArrayList<Record>(); private List<Record> records = new ArrayList<Record>();
private Set<String> processedSeqNums = new HashSet<String>(); // used for deduping private Set<String> processedSeqNums = new HashSet<String>(); // used for deduping
@ -143,7 +144,7 @@ class TestStreamlet implements IRecordProcessor, IShutdownNotificationAware {
/** /**
* @return the shutdownReason * @return the shutdownReason
*/ */
ShutdownReason getShutdownReason() { public ShutdownReason getShutdownReason() {
return shutdownReason; return shutdownReason;
} }
@ -154,7 +155,7 @@ class TestStreamlet implements IRecordProcessor, IShutdownNotificationAware {
return numProcessRecordsCallsWithEmptyRecordList; return numProcessRecordsCallsWithEmptyRecordList;
} }
boolean isShutdownNotificationCalled() { public boolean isShutdownNotificationCalled() {
return shutdownNotificationCalled; return shutdownNotificationCalled;
} }

View file

@ -82,6 +82,15 @@ import org.mockito.stubbing.Answer;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException;
import software.amazon.kinesis.lifecycle.BlockOnParentShardTask;
import software.amazon.kinesis.lifecycle.ITask;
import software.amazon.kinesis.lifecycle.InitializeTask;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShutdownNotificationTask;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.ShutdownTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IRecordProcessor;

View file

@ -17,7 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.types;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import software.amazon.kinesis.lifecycle.ShutdownReason;
import org.junit.Test; import org.junit.Test;
/** /**

View file

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* *
* Licensed under the Amazon Software License (the "License"). * Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License. * You may not use this file except in compliance with the License.
@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package software.amazon.kinesis.lifecycle;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -20,6 +20,7 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -27,6 +28,9 @@ import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import software.amazon.kinesis.lifecycle.BlockOnParentShardTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.InvalidStateException;

View file

@ -12,10 +12,10 @@
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package software.amazon.kinesis.lifecycle;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; import static software.amazon.kinesis.lifecycle.ConsumerStates.ConsumerState;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState; import static software.amazon.kinesis.lifecycle.ConsumerStates.ShardConsumerState;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
@ -28,6 +28,12 @@ import java.lang.reflect.Field;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownNotification;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig;
import org.hamcrest.Condition; import org.hamcrest.Condition;
import org.hamcrest.Description; import org.hamcrest.Description;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
@ -38,6 +44,17 @@ import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.lifecycle.BlockOnParentShardTask;
import software.amazon.kinesis.lifecycle.ConsumerStates;
import software.amazon.kinesis.lifecycle.ITask;
import software.amazon.kinesis.lifecycle.InitializeTask;
import software.amazon.kinesis.lifecycle.ProcessTask;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShutdownNotificationTask;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.ShutdownTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IRecordProcessor;

View file

@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package software.amazon.kinesis.lifecycle;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
@ -37,12 +37,21 @@ import java.util.Random;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ThrottlingReporter;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
import software.amazon.kinesis.lifecycle.ProcessTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.KinesisDataFetcher; import software.amazon.kinesis.retrieval.KinesisDataFetcher;
@ -75,7 +84,8 @@ public class ProcessTaskTest {
private @Mock private @Mock
KinesisDataFetcher mockDataFetcher; KinesisDataFetcher mockDataFetcher;
private @Mock IRecordProcessor mockRecordProcessor; private @Mock IRecordProcessor mockRecordProcessor;
private @Mock RecordProcessorCheckpointer mockCheckpointer; private @Mock
RecordProcessorCheckpointer mockCheckpointer;
@Mock @Mock
private ThrottlingReporter throttlingReporter; private ThrottlingReporter throttlingReporter;
@Mock @Mock

View file

@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package software.amazon.kinesis.lifecycle;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
@ -50,6 +50,15 @@ import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.SequenceNumberValidator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownNotification;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.TestStreamlet;
import org.hamcrest.Description; import org.hamcrest.Description;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher; import org.hamcrest.TypeSafeMatcher;
@ -59,6 +68,12 @@ import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.lifecycle.ConsumerStates;
import software.amazon.kinesis.lifecycle.ITask;
import software.amazon.kinesis.lifecycle.InitializeTask;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.Checkpoint;

View file

@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package software.amazon.kinesis.lifecycle;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -22,6 +22,11 @@ import static org.mockito.Mockito.when;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.TestStreamlet;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -30,6 +35,10 @@ import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.ShutdownTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.IKinesisProxy; import software.amazon.kinesis.retrieval.IKinesisProxy;