Merge pull request #321 from pfifer/lifecycle-2.0

Shard Consumer Invocation Method Changes
This commit is contained in:
Sahil Palvia 2018-03-27 15:17:04 -07:00 committed by GitHub
commit 30f937e6d5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
34 changed files with 1081 additions and 635 deletions

View file

@ -83,6 +83,11 @@
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>

View file

@ -15,13 +15,14 @@
package software.amazon.kinesis.leases;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.lifecycle.ITask;
import software.amazon.kinesis.lifecycle.TaskCompletedListener;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import lombok.extern.slf4j.Slf4j;
/**
* This task syncs leases/activities with shards of the stream.
* It will create new leases/activities when it discovers new shards (e.g. setup/resharding).
@ -38,6 +39,8 @@ public class ShardSyncTask implements ITask {
private final long shardSyncTaskIdleTimeMillis;
private final TaskType taskType = TaskType.SHARDSYNC;
private TaskCompletedListener listener;
/**
* @param kinesisProxy Used to fetch information about the stream (e.g. shard list)
* @param leaseManager Used to fetch and create leases
@ -64,23 +67,26 @@ public class ShardSyncTask implements ITask {
*/
@Override
public TaskResult call() {
Exception exception = null;
try {
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
leaseManager,
initialPosition,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards);
if (shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis);
}
} catch (Exception e) {
log.error("Caught exception while sync'ing Kinesis shards and leases", e);
exception = e;
}
Exception exception = null;
return new TaskResult(exception);
try {
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards);
if (shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis);
}
} catch (Exception e) {
log.error("Caught exception while sync'ing Kinesis shards and leases", e);
exception = e;
}
return new TaskResult(exception);
} finally {
if (listener != null) {
listener.taskCompleted(this);
}
}
}
@ -92,4 +98,12 @@ public class ShardSyncTask implements ITask {
return taskType;
}
/**
 * Stores the listener held by this task, replacing any listener that was stored earlier.
 * A warning is logged when an already-stored listener is about to be overwritten.
 *
 * @param taskCompletedListener
 *            the listener to store; it supersedes whatever listener was stored before
 */
@Override
public void addTaskCompletedListener(TaskCompletedListener taskCompletedListener) {
    final boolean replacingExisting = (this.listener != null);
    if (replacingExisting) {
        log.warn("Listener is being reset, this shouldn't happen");
    }
    this.listener = taskCompletedListener;
}
}

View file

@ -15,12 +15,12 @@
package software.amazon.kinesis.lifecycle;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.ILeaseManager;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.leases.ILeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/**
* Task to block until processing of all data records in the parent shard(s) is completed.
@ -39,6 +39,8 @@ public class BlockOnParentShardTask implements ITask {
// Sleep for this duration if the parent shards have not completed processing, or we encounter an exception.
private final long parentShardPollIntervalMillis;
private TaskCompletedListener listener;
/**
* @param shardInfo Information about the shard we are working on
* @param leaseManager Used to fetch the lease and checkpoint info for parent shards
@ -57,43 +59,49 @@ public class BlockOnParentShardTask implements ITask {
*/
@Override
public TaskResult call() {
Exception exception = null;
try {
boolean blockedOnParentShard = false;
for (String shardId : shardInfo.getParentShardIds()) {
KinesisClientLease lease = leaseManager.getLease(shardId);
if (lease != null) {
ExtendedSequenceNumber checkpoint = lease.getCheckpoint();
if ((checkpoint == null) || (!checkpoint.equals(ExtendedSequenceNumber.SHARD_END))) {
log.debug("Shard {} is not yet done. Its current checkpoint is {}", shardId, checkpoint);
blockedOnParentShard = true;
exception = new BlockedOnParentShardException("Parent shard not yet done");
break;
Exception exception = null;
try {
boolean blockedOnParentShard = false;
for (String shardId : shardInfo.getParentShardIds()) {
KinesisClientLease lease = leaseManager.getLease(shardId);
if (lease != null) {
ExtendedSequenceNumber checkpoint = lease.getCheckpoint();
if ((checkpoint == null) || (!checkpoint.equals(ExtendedSequenceNumber.SHARD_END))) {
log.debug("Shard {} is not yet done. Its current checkpoint is {}", shardId, checkpoint);
blockedOnParentShard = true;
exception = new BlockedOnParentShardException("Parent shard not yet done");
break;
} else {
log.debug("Shard {} has been completely processed.", shardId);
}
} else {
log.debug("Shard {} has been completely processed.", shardId);
log.info("No lease found for shard {}. Not blocking on completion of this shard.", shardId);
}
} else {
log.info("No lease found for shard {}. Not blocking on completion of this shard.", shardId);
}
if (!blockedOnParentShard) {
log.info("No need to block on parents {} of shard {}", shardInfo.getParentShardIds(),
shardInfo.getShardId());
return new TaskResult(null);
}
} catch (Exception e) {
log.error("Caught exception when checking for parent shard checkpoint", e);
exception = e;
}
try {
Thread.sleep(parentShardPollIntervalMillis);
} catch (InterruptedException e) {
log.error("Sleep interrupted when waiting on parent shard(s) of {}", shardInfo.getShardId(), e);
}
if (!blockedOnParentShard) {
log.info("No need to block on parents {} of shard {}", shardInfo.getParentShardIds(),
shardInfo.getShardId());
return new TaskResult(null);
return new TaskResult(exception);
} finally {
if (listener != null) {
listener.taskCompleted(this);
}
} catch (Exception e) {
log.error("Caught exception when checking for parent shard checkpoint", e);
exception = e;
}
try {
Thread.sleep(parentShardPollIntervalMillis);
} catch (InterruptedException e) {
log.error("Sleep interrupted when waiting on parent shard(s) of {}", shardInfo.getShardId(), e);
}
return new TaskResult(exception);
}
/* (non-Javadoc)
@ -104,4 +112,12 @@ public class BlockOnParentShardTask implements ITask {
return taskType;
}
/**
 * Stores the listener held by this task, replacing any listener that was stored earlier.
 * A warning is logged when an already-stored listener is about to be overwritten.
 *
 * @param taskCompletedListener
 *            the listener to store; it supersedes whatever listener was stored before
 */
@Override
public void addTaskCompletedListener(TaskCompletedListener taskCompletedListener) {
    final boolean replacingExisting = (this.listener != null);
    if (replacingExisting) {
        log.warn("Listener is being reset, this shouldn't happen");
    }
    this.listener = taskCompletedListener;
}
}

View file

@ -312,14 +312,13 @@ class ConsumerStates {
@Override
public ITask createTask(ShardConsumer consumer) {
ProcessTask.RecordsFetcher recordsFetcher = new ProcessTask.RecordsFetcher(consumer.getGetRecordsCache());
return new ProcessTask(consumer.getShardInfo(),
consumer.getStreamConfig(),
consumer.getRecordProcessor(),
consumer.getRecordProcessorCheckpointer(),
consumer.getDataFetcher(),
consumer.getTaskBackoffTimeMillis(),
consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(),
consumer.getGetRecordsCache());
consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), recordsFetcher.getRecords());
}
@Override

View file

@ -14,9 +14,6 @@
*/
package software.amazon.kinesis.lifecycle;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import java.util.concurrent.Callable;
/**
@ -38,4 +35,12 @@ public interface ITask extends Callable<TaskResult> {
*/
TaskType getTaskType();
/**
* Adds a listener that will be notified once the task is completed.
* <p>
* NOTE(review): despite the "add" in the name, the implementations visible in this change (ShardSyncTask,
* BlockOnParentShardTask, ProcessTask) keep a single listener and overwrite it, logging a warning, when a second
* one is registered; InitializeTask ignores the listener entirely. Confirm which contract is intended.
*
* @param taskCompletedListener
* the listener to call once the task has been completed
*/
void addTaskCompletedListener(TaskCompletedListener taskCompletedListener);
}

View file

@ -134,4 +134,9 @@ public class InitializeTask implements ITask {
return taskType;
}
/**
 * No-op implementation: the supplied listener is neither stored nor invoked by this method.
 * <p>
 * NOTE(review): ShardConsumer registers its completion handler on each next task through this method, and the
 * other ITask implementations in this change store the listener and notify it from call(). With an empty body
 * here, completion of this task is presumably never reported to that handler; confirm this is intended.
 *
 * @param taskCompletedListener
 *            the listener offered by the caller; currently ignored
 */
@Override
public void addTaskCompletedListener(TaskCompletedListener taskCompletedListener) {
// The body is empty, so the listener is dropped.
}
}

View file

@ -18,6 +18,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.List;
import lombok.AllArgsConstructor;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.model.Record;
@ -29,11 +30,14 @@ import software.amazon.kinesis.processor.IRecordProcessor;
* {@link IRecordProcessor#processRecords(
* ProcessRecordsInput processRecordsInput) processRecords} method.
*/
@AllArgsConstructor
public class ProcessRecordsInput {
@Getter
private Instant cacheEntryTime;
@Getter
private Instant cacheExitTime;
@Getter
private boolean isAtShardEnd;
private List<Record> records;
private IRecordProcessorCheckpointer checkpointer;
private Long millisBehindLatest;

View file

@ -1,16 +1,9 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Amazon Software License
* (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at
* http://aws.amazon.com/asl/ or in the "license" file accompanying this file. This file is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
* language governing permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
@ -19,26 +12,24 @@ import java.util.List;
import java.util.ListIterator;
import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.coordinator.StreamConfig;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.IMetricsScope;
import software.amazon.kinesis.metrics.MetricsHelper;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.retrieval.IKinesisProxyExtended;
import software.amazon.kinesis.retrieval.KinesisDataFetcher;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.retrieval.kpl.UserRecord;
import software.amazon.kinesis.metrics.MetricsHelper;
import software.amazon.kinesis.metrics.IMetricsScope;
import software.amazon.kinesis.metrics.MetricsLevel;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import lombok.extern.slf4j.Slf4j;
/**
* Task for fetching data records and invoking processRecords() on the record processor instance.
@ -55,39 +46,31 @@ public class ProcessTask implements ITask {
private final ShardInfo shardInfo;
private final IRecordProcessor recordProcessor;
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
private final KinesisDataFetcher dataFetcher;
private final TaskType taskType = TaskType.PROCESS;
private final StreamConfig streamConfig;
private final long backoffTimeMillis;
private final Shard shard;
private final ThrottlingReporter throttlingReporter;
private final GetRecordsCache getRecordsCache;
private final ProcessRecordsInput processRecordsInput;
private TaskCompletedListener listener;
@RequiredArgsConstructor
public static class RecordsFetcher {
private final GetRecordsCache getRecordsCache;
/**
 * Takes the next result from the records cache and, when that result carries a millis-behind-latest value,
 * records the value on the current metrics scope before returning the result.
 *
 * @return the next {@link ProcessRecordsInput} supplied by the cache
 */
public ProcessRecordsInput getRecords() {
    final ProcessRecordsInput nextResult = getRecordsCache.getNextResult();

    // Nothing to report when the result carries no lag value.
    if (nextResult.getMillisBehindLatest() == null) {
        return nextResult;
    }

    MetricsHelper.getMetricsScope().addData(
            MILLIS_BEHIND_LATEST_METRIC,
            nextResult.getMillisBehindLatest(),
            StandardUnit.Milliseconds,
            MetricsLevel.SUMMARY);
    return nextResult;
}
/**
* @param shardInfo
* contains information about the shard
* @param streamConfig
* Stream configuration
* @param recordProcessor
* Record processor used to process the data records for the shard
* @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress
* @param dataFetcher
* Kinesis data fetcher (used to fetch records from Kinesis)
* @param backoffTimeMillis
* backoff time when catching exceptions
* @param getRecordsCache
* The retrieval strategy for fetching records from kinesis
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
GetRecordsCache getRecordsCache) {
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()),
getRecordsCache);
}
/**
@ -99,27 +82,44 @@ public class ProcessTask implements ITask {
* Record processor used to process the data records for the shard
* @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress
* @param dataFetcher
* Kinesis data fetcher (used to fetch records from Kinesis)
* @param backoffTimeMillis
* backoff time when catching exceptions
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ProcessRecordsInput processRecordsInput) {
// Convenience overload: forwards every argument unchanged and supplies a ThrottlingReporter built from
// MAX_CONSECUTIVE_THROTTLES and this shard's id.
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), processRecordsInput);
}
/**
* @param shardInfo
* contains information about the shard
* @param streamConfig
* Stream configuration
* @param recordProcessor
* Record processor used to process the data records for the shard
* @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress
* @param backoffTimeMillis
* backoff time when catching exceptions
* @param throttlingReporter
* determines how throttling events should be reported in the log.
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
ThrottlingReporter throttlingReporter, GetRecordsCache getRecordsCache) {
RecordProcessorCheckpointer recordProcessorCheckpointer, long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ThrottlingReporter throttlingReporter,
ProcessRecordsInput processRecordsInput) {
super();
this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.dataFetcher = dataFetcher;
this.streamConfig = streamConfig;
this.backoffTimeMillis = backoffTimeMillis;
this.throttlingReporter = throttlingReporter;
IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy();
this.getRecordsCache = getRecordsCache;
this.processRecordsInput = processRecordsInput;
// If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for
// this ProcessTask. In this case, duplicate KPL user records in the event of resharding will
// not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if
@ -138,55 +138,53 @@ public class ProcessTask implements ITask {
/*
* (non-Javadoc)
*
* @see software.amazon.kinesis.lifecycle.ITask#call()
*/
@Override
public TaskResult call() {
long startTimeMillis = System.currentTimeMillis();
IMetricsScope scope = MetricsHelper.getMetricsScope();
scope.addDimension(MetricsHelper.SHARD_ID_DIMENSION_NAME, shardInfo.getShardId());
scope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.Count, MetricsLevel.SUMMARY);
scope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.Bytes, MetricsLevel.SUMMARY);
Exception exception = null;
try {
if (dataFetcher.isShardEndReached()) {
log.info("Reached end of shard {}", shardInfo.getShardId());
return new TaskResult(null, true);
long startTimeMillis = System.currentTimeMillis();
IMetricsScope scope = MetricsHelper.getMetricsScope();
scope.addDimension(MetricsHelper.SHARD_ID_DIMENSION_NAME, shardInfo.getShardId());
scope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.Count, MetricsLevel.SUMMARY);
scope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.Bytes, MetricsLevel.SUMMARY);
Exception exception = null;
try {
if (processRecordsInput.isAtShardEnd()) {
log.info("Reached end of shard {}", shardInfo.getShardId());
return new TaskResult(null, true);
}
throttlingReporter.success();
List<Record> records = processRecordsInput.getRecords();
if (!records.isEmpty()) {
scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY);
} else {
handleNoRecords(startTimeMillis);
}
records = deaggregateRecords(records);
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber(
scope, records, recordProcessorCheckpointer.getLastCheckpointValue(),
recordProcessorCheckpointer.getLargestPermittedCheckpointValue()));
if (shouldCallProcessRecords(records)) {
callProcessRecords(processRecordsInput, records);
}
} catch (RuntimeException e) {
log.error("ShardId {}: Caught exception: ", shardInfo.getShardId(), e);
exception = e;
backoff();
}
final ProcessRecordsInput processRecordsInput = getRecordsResult();
throttlingReporter.success();
List<Record> records = processRecordsInput.getRecords();
if (!records.isEmpty()) {
scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY);
} else {
handleNoRecords(startTimeMillis);
return new TaskResult(exception);
} finally {
if (listener != null) {
listener.taskCompleted(this);
}
records = deaggregateRecords(records);
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(
filterAndGetMaxExtendedSequenceNumber(scope, records,
recordProcessorCheckpointer.getLastCheckpointValue(),
recordProcessorCheckpointer.getLargestPermittedCheckpointValue()));
if (shouldCallProcessRecords(records)) {
callProcessRecords(processRecordsInput, records);
}
} catch (ProvisionedThroughputExceededException pte) {
throttlingReporter.throttled();
exception = pte;
backoff();
} catch (RuntimeException e) {
log.error("ShardId {}: Caught exception: ", shardInfo.getShardId(), e);
exception = e;
backoff();
}
return new TaskResult(exception);
}
/**
@ -213,8 +211,7 @@ public class ProcessTask implements ITask {
log.debug("Calling application processRecords() with {} records from {}", records.size(),
shardInfo.getShardId());
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records)
.withCheckpointer(recordProcessorCheckpointer)
.withMillisBehindLatest(input.getMillisBehindLatest());
.withCheckpointer(recordProcessorCheckpointer).withMillisBehindLatest(input.getMillisBehindLatest());
final long recordProcessorStartTimeMillis = System.currentTimeMillis();
try {
@ -291,28 +288,37 @@ public class ProcessTask implements ITask {
return taskType;
}
/**
 * Stores the listener held by this task, replacing any listener that was stored earlier.
 * A warning is logged when an already-stored listener is about to be overwritten.
 *
 * @param taskCompletedListener
 *            the listener to store; it supersedes whatever listener was stored before
 */
@Override
public void addTaskCompletedListener(TaskCompletedListener taskCompletedListener) {
    final boolean replacingExisting = (this.listener != null);
    if (replacingExisting) {
        log.warn("Listener is being reset, this shouldn't happen");
    }
    this.listener = taskCompletedListener;
}
/**
* Scans a list of records to filter out records up to and including the most recent checkpoint value and to get
* the greatest extended sequence number from the retained records. Also emits metrics about the records.
* Scans a list of records to filter out records up to and including the most recent checkpoint value and to get the
* greatest extended sequence number from the retained records. Also emits metrics about the records.
*
* @param scope metrics scope to emit metrics into
* @param records list of records to scan and change in-place as needed
* @param lastCheckpointValue the most recent checkpoint value
* @param lastLargestPermittedCheckpointValue previous largest permitted checkpoint value
* @param scope
* metrics scope to emit metrics into
* @param records
* list of records to scan and change in-place as needed
* @param lastCheckpointValue
* the most recent checkpoint value
* @param lastLargestPermittedCheckpointValue
* previous largest permitted checkpoint value
* @return the largest extended sequence number among the retained records
*/
private ExtendedSequenceNumber filterAndGetMaxExtendedSequenceNumber(IMetricsScope scope, List<Record> records,
final ExtendedSequenceNumber lastCheckpointValue,
final ExtendedSequenceNumber lastLargestPermittedCheckpointValue) {
final ExtendedSequenceNumber lastCheckpointValue,
final ExtendedSequenceNumber lastLargestPermittedCheckpointValue) {
ExtendedSequenceNumber largestExtendedSequenceNumber = lastLargestPermittedCheckpointValue;
ListIterator<Record> recordIterator = records.listIterator();
while (recordIterator.hasNext()) {
Record record = recordIterator.next();
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(
record.getSequenceNumber(),
record instanceof UserRecord
? ((UserRecord) record).getSubSequenceNumber()
: null);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(record.getSequenceNumber(),
record instanceof UserRecord ? ((UserRecord) record).getSubSequenceNumber() : null);
if (extendedSequenceNumber.compareTo(lastCheckpointValue) <= 0) {
recordIterator.remove();
@ -332,58 +338,4 @@ public class ProcessTask implements ITask {
return largestExtendedSequenceNumber;
}
/**
 * Fetches the next records result, retrying exactly once when the first attempt fails with an
 * {@link ExpiredIteratorException}. Before the retry, the expired-iterator metric is emitted and the data
 * fetcher's iterator is advanced to the largest permitted checkpoint value held by the checkpointer.
 *
 * @return the {@link ProcessRecordsInput} produced by the attempt that succeeded
 * @throws ExpiredIteratorException
 *             if the second attempt also fails with an expired iterator
 */
private ProcessRecordsInput getRecordsResult() {
    try {
        return getRecordsResultAndRecordMillisBehindLatest();
    } catch (ExpiredIteratorException firstFailure) {
        // First failure: log it, count it, and reposition the iterator for a single retry.
        log.info("ShardId {}: getRecords threw ExpiredIteratorException - restarting after greatest seqNum passed to customer",
                shardInfo.getShardId(), firstFailure);
        MetricsHelper.getMetricsScope().addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.Count,
                MetricsLevel.SUMMARY);

        // Resume after the greatest sequence number remembered by recordProcessorCheckpointer.
        dataFetcher.advanceIteratorTo(
                recordProcessorCheckpointer.getLargestPermittedCheckpointValue().getSequenceNumber(),
                streamConfig.getInitialPositionInStream());
    }

    // Second and final attempt: a failure here is logged and surfaced to the caller.
    try {
        return getRecordsResultAndRecordMillisBehindLatest();
    } catch (ExpiredIteratorException secondFailure) {
        log.error("Shard " + shardInfo.getShardId()
                + ": getRecords threw ExpiredIteratorException with a fresh iterator.", secondFailure);
        throw secondFailure;
    }
}
/**
 * Takes the next result from the records cache and, when that result carries a millis-behind-latest value,
 * records the value on the current metrics scope.
 *
 * @return the next {@link ProcessRecordsInput} supplied by the cache
 */
private ProcessRecordsInput getRecordsResultAndRecordMillisBehindLatest() {
    final ProcessRecordsInput nextResult = getRecordsCache.getNextResult();

    // Nothing to report when the result carries no lag value.
    if (nextResult.getMillisBehindLatest() == null) {
        return nextResult;
    }

    MetricsHelper.getMetricsScope().addData(
            MILLIS_BEHIND_LATEST_METRIC,
            nextResult.getMillisBehindLatest(),
            StandardUnit.Milliseconds,
            MetricsLevel.SUMMARY);
    return nextResult;
}
}

View file

@ -0,0 +1,32 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
import software.amazon.kinesis.lifecycle.events.LeaseLost;
import software.amazon.kinesis.lifecycle.events.RecordsReceived;
import software.amazon.kinesis.lifecycle.events.ShardCompleted;
import software.amazon.kinesis.lifecycle.events.ShutdownRequested;
import software.amazon.kinesis.lifecycle.events.Started;
/**
 * Event-style callbacks for a record processor: one method per lifecycle event type from the
 * {@code software.amazon.kinesis.lifecycle.events} package.
 * <p>
 * NOTE(review): the per-method notes below describe how RecordProcessorShim maps each event onto the
 * IRecordProcessor API; when the framework actually raises each event is not visible here.
 */
public interface RecordProcessorLifecycle {
/**
 * Handles a {@link Started} event. RecordProcessorShim maps this to {@code IRecordProcessor.initialize}.
 *
 * @param started
 *            the event payload
 */
void started(Started started);
/**
 * Handles a {@link RecordsReceived} event. RecordProcessorShim maps this to
 * {@code IRecordProcessor.processRecords}.
 *
 * @param records
 *            the event payload
 */
void recordsReceived(RecordsReceived records);
/**
 * Handles a {@link LeaseLost} event. RecordProcessorShim maps this to {@code IRecordProcessor.shutdown} with
 * reason ZOMBIE and an input whose checkpointer accessor throws.
 *
 * @param leaseLost
 *            the event payload
 */
void leaseLost(LeaseLost leaseLost);
/**
 * Handles a {@link ShardCompleted} event. RecordProcessorShim maps this to {@code IRecordProcessor.shutdown}
 * with reason TERMINATE and the event's checkpointer.
 *
 * @param shardCompleted
 *            the event payload
 */
void shardCompleted(ShardCompleted shardCompleted);
/**
 * Handles a {@link ShutdownRequested} event. RecordProcessorShim forwards the event's checkpointer to
 * processors that implement IShutdownNotificationAware and ignores the event otherwise.
 *
 * @param shutdownRequested
 *            the event payload
 */
void shutdownRequested(ShutdownRequested shutdownRequested);
}

View file

@ -0,0 +1,68 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
import lombok.AllArgsConstructor;
import software.amazon.kinesis.lifecycle.events.LeaseLost;
import software.amazon.kinesis.lifecycle.events.RecordsReceived;
import software.amazon.kinesis.lifecycle.events.ShardCompleted;
import software.amazon.kinesis.lifecycle.events.ShutdownRequested;
import software.amazon.kinesis.lifecycle.events.Started;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import software.amazon.kinesis.processor.IShutdownNotificationAware;
/**
 * Adapter that drives an {@link IRecordProcessor} through the {@link RecordProcessorLifecycle} callbacks by
 * translating each lifecycle event into the corresponding call on the wrapped processor.
 */
@AllArgsConstructor
public class RecordProcessorShim implements RecordProcessorLifecycle {

    /** The wrapped record processor that every lifecycle event is forwarded to. */
    private final IRecordProcessor delegate;

    /**
     * Forwards the start event as an initialize call.
     *
     * @param started
     *            event converted into the initialization input handed to the delegate
     */
    @Override
    public void started(Started started) {
        delegate.initialize(started.toInitializationInput());
    }

    /**
     * Forwards received records as a processRecords call.
     *
     * @param records
     *            event converted into the process-records input handed to the delegate
     */
    @Override
    public void recordsReceived(RecordsReceived records) {
        delegate.processRecords(records.toProcessRecordsInput());
    }

    /**
     * Shuts the delegate down with reason {@code ZOMBIE}. The shutdown input is created from a subclass whose
     * checkpointer accessor throws {@link UnsupportedOperationException}.
     *
     * @param leaseLost
     *            the lease-lost event; not otherwise inspected
     */
    @Override
    public void leaseLost(LeaseLost leaseLost) {
        ShutdownInput nonCheckpointingInput = new ShutdownInput() {
            @Override
            public IRecordProcessorCheckpointer getCheckpointer() {
                throw new UnsupportedOperationException("Cannot checkpoint when the lease is lost");
            }
        };
        delegate.shutdown(nonCheckpointingInput.withShutdownReason(ShutdownReason.ZOMBIE));
    }

    /**
     * Shuts the delegate down with reason {@code TERMINATE}, passing along the event's checkpointer.
     *
     * @param shardCompleted
     *            event supplying the checkpointer for the shutdown input
     */
    @Override
    public void shardCompleted(ShardCompleted shardCompleted) {
        delegate.shutdown(new ShutdownInput()
                .withCheckpointer(shardCompleted.getCheckpointer())
                .withShutdownReason(ShutdownReason.TERMINATE));
    }

    /**
     * Passes the shutdown request on only when the delegate implements {@link IShutdownNotificationAware};
     * any other delegate is left untouched.
     *
     * @param shutdownRequested
     *            event supplying the checkpointer handed to the delegate
     */
    @Override
    public void shutdownRequested(ShutdownRequested shutdownRequested) {
        if (!(delegate instanceof IShutdownNotificationAware)) {
            return;
        }
        ((IShutdownNotificationAware) delegate).shutdownRequested(shutdownRequested.getCheckpointer());
    }
}

View file

@ -15,27 +15,31 @@
package software.amazon.kinesis.lifecycle;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.coordinator.StreamConfig;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.ILeaseManager;
import software.amazon.kinesis.metrics.IMetricsFactory;
import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer;
import software.amazon.kinesis.coordinator.StreamConfig;
import software.amazon.kinesis.leases.ILeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.IMetricsFactory;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.retrieval.AsynchronousGetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
@ -49,6 +53,7 @@ import software.amazon.kinesis.retrieval.SynchronousGetRecordsRetrievalStrategy;
*/
@Slf4j
public class ShardConsumer {
//<editor-fold desc="Class Variables">
private final StreamConfig streamConfig;
private final IRecordProcessor recordProcessor;
private final KinesisClientLibConfiguration config;
@ -68,7 +73,11 @@ public class ShardConsumer {
private ITask currentTask;
private long currentTaskSubmitTime;
private Future<TaskResult> future;
private boolean started = false;
private Instant taskDispatchedAt;
//</editor-fold>
//<editor-fold desc="Cache Management">
@Getter
private final GetRecordsCache getRecordsCache;
@ -82,6 +91,7 @@ public class ShardConsumer {
return getRecordsRetrievalStrategy.orElse(new SynchronousGetRecordsRetrievalStrategy(dataFetcher));
}
//</editor-fold>
/*
* Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do
@ -95,6 +105,7 @@ public class ShardConsumer {
private volatile ShutdownReason shutdownReason;
private volatile ShutdownNotification shutdownNotification;
//<editor-fold desc="Constructors">
/**
* @param shardInfo Shard information
* @param streamConfig Stream configuration to use
@ -245,22 +256,49 @@ public class ShardConsumer {
makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo),
this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords());
}
//</editor-fold>
//<editor-fold desc="Dispatch">
/**
* Marks this consumer as started, registers {@code checkAndSubmitNextTask} as the data-arrived listener on the
* records cache, and then makes the first attempt to submit a task.
*/
private void start() {
started = true;
// Registered before the first dispatch so that a cache which reports data arrival can trigger later task checks.
getRecordsCache.addDataArrivedListener(this::checkAndSubmitNextTask);
checkAndSubmitNextTask();
}
/**
* Starts this consumer on the first call, which attempts to submit its first task. Whenever a task has already
* been dispatched, this method only logs how long that task has been pending; it does not submit another one.
* This method should NOT be called if the ShardConsumer is already in SHUTDOWN_COMPLETED state.
*
* @return always {@code true}
*/
public synchronized boolean consumeShard() {
return checkAndSubmitNextTask();
@Synchronized
public boolean consumeShard() {
    // Lazily start the consumer the first time it is polled.
    if (!started) {
        start();
    }

    // Nothing has been dispatched yet, so there is nothing to report.
    if (taskDispatchedAt == null) {
        return true;
    }

    final Duration pendingFor = Duration.between(taskDispatchedAt, Instant.now());
    final String pendingMessage = String.format("Previous %s task still pending for shard %s since %s ago. ",
            currentTask.getTaskType(), shardInfo.getShardId(), pendingFor);

    if (log.isDebugEnabled()) {
        log.debug("{} Not submitting new task.", pendingMessage);
    }

    // Escalate to a warning only when a threshold is configured and has been exceeded.
    config.getLogWarningForTaskAfterMillis().ifPresent(thresholdMillis -> {
        if (pendingFor.toMillis() > thresholdMillis) {
            log.warn(pendingMessage);
        }
    });
    return true;
}
/**
 * Reports whether a new task may be submitted: either no task has been submitted yet, or the last
 * submitted task's future has been cancelled or has completed.
 */
private boolean readyForNextTask() {
    if (future == null) {
        return true;
    }
    return future.isCancelled() || future.isDone();
}
private synchronized boolean checkAndSubmitNextTask() {
@Synchronized
private boolean checkAndSubmitNextTask() {
boolean submittedNewTask = false;
if (readyForNextTask()) {
TaskOutcome taskOutcome = TaskOutcome.NOT_COMPLETE;
@ -271,9 +309,11 @@ public class ShardConsumer {
updateState(taskOutcome);
ITask nextTask = getNextTask();
if (nextTask != null) {
nextTask.addTaskCompletedListener(this::handleTaskCompleted);
currentTask = nextTask;
try {
future = executorService.submit(currentTask);
taskDispatchedAt = Instant.now();
currentTaskSubmitTime = System.currentTimeMillis();
submittedNewTask = true;
log.debug("Submitted new {} task for shard {}", currentTask.getTaskType(), shardInfo.getShardId());
@ -306,6 +346,33 @@ public class ShardConsumer {
return submittedNewTask;
}
/**
 * Decides whether a follow-up task check should run after a task completes. The conditions are evaluated
 * in order and the first one that holds wins.
 */
private boolean shouldDispatchNextTask() {
    if (!isShutdown()) {
        return true;
    }
    if (shutdownReason != null) {
        return true;
    }
    return getRecordsCache.hasResultAvailable();
}
/**
* Task-completed callback registered on every task this consumer submits. Once the task's future has been
* resolved it triggers a check for the next task, provided {@code shouldDispatchNextTask()} allows it.
*
* @param task the task that completed; not used by this method
*/
@Synchronized
private void handleTaskCompleted(ITask task) {
if (future != null) {
// The follow-up runs on the executor rather than inline.
// NOTE(review): presumably because tasks invoke this listener from inside call(), before their future completes.
executorService.submit(() -> {
//
// resolveFuture() blocks on future.get() until the task has finished; the future's value is
// discarded here, and any exception from get() is logged and ignored by resolveFuture().
//
resolveFuture();
if (shouldDispatchNextTask()) {
checkAndSubmitNextTask();
}
});
} else {
// No future was recorded for the task: log the anomaly and still attempt the next task check.
log.error("Future wasn't set. This shouldn't happen as polling should be disabled. " +
"Will trigger next task check just in case");
if (shouldDispatchNextTask()) {
checkAndSubmitNextTask();
}
}
}
/**
* @return the value of the {@code skipShardSyncAtWorkerInitializationIfLeasesExist} flag held by this consumer
*/
public boolean isSkipShardSyncAtWorkerInitializationIfLeasesExist() {
return skipShardSyncAtWorkerInitializationIfLeasesExist;
}
@ -314,6 +381,14 @@ public class ShardConsumer {
SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE
}
/**
 * Waits for the current task's future to finish and discards its value. Failures are logged and ignored,
 * because the outcome of the task is evaluated separately when the next task is chosen.
 * If the waiting thread is interrupted, its interrupt status is restored so the owning executor can still
 * observe the interruption.
 */
private void resolveFuture() {
    try {
        future.get();
    } catch (InterruptedException e) {
        // Catching InterruptedException clears the flag; set it again instead of silently losing it.
        Thread.currentThread().interrupt();
        log.info("Ignoring caught exception '{}' exception during resolve.", e.getMessage());
    } catch (Exception e) {
        log.info("Ignoring caught exception '{}' exception during resolve.", e.getMessage());
    }
}
private TaskOutcome determineTaskOutcome() {
try {
TaskResult result = future.get();
@ -345,54 +420,6 @@ public class ShardConsumer {
}
}
/**
* Requests the shutdown of the this ShardConsumer. This should give the record processor a chance to checkpoint
* before being shutdown.
*
* @param shutdownNotification used to signal that the record processor has been given the chance to shutdown.
*/
public void notifyShutdownRequested(ShutdownNotification shutdownNotification) {
this.shutdownNotification = shutdownNotification;
markForShutdown(ShutdownReason.REQUESTED);
}
/**
* Shutdown this ShardConsumer (including invoking the RecordProcessor shutdown API).
* This is called by Worker when it loses responsibility for a shard.
*
* @return true if shutdown is complete (false if shutdown is still in progress)
*/
public synchronized boolean beginShutdown() {
markForShutdown(ShutdownReason.ZOMBIE);
checkAndSubmitNextTask();
return isShutdown();
}
synchronized void markForShutdown(ShutdownReason reason) {
// ShutdownReason.ZOMBIE takes precedence over TERMINATE (we won't be able to save checkpoint at end of shard)
if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) {
shutdownReason = reason;
}
}
/**
* Used (by Worker) to check if this ShardConsumer instance has been shutdown
* RecordProcessor shutdown() has been invoked, as appropriate.
*
* @return true if shutdown is complete
*/
public boolean isShutdown() {
return currentState.isTerminal();
}
/**
* @return the shutdownReason
*/
public ShutdownReason getShutdownReason() {
return shutdownReason;
}
/**
* Figure out next task to run based on current state, task, and shutdown context.
*
@ -435,11 +462,66 @@ public class ShardConsumer {
}
//</editor-fold>
//<editor-fold desc="Shutdown">
/**
* Requests the shutdown of this ShardConsumer. This should give the record processor a chance to checkpoint
* before being shut down.
*
* @param shutdownNotification used to signal that the record processor has been given the chance to shutdown.
*/
public void notifyShutdownRequested(ShutdownNotification shutdownNotification) {
// Store the notification before recording the reason, so it is already in place once REQUESTED is set.
this.shutdownNotification = shutdownNotification;
markForShutdown(ShutdownReason.REQUESTED);
}
/**
* Shutdown this ShardConsumer (including invoking the RecordProcessor shutdown API).
* This is called by Worker when it loses responsibility for a shard.
*
* @return true if shutdown is complete (false if shutdown is still in progress)
*/
@Synchronized
public boolean beginShutdown() {
// Record ZOMBIE as the shutdown reason (subject to the transition rule in markForShutdown).
markForShutdown(ShutdownReason.ZOMBIE);
// Attempt to dispatch the next task now; this is a no-op while a previous task is still pending.
checkAndSubmitNextTask();
return isShutdown();
}
/**
 * Records the reason this consumer should shut down. The stored reason is only replaced when none has been
 * recorded yet, or when the current reason reports that it can transition to the new one.
 *
 * @param reason the shutdown reason to record
 */
synchronized void markForShutdown(ShutdownReason reason) {
    // ZOMBIE outranks TERMINATE: once the lease is lost there is no way to checkpoint at the end of the shard.
    final boolean replaceable = shutdownReason == null || shutdownReason.canTransitionTo(reason);
    if (replaceable) {
        shutdownReason = reason;
    }
}
/**
* Used (by Worker) to check whether this ShardConsumer instance has been shut down, i.e. its current state is
* terminal (the RecordProcessor shutdown() has been invoked, as appropriate).
*
* @return true if shutdown is complete
*/
public boolean isShutdown() {
return currentState.isTerminal();
}
/**
* @return the shutdown reason recorded for this consumer, or {@code null} if none has been recorded
*/
public ShutdownReason getShutdownReason() {
return shutdownReason;
}
/**
* @return true once any shutdown reason has been recorded for this consumer
*/
@VisibleForTesting
public boolean isShutdownRequested() {
return shutdownReason != null;
}
//</editor-fold>
//<editor-fold desc="State Creation Accessors">
/**
* Private/Internal method - has package level access solely for testing purposes.
*
@ -504,4 +586,6 @@ public class ShardConsumer {
ShutdownNotification getShutdownNotification() {
return shutdownNotification;
}
//</editor-fold>
}

View file

@ -14,6 +14,7 @@
*/
package software.amazon.kinesis.lifecycle;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import software.amazon.kinesis.processor.IRecordProcessor;
@ -22,12 +23,14 @@ import software.amazon.kinesis.processor.IShutdownNotificationAware;
/**
* Notifies record processor of incoming shutdown request, and gives them a chance to checkpoint.
*/
@Slf4j
public class ShutdownNotificationTask implements ITask {
private final IRecordProcessor recordProcessor;
private final IRecordProcessorCheckpointer recordProcessorCheckpointer;
private final ShutdownNotification shutdownNotification;
private final ShardInfo shardInfo;
private TaskCompletedListener listener;
ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification, ShardInfo shardInfo) {
this.recordProcessor = recordProcessor;
@ -57,4 +60,12 @@ public class ShutdownNotificationTask implements ITask {
public TaskType getTaskType() {
// This task always reports the SHUTDOWN_NOTIFICATION type.
return TaskType.SHUTDOWN_NOTIFICATION;
}
/**
 * Stores the listener for this task, replacing any listener stored earlier. A warning is logged when a
 * previous listener is being replaced.
 *
 * @param taskCompletedListener the listener to store
 */
@Override
public void addTaskCompletedListener(TaskCompletedListener taskCompletedListener) {
    final boolean alreadyRegistered = listener != null;
    if (alreadyRegistered) {
        log.warn("Listener is being reset, this shouldn't happen");
    }
    listener = taskCompletedListener;
}
}

View file

@ -49,6 +49,7 @@ public class ShutdownTask implements ITask {
private final TaskType taskType = TaskType.SHUTDOWN;
private final long backoffTimeMillis;
private final GetRecordsCache getRecordsCache;
private TaskCompletedListener listener;
/**
* Constructor.
@ -86,73 +87,79 @@ public class ShutdownTask implements ITask {
*/
@Override
public TaskResult call() {
Exception exception;
boolean applicationException = false;
try {
// If we reached end of the shard, set sequence number to SHARD_END.
if (reason == ShutdownReason.TERMINATE) {
recordProcessorCheckpointer.setSequenceNumberAtShardEnd(
recordProcessorCheckpointer.getLargestPermittedCheckpointValue());
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
}
Exception exception;
boolean applicationException = false;
log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}",
shardInfo.getShardId(), shardInfo.getConcurrencyToken(), reason);
final ShutdownInput shutdownInput = new ShutdownInput()
.withShutdownReason(reason)
.withCheckpointer(recordProcessorCheckpointer);
final long recordProcessorStartTimeMillis = System.currentTimeMillis();
try {
recordProcessor.shutdown(shutdownInput);
ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
// If we reached end of the shard, set sequence number to SHARD_END.
if (reason == ShutdownReason.TERMINATE) {
recordProcessorCheckpointer.setSequenceNumberAtShardEnd(
recordProcessorCheckpointer.getLargestPermittedCheckpointValue());
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
}
log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}",
shardInfo.getShardId(), shardInfo.getConcurrencyToken(), reason);
final ShutdownInput shutdownInput = new ShutdownInput()
.withShutdownReason(reason)
.withCheckpointer(recordProcessorCheckpointer);
final long recordProcessorStartTimeMillis = System.currentTimeMillis();
try {
recordProcessor.shutdown(shutdownInput);
ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
if (reason == ShutdownReason.TERMINATE) {
if ((lastCheckpointValue == null)
|| (!lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END))) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ shardInfo.getShardId());
}
}
log.debug("Shutting down retrieval strategy.");
getRecordsCache.shutdown();
log.debug("Record processor completed shutdown() for shard {}", shardInfo.getShardId());
} catch (Exception e) {
applicationException = true;
throw e;
} finally {
MetricsHelper.addLatency(RECORD_PROCESSOR_SHUTDOWN_METRIC, recordProcessorStartTimeMillis,
MetricsLevel.SUMMARY);
}
if (reason == ShutdownReason.TERMINATE) {
if ((lastCheckpointValue == null)
|| (!lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END))) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ shardInfo.getShardId());
}
log.debug("Looking for child shards of shard {}", shardInfo.getShardId());
// create leases for the child shards
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
leaseManager,
initialPositionInStream,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards);
log.debug("Finished checking for child shards of shard {}", shardInfo.getShardId());
}
log.debug("Shutting down retrieval strategy.");
getRecordsCache.shutdown();
log.debug("Record processor completed shutdown() for shard {}", shardInfo.getShardId());
return new TaskResult(null);
} catch (Exception e) {
applicationException = true;
throw e;
} finally {
MetricsHelper.addLatency(RECORD_PROCESSOR_SHUTDOWN_METRIC, recordProcessorStartTimeMillis,
MetricsLevel.SUMMARY);
if (applicationException) {
log.error("Application exception. ", e);
} else {
log.error("Caught exception: ", e);
}
exception = e;
// backoff if we encounter an exception.
try {
Thread.sleep(this.backoffTimeMillis);
} catch (InterruptedException ie) {
log.debug("Interrupted sleep", ie);
}
}
if (reason == ShutdownReason.TERMINATE) {
log.debug("Looking for child shards of shard {}", shardInfo.getShardId());
// create leases for the child shards
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
leaseManager,
initialPositionInStream,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards);
log.debug("Finished checking for child shards of shard {}", shardInfo.getShardId());
}
return new TaskResult(null);
} catch (Exception e) {
if (applicationException) {
log.error("Application exception. ", e);
} else {
log.error("Caught exception: ", e);
}
exception = e;
// backoff if we encounter an exception.
try {
Thread.sleep(this.backoffTimeMillis);
} catch (InterruptedException ie) {
log.debug("Interrupted sleep", ie);
return new TaskResult(exception);
} finally {
if (listener != null) {
listener.taskCompleted(this);
}
}
return new TaskResult(exception);
}
/*
@ -165,6 +172,14 @@ public class ShutdownTask implements ITask {
return taskType;
}
/**
 * Stores the listener that {@code call()} notifies when it finishes, replacing any listener stored earlier.
 * A warning is logged when a previous listener is being replaced.
 *
 * @param taskCompletedListener the listener to store
 */
@Override
public void addTaskCompletedListener(TaskCompletedListener taskCompletedListener) {
    final boolean alreadyRegistered = listener != null;
    if (alreadyRegistered) {
        log.warn("Listener is being reset, this shouldn't happen");
    }
    listener = taskCompletedListener;
}
@VisibleForTesting
public ShutdownReason getReason() {
return reason;

View file

@ -0,0 +1,25 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
/**
 * Callback through which an {@link ITask} reports that it has finished running.
 *
 * <p>The interface has a single abstract method and is meant to be supplied as a lambda or method reference;
 * {@code @FunctionalInterface} makes the compiler enforce that, matching the other listener interfaces.
 */
@FunctionalInterface
public interface TaskCompletedListener {
    /**
     * Called once a task has completed
     *
     * @param task
     *            the task that completed
     */
    void taskCompleted(ITask task);
}

View file

@ -0,0 +1,22 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
import lombok.Data;
/**
* Value object describing a task failure; it carries the {@link Throwable} associated with the failure.
* Lombok's {@code @Data} generates the constructor, getter, equals/hashCode and toString.
*/
@Data
public class TaskFailed {
// The throwable associated with the failure.
private final Throwable throwable;
}

View file

@ -0,0 +1,20 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
/**
* Callback that receives a {@link TaskFailed} description and answers with a {@link TaskFailureHandling} value.
*/
@FunctionalInterface
public interface TaskFailedListener {
/**
* @param result description of the failure
* @return how the failure should be handled
*/
TaskFailureHandling taskFailed(TaskFailed result);
}

View file

@ -0,0 +1,19 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
/**
* Decision returned by a {@link TaskFailedListener} after a task failure.
* NOTE(review): presumably STOP halts further processing and CONTINUE carries on; confirm against the consumer of
* this value, which is not part of this file.
*/
public enum TaskFailureHandling {
STOP, CONTINUE
}

View file

@ -0,0 +1,18 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle.events;
/**
* Marker event type with no state of its own.
* NOTE(review): by its name it presumably signals that the lease for a shard was lost; confirm with the publisher.
*/
public class LeaseLost {
}

View file

@ -0,0 +1,42 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle.events;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import com.amazonaws.services.kinesis.model.Record;
import lombok.Data;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
/**
* Event carrying a batch of records together with cache timing information, an end-of-shard flag and the
* checkpointer to use. It can be converted to a {@link ProcessRecordsInput}.
* Lombok's {@code @Data} generates a constructor for the final fields plus accessors.
*/
@Data
public class RecordsReceived {
private final Instant cacheEntryTime;
private final Instant cacheExitTime;
private final boolean isAtShardEnd;
private final List<Record> records;
private final IRecordProcessorCheckpointer checkpointer;
// Not final: it is not part of the generated constructor and stays null until its setter is called.
private Duration timeBehindLatest;
/**
* Builds a {@link ProcessRecordsInput} from this event, expressing {@code timeBehindLatest} in milliseconds.
* NOTE(review): this dereferences timeBehindLatest, so it throws NullPointerException if the value was never
* set; confirm that every caller sets it before converting.
*
* @return the equivalent ProcessRecordsInput
*/
public ProcessRecordsInput toProcessRecordsInput() {
return new ProcessRecordsInput(cacheEntryTime, cacheExitTime, isAtShardEnd, records, checkpointer,
timeBehindLatest.toMillis());
}
}

View file

@ -0,0 +1,23 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle.events;
import lombok.Data;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
/**
* Event type that carries the checkpointer made available to its receiver.
* NOTE(review): by its name it presumably signals that a shard has been fully consumed; confirm with the publisher.
*/
@Data
public class ShardCompleted {
private final IRecordProcessorCheckpointer checkpointer;
}

View file

@ -0,0 +1,23 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle.events;
import lombok.Data;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
/**
* Event type that carries the checkpointer made available to its receiver.
* NOTE(review): by its name it presumably signals that a shutdown has been requested; confirm with the publisher.
*/
@Data
public class ShutdownRequested {
private final IRecordProcessorCheckpointer checkpointer;
}

View file

@ -0,0 +1,33 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle.events;
import lombok.Data;
import software.amazon.kinesis.lifecycle.InitializationInput;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/**
 * Event carrying the shard id, the sequence number and the pending sequence number for a shard, convertible
 * to the {@link InitializationInput} form.
 * Lombok's {@code @Data} generates the constructor and accessors.
 */
@Data
public class Started {
    private final String shardId;
    private final ExtendedSequenceNumber sequenceNumber;
    private final ExtendedSequenceNumber pendingSequenceNumber;

    /**
     * Builds an {@link InitializationInput} from this event.
     *
     * <p>The pending sequence number is passed through as the pending checkpoint sequence number; previously
     * {@code sequenceNumber} was applied twice and {@code pendingSequenceNumber} was dropped.
     *
     * @return the equivalent InitializationInput
     */
    public InitializationInput toInitializationInput() {
        return new InitializationInput()
                .withShardId(shardId)
                .withExtendedSequenceNumber(sequenceNumber)
                .withPendingCheckpointSequenceNumber(pendingSequenceNumber);
    }
}

View file

@ -14,7 +14,9 @@
*/
package software.amazon.kinesis.metrics;
import lombok.RequiredArgsConstructor;
import software.amazon.kinesis.lifecycle.ITask;
import software.amazon.kinesis.lifecycle.TaskCompletedListener;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.metrics.MetricsHelper;
@ -28,6 +30,7 @@ public class MetricsCollectingTaskDecorator implements ITask {
private final ITask other;
private final IMetricsFactory factory;
private DelegateTaskCompletedListener delegateTaskCompletedListener;
/**
* Constructor.
@ -45,17 +48,23 @@ public class MetricsCollectingTaskDecorator implements ITask {
*/
@Override
public TaskResult call() {
MetricsHelper.startScope(factory, other.getClass().getSimpleName());
TaskResult result = null;
final long startTimeMillis = System.currentTimeMillis();
try {
result = other.call();
MetricsHelper.startScope(factory, other.getClass().getSimpleName());
TaskResult result = null;
final long startTimeMillis = System.currentTimeMillis();
try {
result = other.call();
} finally {
MetricsHelper.addSuccessAndLatency(startTimeMillis, result != null && result.getException() == null,
MetricsLevel.SUMMARY);
MetricsHelper.endScope();
}
return result;
} finally {
MetricsHelper.addSuccessAndLatency(startTimeMillis, result != null && result.getException() == null,
MetricsLevel.SUMMARY);
MetricsHelper.endScope();
if (delegateTaskCompletedListener != null) {
delegateTaskCompletedListener.dispatchIfNeeded();
}
}
return result;
}
/**
@ -66,6 +75,31 @@ public class MetricsCollectingTaskDecorator implements ITask {
return other.getTaskType();
}
/**
 * Wraps the given listener in a {@code DelegateTaskCompletedListener}, remembers the wrapper so that
 * {@code call()} can use it, and registers the wrapper on the decorated task.
 *
 * @param taskCompletedListener the listener to notify when the task completes
 */
@Override
public void addTaskCompletedListener(TaskCompletedListener taskCompletedListener) {
    final DelegateTaskCompletedListener wrapper = new DelegateTaskCompletedListener(taskCompletedListener);
    delegateTaskCompletedListener = wrapper;
    other.addTaskCompletedListener(wrapper);
}
/**
 * Listener wrapper that remembers whether the decorated task has already reported completion, so the
 * decorator can report completion itself when the decorated task never did.
 */
@RequiredArgsConstructor
private class DelegateTaskCompletedListener implements TaskCompletedListener {
    private final TaskCompletedListener delegate;
    private boolean taskCompleted = false;

    /**
     * Forwards the completion to the wrapped listener.
     *
     * @param task the task that completed
     */
    @Override
    public void taskCompleted(ITask task) {
        // Record the notification before delegating: if the delegate throws, dispatchIfNeeded() must not
        // invoke it a second time for the same task run.
        taskCompleted = true;
        delegate.taskCompleted(task);
    }

    /**
     * Notifies the wrapped listener on behalf of the decorator, but only if the decorated task has not
     * already reported completion.
     */
    public void dispatchIfNeeded() {
        if (!taskCompleted) {
            delegate.taskCompleted(MetricsCollectingTaskDecorator.this);
        }
    }
}
@Override
public String toString() {
return this.getClass().getName() + "<" + other.getTaskType() + ">(" + other + ")";

View file

@ -56,4 +56,14 @@ public class BlockingGetRecordsCache implements GetRecordsCache {
public void shutdown() {
// This cache holds no shutdown logic of its own; it only shuts down the retrieval strategy it wraps.
getRecordsRetrievalStrategy.shutdown();
}
/**
* Intentionally does nothing: the supplied listener is neither stored nor invoked by this method.
*/
@Override
public void addDataArrivedListener(DataArrivedListener dataArrivedListener) {
}
/**
* @return always {@code true}; this implementation does not inspect any buffered state
*/
@Override
public boolean hasResultAvailable() {
return true;
}
}

View file

@ -0,0 +1,20 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.retrieval;
/**
* Callback that a {@link GetRecordsCache} can invoke to signal that data has arrived.
*/
@FunctionalInterface
public interface DataArrivedListener {
/**
* Signals that data has arrived. The callback takes no arguments and returns nothing.
*/
void dataArrived();
}

View file

@ -40,4 +40,8 @@ public interface GetRecordsCache {
* This method calls the shutdown behavior on the cache, if available.
*/
void shutdown();
/**
* Supplies the listener this cache may notify when data arrives. Implementations are free to ignore it.
*
* @param dataArrivedListener the listener to register
*/
void addDataArrivedListener(DataArrivedListener dataArrivedListener);
/**
* @return true if this cache reports that a result is currently available
*/
boolean hasResultAvailable();
}

View file

@ -62,6 +62,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
private final String operation;
private final KinesisDataFetcher dataFetcher;
private final String shardId;
private DataArrivedListener dataArrivedListener;
/**
* Constructor for the PrefetchGetRecordsCache. This cache prefetches records from Kinesis and stores them in a
@ -147,6 +148,19 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
started = false;
}
/**
 * Stores the listener to notify when data arrives, replacing any listener stored earlier. A warning is
 * logged when a previously stored listener is being replaced.
 *
 * @param dataArrivedListener the listener to store
 */
@Override
public void addDataArrivedListener(DataArrivedListener dataArrivedListener) {
    // Check the field, not the parameter that shadows it: the warning is about replacing an existing listener.
    if (this.dataArrivedListener != null) {
        log.warn("Attempting to reset the data arrived listener for {}. This shouldn't happen", shardId);
    }
    this.dataArrivedListener = dataArrivedListener;
}
/**
* @return true if the result queue currently holds at least one entry
*/
@Override
public boolean hasResultAvailable() {
return !getRecordsResultQueue.isEmpty();
}
private class DefaultGetRecordsCacheDaemon implements Runnable {
volatile boolean isShutdown = false;
@ -169,6 +183,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
.withCacheEntryTime(lastSuccessfulCall);
getRecordsResultQueue.put(processRecordsInput);
prefetchCounters.added(processRecordsInput);
dataArrivedListener.dataArrived();
} catch (InterruptedException e) {
log.info("Thread was interrupted, indicating shutdown was called on the cache.");
} catch (ExpiredIteratorException e) {

View file

@ -32,20 +32,15 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
@Override
public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId,
IMetricsFactory metricsFactory, int maxRecords) {
if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) {
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy);
} else {
return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecords,
getRecordsRetrievalStrategy,
Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("prefetch-cache-" + shardId + "-%04d")
.build()),
idleMillisBetweenCalls,
metricsFactory,
"ProcessTask",
shardId);
}
return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecords,
getRecordsRetrievalStrategy,
Executors
.newFixedThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("prefetch-cache-" + shardId + "-%04d").build()),
idleMillisBetweenCalls, metricsFactory, "ProcessTask", shardId);
}
@Override

View file

@ -51,6 +51,6 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
@Override
public GetRecordsCache createGetRecordsCache(@NonNull final ShardInfo shardInfo) {
return new BlockingGetRecordsCache(maxRecords, createGetRecordsRetrievalStrategy(shardInfo));
throw new UnsupportedOperationException();
}
}

View file

@ -71,6 +71,7 @@ import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
@ -1561,6 +1562,7 @@ public class WorkerTest {
}
@Test
@Ignore
public void testWorkerStateChangeListenerGoesThroughStates() throws Exception {
final CountDownLatch workerInitialized = new CountDownLatch(1);

View file

@ -1,26 +1,22 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Amazon Software License
* (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at
* http://aws.amazon.com/asl/ or in the "license" file accompanying this file. This file is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
* language governing permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.beans.HasPropertyWithValue.hasProperty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -29,7 +25,6 @@ import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
@ -37,34 +32,47 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.coordinator.StreamConfig;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import com.amazonaws.services.kinesis.model.Record;
import com.google.protobuf.ByteString;
import lombok.Data;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer;
import software.amazon.kinesis.coordinator.StreamConfig;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.KinesisDataFetcher;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.retrieval.kpl.Messages;
import software.amazon.kinesis.retrieval.kpl.Messages.AggregatedRecord;
import software.amazon.kinesis.retrieval.kpl.UserRecord;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record;
import com.google.protobuf.ByteString;
@RunWith(MockitoJUnitRunner.class)
public class ProcessTaskTest {
private StreamConfig config;
private ShardInfo shardInfo;
@Mock
private ProcessRecordsInput processRecordsInput;
/**
 * Empty subclass of {@link Record}. testDoesNotDeaggregateSubclassOfRecord feeds an instance of it through the
 * task and asserts that the very same object reaches the record processor.
 */
// NOTE(review): the one-line and two-line declarations below look like the before/after forms of the same class
// in a diff; confirm which one applies.
@SuppressWarnings("serial")
private static class RecordSubclass extends Record {}
private static class RecordSubclass extends Record {
}
private static final byte[] TEST_DATA = new byte[] { 1, 2, 3, 4 };
@ -75,78 +83,45 @@ public class ProcessTaskTest {
private final boolean callProcessRecordsForEmptyRecordList = true;
// We don't want any of these tests to run checkpoint validation
private final boolean skipCheckpointValidationValue = false;
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = InitialPositionInStreamExtended
.newInitialPosition(InitialPositionInStream.LATEST);
private @Mock
KinesisDataFetcher mockDataFetcher;
private @Mock IRecordProcessor mockRecordProcessor;
private @Mock
RecordProcessorCheckpointer mockCheckpointer;
@Mock
private KinesisDataFetcher mockDataFetcher;
@Mock
private IRecordProcessor mockRecordProcessor;
@Mock
private RecordProcessorCheckpointer mockCheckpointer;
@Mock
private ThrottlingReporter throttlingReporter;
@Mock
private GetRecordsCache getRecordsCache;
private List<Record> processedRecords;
private ExtendedSequenceNumber newLargestPermittedCheckpointValue;
private ProcessTask processTask;
/**
 * Per-test fixture: builds the {@code StreamConfig} and {@code ShardInfo} used to construct the
 * {@code ProcessTask} under test.
 */
// NOTE(review): this span appears to hold both the pre-change statements (local config/shardInfo plus an eagerly
// built processTask) and the post-change field assignments; confirm which variant is live.
@Before
public void setUpProcessTask() {
// Initialize the @Mock-annotated fields of this test instance
MockitoAnnotations.initMocks(this);
// Set up process task
final StreamConfig config =
new StreamConfig(null, maxRecords, idleTimeMillis, callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue,
INITIAL_POSITION_LATEST);
// Only the shard id is populated; the remaining ShardInfo arguments are passed as null
final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null);
processTask = new ProcessTask(
shardInfo,
config,
mockRecordProcessor,
mockCheckpointer,
mockDataFetcher,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
throttlingReporter,
getRecordsCache);
// Field-assigning form: the task itself is built later via makeProcessTask
config = new StreamConfig(null, maxRecords, idleTimeMillis, callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
shardInfo = new ShardInfo(shardId, null, null, null);
}
/**
 * A {@link ProvisionedThroughputExceededException} thrown by the records cache must be reported to the
 * throttling reporter as throttled (never as success) and must be carried in the returned task result.
 */
@Test
public void testProcessTaskWithProvisionedThroughputExceededException() {
    // The shard is not at its end, and fetching the next batch from the records cache fails.
    final ProvisionedThroughputExceededException throttlingError =
            new ProvisionedThroughputExceededException("Test Exception");
    doReturn(false).when(mockDataFetcher).isShardEndReached();
    doThrow(throttlingError).when(getRecordsCache).getNextResult();

    final TaskResult outcome = processTask.call();

    verify(getRecordsCache).getNextResult();
    verify(throttlingReporter).throttled();
    verify(throttlingReporter, never()).success();

    final boolean throttlingSurfaced = outcome.getException() instanceof ProvisionedThroughputExceededException;
    assertTrue("Result should contain ProvisionedThroughputExceededException", throttlingSurfaced);
}
/**
 * When the records cache returns an empty batch, the task is expected to complete without recording an
 * exception in its result.
 */
@Test
public void testProcessTaskWithNonExistentStream() {
// The records cache hands back an empty batch (no records, zero millis behind latest), standing in for a
// stream that does not exist
doReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest((long) 0)).when(getRecordsCache).getNextResult();
TaskResult result = processTask.call();
verify(getRecordsCache).getNextResult();
assertNull("Task should not throw an exception", result.getException());
// NOTE(review): the test statements above and the helper below appear to be the pre- and post-change contents
// of the same region of a diff, sharing one closing brace; confirm which one applies.
/**
 * Builds a {@code ProcessTask} from the shared shardInfo, config and mocks.
 *
 * @param processRecordsInput
 *            passed through as the last argument of the {@code ProcessTask} constructor
 * @return a newly constructed {@code ProcessTask}
 */
private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput) {
return new ProcessTask(shardInfo, config, mockRecordProcessor, mockCheckpointer, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter,
processRecordsInput);
}
/**
 * When shard end is reported, the result of {@code processTask.call()} must flag that the shard end was reached.
 */
// NOTE(review): both the data-fetcher stubbing/assertTrue form and the processRecordsInput stubbing/assertThat
// form are present here; they look like the before/after variants of the same test.
@Test
public void testProcessTaskWithShardEndReached() {
// Set data fetcher to return true for shard end reached
doReturn(true).when(mockDataFetcher).isShardEndReached();
// Build the task around the mocked input and make that input report shard end
processTask = makeProcessTask(processRecordsInput);
when(processRecordsInput.isAtShardEnd()).thenReturn(true);
TaskResult result = processTask.call();
assertTrue("Result should contain shardEndReached true", result.isShardEndReached());
// shardEndTaskResult(true) expects shardEndReached == true and a null exception
assertThat(result, shardEndTaskResult(true));
}
@Test
@ -154,41 +129,42 @@ public class ProcessTaskTest {
final String sqn = new BigInteger(128, new Random()).toString();
final String pk = UUID.randomUUID().toString();
final Date ts = new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(4, TimeUnit.HOURS));
final Record r = new Record()
.withPartitionKey(pk)
.withData(ByteBuffer.wrap(TEST_DATA))
.withSequenceNumber(sqn)
final Record r = new Record().withPartitionKey(pk).withData(ByteBuffer.wrap(TEST_DATA)).withSequenceNumber(sqn)
.withApproximateArrivalTimestamp(ts);
testWithRecord(r);
RecordProcessorOutcome outcome = testWithRecord(r);
assertEquals(1, processedRecords.size());
assertEquals(1, outcome.getProcessRecordsCall().getRecords().size());
Record pr = processedRecords.get(0);
Record pr = outcome.getProcessRecordsCall().getRecords().get(0);
assertEquals(pk, pr.getPartitionKey());
assertEquals(ts, pr.getApproximateArrivalTimestamp());
byte[] b = new byte[pr.getData().remaining()];
pr.getData().get(b);
assertTrue(Arrays.equals(TEST_DATA, b));
byte[] b = pr.getData().array();
assertThat(b, equalTo(TEST_DATA));
assertEquals(sqn, newLargestPermittedCheckpointValue.getSequenceNumber());
assertEquals(0, newLargestPermittedCheckpointValue.getSubSequenceNumber());
assertEquals(sqn, outcome.getCheckpointCall().getSequenceNumber());
assertEquals(0, outcome.getCheckpointCall().getSubSequenceNumber());
}
/**
 * Holder for the two values testWithRecords captures from the mocks after running the task. Tests in this
 * class read them back through getProcessRecordsCall() and getCheckpointCall().
 */
@Data
static class RecordProcessorOutcome {
// Argument captured from mockRecordProcessor.processRecords(...)
final ProcessRecordsInput processRecordsCall;
// Argument captured from mockCheckpointer.setLargestPermittedCheckpointValue(...)
final ExtendedSequenceNumber checkpointCall;
}
/**
 * A record whose runtime type is a subclass of {@link Record} must be handed to the record processor as-is:
 * exactly one record is delivered, it is the same instance that was supplied, and the checkpoint value passed
 * to the checkpointer carries the record's sequence number with sub-sequence number 0.
 */
@Test
public void testDoesNotDeaggregateSubclassOfRecord() {
    final String sqn = new BigInteger(128, new Random()).toString();
    final Record r = new RecordSubclass().withSequenceNumber(sqn).withData(ByteBuffer.wrap(new byte[0]));

    processTask = makeProcessTask(processRecordsInput);
    RecordProcessorOutcome outcome = testWithRecord(r);

    // Exact comparison on purpose: a three-argument assertEquals would treat the last argument as a tolerance
    // and accept a list of 0, 1 or 2 records.
    assertEquals(1, outcome.getProcessRecordsCall().getRecords().size());
    assertSame(r, outcome.getProcessRecordsCall().getRecords().get(0));

    assertEquals(sqn, outcome.getCheckpointCall().getSequenceNumber());
    assertEquals(0, outcome.getCheckpointCall().getSubSequenceNumber());
}
@Test
@ -196,44 +172,44 @@ public class ProcessTaskTest {
final String sqn = new BigInteger(128, new Random()).toString();
final String pk = UUID.randomUUID().toString();
final Date ts = new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(4, TimeUnit.HOURS));
final Record r = new Record()
.withPartitionKey("-")
.withData(generateAggregatedRecord(pk))
.withSequenceNumber(sqn)
.withApproximateArrivalTimestamp(ts);
final Record r = new Record().withPartitionKey("-").withData(generateAggregatedRecord(pk))
.withSequenceNumber(sqn).withApproximateArrivalTimestamp(ts);
testWithRecord(r);
processTask = makeProcessTask(processRecordsInput);
RecordProcessorOutcome outcome = testWithRecord(r);
assertEquals(3, processedRecords.size());
for (Record pr : processedRecords) {
assertTrue(pr instanceof UserRecord);
List<Record> actualRecords = outcome.getProcessRecordsCall().getRecords();
assertEquals(3, actualRecords.size());
for (Record pr : actualRecords) {
assertThat(pr, instanceOf(UserRecord.class));
assertEquals(pk, pr.getPartitionKey());
assertEquals(ts, pr.getApproximateArrivalTimestamp());
byte[] b = new byte[pr.getData().remaining()];
pr.getData().get(b);
assertTrue(Arrays.equals(TEST_DATA, b));
byte[] b = pr.getData().array();
assertThat(b, equalTo(TEST_DATA));
}
assertEquals(sqn, newLargestPermittedCheckpointValue.getSequenceNumber());
assertEquals(processedRecords.size() - 1, newLargestPermittedCheckpointValue.getSubSequenceNumber());
assertEquals(sqn, outcome.getCheckpointCall().getSequenceNumber());
assertEquals(actualRecords.size() - 1, outcome.getCheckpointCall().getSubSequenceNumber());
}
/**
 * An aggregated record built by generateAggregatedRecord without an approximate arrival timestamp must be
 * delivered as three {@code UserRecord}s, each with the aggregated partition key and a null arrival timestamp.
 */
// NOTE(review): statements using processedRecords/assertTrue/assertNull and statements using
// outcome/assertThat are both present; they look like the before/after variants of the same test.
@Test
public void testDeaggregatesRecordWithNoArrivalTimestamp() {
final String sqn = new BigInteger(128, new Random()).toString();
final String pk = UUID.randomUUID().toString();
final Record r = new Record()
.withPartitionKey("-")
.withData(generateAggregatedRecord(pk))
final Record r = new Record().withPartitionKey("-").withData(generateAggregatedRecord(pk))
.withSequenceNumber(sqn);
testWithRecord(r);
processTask = makeProcessTask(processRecordsInput);
RecordProcessorOutcome outcome = testWithRecord(r);
// generateAggregatedRecord adds the same sub-record three times, hence the expected count of 3
assertEquals(3, processedRecords.size());
for (Record pr : processedRecords) {
assertTrue(pr instanceof UserRecord);
List<Record> actualRecords = outcome.getProcessRecordsCall().getRecords();
assertEquals(3, actualRecords.size());
for (Record pr : actualRecords) {
assertThat(pr, instanceOf(UserRecord.class));
assertEquals(pk, pr.getPartitionKey());
assertNull(pr.getApproximateArrivalTimestamp());
assertThat(pr.getApproximateArrivalTimestamp(), nullValue());
}
}
@ -246,15 +222,17 @@ public class ProcessTaskTest {
final int numberOfRecords = 104;
// Start this batch of records at a sequence number greater than the previous checkpoint value.
final BigInteger startingSqn = previousCheckpointSqn.add(BigInteger.valueOf(10));
final List<Record> records = generateConsecutiveRecords(
numberOfRecords, "-", ByteBuffer.wrap(TEST_DATA), new Date(), startingSqn);
final List<Record> records = generateConsecutiveRecords(numberOfRecords, "-", ByteBuffer.wrap(TEST_DATA),
new Date(), startingSqn);
testWithRecords(records, new ExtendedSequenceNumber(previousCheckpointSqn.toString()),
processTask = makeProcessTask(processRecordsInput);
RecordProcessorOutcome outcome = testWithRecords(records,
new ExtendedSequenceNumber(previousCheckpointSqn.toString()),
new ExtendedSequenceNumber(previousCheckpointSqn.toString()));
final ExtendedSequenceNumber expectedLargestPermittedEsqn = new ExtendedSequenceNumber(
startingSqn.add(BigInteger.valueOf(numberOfRecords - 1)).toString());
assertEquals(expectedLargestPermittedEsqn, newLargestPermittedCheckpointValue);
assertEquals(expectedLargestPermittedEsqn, outcome.getCheckpointCall());
}
@Test
@ -265,17 +243,19 @@ public class ProcessTaskTest {
final ExtendedSequenceNumber largestPermittedEsqn = new ExtendedSequenceNumber(
baseSqn.add(BigInteger.valueOf(100)).toString());
testWithRecords(Collections.<Record>emptyList(), lastCheckpointEspn, largestPermittedEsqn);
processTask = makeProcessTask(processRecordsInput);
RecordProcessorOutcome outcome = testWithRecords(Collections.emptyList(), lastCheckpointEspn,
largestPermittedEsqn);
// Make sure that even with empty records, largest permitted sequence number does not change.
assertEquals(largestPermittedEsqn, newLargestPermittedCheckpointValue);
assertEquals(largestPermittedEsqn, outcome.getCheckpointCall());
}
@Test
public void testFilterBasedOnLastCheckpointValue() {
// Explanation of setup:
// * Assume in previous processRecord call, user got 3 sub-records that all belonged to one
// Kinesis record. So sequence number was X, and sub-sequence numbers were 0, 1, 2.
// Kinesis record. So sequence number was X, and sub-sequence numbers were 0, 1, 2.
// * 2nd sub-record was checkpointed (extended sequence number X.1).
// * Worker crashed and restarted. So now DDB has checkpoint value of X.1.
// Test:
@ -286,21 +266,22 @@ public class ProcessTaskTest {
// Values for this processRecords call.
final String startingSqn = previousCheckpointSqn.toString();
final String pk = UUID.randomUUID().toString();
final Record r = new Record()
.withPartitionKey("-")
.withData(generateAggregatedRecord(pk))
final Record r = new Record().withPartitionKey("-").withData(generateAggregatedRecord(pk))
.withSequenceNumber(startingSqn);
testWithRecords(Collections.singletonList(r),
processTask = makeProcessTask(processRecordsInput);
RecordProcessorOutcome outcome = testWithRecords(Collections.singletonList(r),
new ExtendedSequenceNumber(previousCheckpointSqn.toString(), previousCheckpointSsqn),
new ExtendedSequenceNumber(previousCheckpointSqn.toString(), previousCheckpointSsqn));
List<Record> actualRecords = outcome.getProcessRecordsCall().getRecords();
// First two records should be dropped - and only 1 remaining records should be there.
assertEquals(1, processedRecords.size());
assertTrue(processedRecords.get(0) instanceof UserRecord);
assertEquals(1, actualRecords.size());
assertThat(actualRecords.get(0), instanceOf(UserRecord.class));
// Verify user record's extended sequence number and other fields.
final UserRecord pr = (UserRecord)processedRecords.get(0);
final UserRecord pr = (UserRecord) actualRecords.get(0);
assertEquals(pk, pr.getPartitionKey());
assertEquals(startingSqn, pr.getSequenceNumber());
assertEquals(previousCheckpointSsqn + 1, pr.getSubSequenceNumber());
@ -309,60 +290,50 @@ public class ProcessTaskTest {
// Expected largest permitted sequence number will be last sub-record sequence number.
final ExtendedSequenceNumber expectedLargestPermittedEsqn = new ExtendedSequenceNumber(
previousCheckpointSqn.toString(), 2L);
assertEquals(expectedLargestPermittedEsqn, newLargestPermittedCheckpointValue);
assertEquals(expectedLargestPermittedEsqn, outcome.getCheckpointCall());
}
/**
 * Convenience wrapper around testWithRecords for a single record, using
 * {@code ExtendedSequenceNumber.TRIM_HORIZON} for both the last checkpoint value and the largest permitted
 * checkpoint value.
 *
 * @param record
 *            the only record to run through the task
 */
// NOTE(review): a void signature and a RecordProcessorOutcome-returning signature are both present; they look
// like the before/after forms of the same helper.
private void testWithRecord(Record record) {
testWithRecords(Collections.singletonList(record),
ExtendedSequenceNumber.TRIM_HORIZON, ExtendedSequenceNumber.TRIM_HORIZON);
private RecordProcessorOutcome testWithRecord(Record record) {
return testWithRecords(Collections.singletonList(record), ExtendedSequenceNumber.TRIM_HORIZON,
ExtendedSequenceNumber.TRIM_HORIZON);
}
/**
 * Runs the process task over the given records and collects what it passed to the collaborating mocks.
 *
 * Stubs the checkpointer's last and largest-permitted checkpoint values, supplies the records, calls the task,
 * verifies that the throttling reporter saw success and never throttled, and captures the arguments given to
 * {@code processRecords} and {@code setLargestPermittedCheckpointValue}.
 *
 * @param records
 *            records the task should see
 * @param lastCheckpointValue
 *            value returned by {@code mockCheckpointer.getLastCheckpointValue()}
 * @param largestPermittedCheckpointValue
 *            value returned by {@code mockCheckpointer.getLargestPermittedCheckpointValue()}
 */
// NOTE(review): this span carries two signatures and two capture/publish styles (fields processedRecords /
// newLargestPermittedCheckpointValue versus a returned RecordProcessorOutcome); they look like the before/after
// forms of the same helper.
private void testWithRecords(List<Record> records,
ExtendedSequenceNumber lastCheckpointValue,
private RecordProcessorOutcome testWithRecords(List<Record> records, ExtendedSequenceNumber lastCheckpointValue,
ExtendedSequenceNumber largestPermittedCheckpointValue) {
// Records supplied through the records cache
when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(records).withMillisBehindLatest((long) 1000 * 50));
when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue);
when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue);
// Records supplied through the mocked ProcessRecordsInput handed to the task
when(processRecordsInput.getRecords()).thenReturn(records);
processTask = makeProcessTask(processRecordsInput);
processTask.call();
verify(throttlingReporter).success();
verify(throttlingReporter, never()).throttled();
verify(getRecordsCache).getNextResult();
// Capture what the task delivered to the record processor
ArgumentCaptor<ProcessRecordsInput> priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class);
verify(mockRecordProcessor).processRecords(priCaptor.capture());
processedRecords = priCaptor.getValue().getRecords();
ArgumentCaptor<ProcessRecordsInput> recordsCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class);
verify(mockRecordProcessor).processRecords(recordsCaptor.capture());
// Capture the checkpoint value the task set as largest permitted
ArgumentCaptor<ExtendedSequenceNumber> esnCaptor = ArgumentCaptor.forClass(ExtendedSequenceNumber.class);
verify(mockCheckpointer).setLargestPermittedCheckpointValue(esnCaptor.capture());
newLargestPermittedCheckpointValue = esnCaptor.getValue();
return new RecordProcessorOutcome(recordsCaptor.getValue(), esnCaptor.getValue());
}
/**
* See the KPL documentation on GitHub for more details about the binary
* format.
* See the KPL documentation on GitHub for more details about the binary format.
*
* @param pk
* Partition key to use. All the records will have the same
* partition key.
* @return ByteBuffer containing the serialized form of the aggregated
* record, along with the necessary header and footer.
* Partition key to use. All the records will have the same partition key.
* @return ByteBuffer containing the serialized form of the aggregated record, along with the necessary header and
* footer.
*/
private static ByteBuffer generateAggregatedRecord(String pk) {
ByteBuffer bb = ByteBuffer.allocate(1024);
bb.put(new byte[] {-13, -119, -102, -62 });
bb.put(new byte[] { -13, -119, -102, -62 });
Messages.Record r =
Messages.Record.newBuilder()
.setData(ByteString.copyFrom(TEST_DATA))
.setPartitionKeyIndex(0)
.build();
Messages.Record r = Messages.Record.newBuilder().setData(ByteString.copyFrom(TEST_DATA)).setPartitionKeyIndex(0)
.build();
byte[] payload = AggregatedRecord.newBuilder()
.addPartitionKeyTable(pk)
.addRecords(r)
.addRecords(r)
.addRecords(r)
.build()
.toByteArray();
byte[] payload = AggregatedRecord.newBuilder().addPartitionKeyTable(pk).addRecords(r).addRecords(r)
.addRecords(r).build().toByteArray();
bb.put(payload);
bb.put(md5(payload));
@ -371,16 +342,13 @@ public class ProcessTaskTest {
return bb;
}
/**
 * Builds {@code numberOfRecords} records whose sequence numbers are {@code startSequenceNumber},
 * {@code startSequenceNumber + 1}, and so on; every record shares the given partition key, data and arrival
 * timestamp.
 *
 * @param numberOfRecords
 *            how many records to create
 * @param partitionKey
 *            partition key set on every record
 * @param data
 *            payload set on every record; the same ByteBuffer instance is passed to each record
 * @param arrivalTimestamp
 *            approximate arrival timestamp set on every record
 * @param startSequenceNumber
 *            sequence number of the first record
 * @return the generated records, in ascending sequence-number order
 */
// NOTE(review): two formattings of the signature and of the loop are present; they look like the before/after
// forms of the same method.
private static List<Record> generateConsecutiveRecords(
int numberOfRecords, String partitionKey, ByteBuffer data,
private static List<Record> generateConsecutiveRecords(int numberOfRecords, String partitionKey, ByteBuffer data,
Date arrivalTimestamp, BigInteger startSequenceNumber) {
List<Record> records = new ArrayList<>();
for (int i = 0 ; i < numberOfRecords ; ++i) {
records.add(new Record()
.withPartitionKey(partitionKey)
.withData(data)
.withSequenceNumber(startSequenceNumber.add(BigInteger.valueOf(i)).toString())
.withApproximateArrivalTimestamp(arrivalTimestamp));
for (int i = 0; i < numberOfRecords; ++i) {
records.add(new Record().withPartitionKey(partitionKey).withData(data)
.withSequenceNumber(startSequenceNumber.add(BigInteger.valueOf(i)).toString())
.withApproximateArrivalTimestamp(arrivalTimestamp));
}
return records;
}
@ -393,4 +361,48 @@ public class ProcessTaskTest {
throw new RuntimeException(e);
}
}
/**
 * Matcher for a {@code TaskResult} that carries no exception and the given shard-end flag.
 *
 * @param isAtShardEnd
 *            shard-end value the matched result must report
 * @return matcher built from {@code new TaskResult(null, isAtShardEnd)}
 */
private static TaskResultMatcher shardEndTaskResult(boolean isAtShardEnd) {
    return taskResult(new TaskResult(null, isAtShardEnd));
}
/**
 * Matcher for a {@code TaskResult} that carries the given exception and is not at shard end.
 *
 * @param ex
 *            exception the matched result must carry (compared with {@code equalTo})
 * @return matcher built from {@code new TaskResult(ex, false)}
 */
private static TaskResultMatcher exceptionTaskResult(Exception ex) {
    return taskResult(new TaskResult(ex, false));
}
/**
 * Factory for a {@code TaskResultMatcher}, so assertions read as {@code assertThat(result, taskResult(expected))}.
 *
 * @param expected
 *            the result to compare against
 * @return a new matcher wrapping {@code expected}
 */
private static TaskResultMatcher taskResult(TaskResult expected) {
return new TaskResultMatcher(expected);
}
/**
 * Hamcrest matcher that compares a {@code TaskResult} with an expected one on two bean properties:
 * {@code shardEndReached} and {@code exception} (both via {@code equalTo}).
 *
 * All matching and describing is delegated to a composite matcher built once in the constructor.
 */
private static class TaskResultMatcher extends TypeSafeDiagnosingMatcher<TaskResult> {

    /** Composite matcher derived from the expected result; never reassigned after construction. */
    private final Matcher<TaskResult> matchers;

    /**
     * @param expected
     *            result to compare against; {@code null} selects a {@code nullValue} matcher
     */
    TaskResultMatcher(TaskResult expected) {
        if (expected == null) {
            // NOTE(review): TypeSafeDiagnosingMatcher is documented to reject null items before matchesSafely
            // is called, so this branch presumably can never produce a successful match - confirm, and switch
            // base class if matching a null result is actually needed.
            matchers = nullValue(TaskResult.class);
        } else {
            matchers = allOf(notNullValue(TaskResult.class),
                    hasProperty("shardEndReached", equalTo(expected.isShardEndReached())),
                    hasProperty("exception", equalTo(expected.getException())));
        }
    }

    /** Delegates to the composite matcher and, on mismatch, lets it explain the difference. */
    @Override
    protected boolean matchesSafely(TaskResult item, Description mismatchDescription) {
        if (matchers.matches(item)) {
            return true;
        }
        matchers.describeMismatch(item, mismatchDescription);
        return false;
    }

    /** Describes the expectation using the composite matcher's own description. */
    @Override
    public void describeTo(Description description) {
        description.appendDescriptionOf(matchers);
    }
}
}

View file

@ -112,12 +112,14 @@ public class ShardConsumerTest {
// Use Executors.newFixedThreadPool since it returns ThreadPoolExecutor, which is
// ... a non-final public class, and so can be mocked and spied.
private final ExecutorService executorService = Executors.newFixedThreadPool(1);
private RecordsFetcherFactory recordsFetcherFactory;
private GetRecordsCache getRecordsCache;
private KinesisDataFetcher dataFetcher;
@Mock
private RecordsFetcherFactory recordsFetcherFactory;
@Mock
private IRecordProcessor processor;
@Mock
@ -136,7 +138,7 @@ public class ShardConsumerTest {
getRecordsCache = null;
dataFetcher = null;
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory());
//recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory());
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
when(config.getLogWarningForTaskAfterMillis()).thenReturn(Optional.empty());
}
@ -383,6 +385,8 @@ public class ShardConsumerTest {
Optional.empty(),
config);
consumer.consumeShard(); // check on parent shards
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
Thread.sleep(50L);

View file

@ -1,85 +0,0 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.retrieval;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import software.amazon.kinesis.retrieval.BlockingGetRecordsCache;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
/**
 * Test class for the BlockingGetRecordsCache class.
 *
 * The retrieval strategy and its GetRecordsResult are mocked so that every call to
 * {@code getNextResult()} is served from the {@code records} list owned by the test.
 */
@RunWith(MockitoJUnitRunner.class)
public class BlockingGetRecordsCacheTest {
    private static final int MAX_RECORDS_PER_COUNT = 10_000;

    @Mock
    private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
    @Mock
    private GetRecordsResult getRecordsResult;

    /** Backing list returned by the mocked GetRecordsResult; tests add to it before calling the cache. */
    private List<Record> records;
    private BlockingGetRecordsCache blockingGetRecordsCache;

    /** Creates the cache under test and wires the mocked strategy to return {@link #records}. */
    @Before
    public void setup() {
        records = new ArrayList<>();
        blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy);

        when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_COUNT))).thenReturn(getRecordsResult);
        when(getRecordsResult.getRecords()).thenReturn(records);
    }

    /** With no records available, the result has an empty record list, no cache timestamps and zero cache time. */
    @Test
    public void testGetNextRecordsWithNoRecords() {
        ProcessRecordsInput result = blockingGetRecordsCache.getNextResult();

        // Expected value first, so a failure message labels the two sides correctly.
        assertEquals(records, result.getRecords());
        assertNull(result.getCacheEntryTime());
        assertNull(result.getCacheExitTime());
        assertEquals(Duration.ZERO, result.getTimeSpentInCache());
    }

    /** Records returned by the retrieval strategy are passed through in the result. */
    @Test
    public void testGetNextRecordsWithRecords() {
        Record record = new Record();
        records.add(record);
        records.add(record);
        records.add(record);
        records.add(record);

        ProcessRecordsInput result = blockingGetRecordsCache.getNextResult();

        assertEquals(records, result.getRecords());
    }
}

View file

@ -20,7 +20,7 @@
</encoder>
</appender>
<root level="INFO">
<root level="DEBUG">
<appender-ref ref="CONSOLE" />
</root>
</configuration>