Broke apart the process task, now diving into the shard consumer more.

This commit is contained in:
Pfifer, Justin 2018-03-26 11:47:44 -07:00
parent 992aa9dee1
commit 7b026f8a19
12 changed files with 580 additions and 370 deletions

View file

@ -312,14 +312,13 @@ class ConsumerStates {
@Override @Override
public ITask createTask(ShardConsumer consumer) { public ITask createTask(ShardConsumer consumer) {
ProcessTask.RecordsFetcher recordsFetcher = new ProcessTask.RecordsFetcher(consumer.getGetRecordsCache());
return new ProcessTask(consumer.getShardInfo(), return new ProcessTask(consumer.getShardInfo(),
consumer.getStreamConfig(), consumer.getStreamConfig(),
consumer.getRecordProcessor(), consumer.getRecordProcessor(),
consumer.getRecordProcessorCheckpointer(), consumer.getRecordProcessorCheckpointer(),
consumer.getDataFetcher(),
consumer.getTaskBackoffTimeMillis(), consumer.getTaskBackoffTimeMillis(),
consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), recordsFetcher.getRecords());
consumer.getGetRecordsCache());
} }
@Override @Override

View file

@ -18,6 +18,7 @@ import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.List; import java.util.List;
import lombok.AllArgsConstructor;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
@ -29,11 +30,14 @@ import software.amazon.kinesis.processor.IRecordProcessor;
* {@link IRecordProcessor#processRecords( * {@link IRecordProcessor#processRecords(
* ProcessRecordsInput processRecordsInput) processRecords} method. * ProcessRecordsInput processRecordsInput) processRecords} method.
*/ */
@AllArgsConstructor
public class ProcessRecordsInput { public class ProcessRecordsInput {
@Getter @Getter
private Instant cacheEntryTime; private Instant cacheEntryTime;
@Getter @Getter
private Instant cacheExitTime; private Instant cacheExitTime;
@Getter
private boolean isAtShardEnd;
private List<Record> records; private List<Record> records;
private IRecordProcessorCheckpointer checkpointer; private IRecordProcessorCheckpointer checkpointer;
private Long millisBehindLatest; private Long millisBehindLatest;

View file

@ -1,16 +1,9 @@
/* /*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Amazon Software License
* * (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at
* Licensed under the Amazon Software License (the "License"). * http://aws.amazon.com/asl/ or in the "license" file accompanying this file. This file is distributed on an "AS IS"
* You may not use this file except in compliance with the License. * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
* A copy of the License is located at * language governing permissions and limitations under the License.
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/ */
package software.amazon.kinesis.lifecycle; package software.amazon.kinesis.lifecycle;
@ -19,26 +12,24 @@ import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import com.amazonaws.services.cloudwatch.model.StandardUnit; import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer; import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.coordinator.StreamConfig; import software.amazon.kinesis.coordinator.StreamConfig;
import software.amazon.kinesis.retrieval.ThrottlingReporter; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.IMetricsScope;
import software.amazon.kinesis.metrics.MetricsHelper;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.processor.IRecordProcessor; import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.IKinesisProxy; import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.retrieval.IKinesisProxyExtended; import software.amazon.kinesis.retrieval.IKinesisProxyExtended;
import software.amazon.kinesis.retrieval.KinesisDataFetcher; import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.retrieval.kpl.UserRecord; import software.amazon.kinesis.retrieval.kpl.UserRecord;
import software.amazon.kinesis.metrics.MetricsHelper;
import software.amazon.kinesis.metrics.IMetricsScope;
import software.amazon.kinesis.metrics.MetricsLevel;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import lombok.extern.slf4j.Slf4j;
/** /**
* Task for fetching data records and invoking processRecords() on the record processor instance. * Task for fetching data records and invoking processRecords() on the record processor instance.
@ -55,39 +46,50 @@ public class ProcessTask implements ITask {
private final ShardInfo shardInfo; private final ShardInfo shardInfo;
private final IRecordProcessor recordProcessor; private final IRecordProcessor recordProcessor;
private final RecordProcessorCheckpointer recordProcessorCheckpointer; private final RecordProcessorCheckpointer recordProcessorCheckpointer;
private final KinesisDataFetcher dataFetcher;
private final TaskType taskType = TaskType.PROCESS; private final TaskType taskType = TaskType.PROCESS;
private final StreamConfig streamConfig; private final StreamConfig streamConfig;
private final long backoffTimeMillis; private final long backoffTimeMillis;
private final Shard shard; private final Shard shard;
private final ThrottlingReporter throttlingReporter; private final ThrottlingReporter throttlingReporter;
private final ProcessRecordsInput processRecordsInput;
@RequiredArgsConstructor
public static class RecordsFetcher {
private final GetRecordsCache getRecordsCache; private final GetRecordsCache getRecordsCache;
/** public ProcessRecordsInput getRecords() {
* @param shardInfo ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult();
* contains information about the shard
* @param streamConfig if (processRecordsInput.getMillisBehindLatest() != null) {
* Stream configuration MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC,
* @param recordProcessor processRecordsInput.getMillisBehindLatest(), StandardUnit.Milliseconds, MetricsLevel.SUMMARY);
* Record processor used to process the data records for the shard }
* @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress return processRecordsInput;
* @param dataFetcher }
* Kinesis data fetcher (used to fetch records from Kinesis)
* @param backoffTimeMillis }
* backoff time when catching exceptions
* @param getRecordsCache /**
* The retrieval strategy for fetching records from kinesis * @param shardInfo
*/ * contains information about the shard
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, * @param streamConfig
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, * Stream configuration
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, * @param recordProcessor
GetRecordsCache getRecordsCache) { * Record processor used to process the data records for the shard
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, * @param recordProcessorCheckpointer
skipShardSyncAtWorkerInitializationIfLeasesExist, * Passed to the RecordProcessor so it can checkpoint progress
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), * @param backoffTimeMillis
getRecordsCache); * backoff time when catching exceptions
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ProcessRecordsInput processRecordsInput) {
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), processRecordsInput);
} }
/** /**
@ -99,27 +101,24 @@ public class ProcessTask implements ITask {
* Record processor used to process the data records for the shard * Record processor used to process the data records for the shard
* @param recordProcessorCheckpointer * @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress * Passed to the RecordProcessor so it can checkpoint progress
* @param dataFetcher
* Kinesis data fetcher (used to fetch records from Kinesis)
* @param backoffTimeMillis * @param backoffTimeMillis
* backoff time when catching exceptions * backoff time when catching exceptions
* @param throttlingReporter * @param throttlingReporter
* determines how throttling events should be reported in the log. * determines how throttling events should be reported in the log.
*/ */
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, RecordProcessorCheckpointer recordProcessorCheckpointer, long backoffTimeMillis,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ThrottlingReporter throttlingReporter,
ThrottlingReporter throttlingReporter, GetRecordsCache getRecordsCache) { ProcessRecordsInput processRecordsInput) {
super(); super();
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.dataFetcher = dataFetcher;
this.streamConfig = streamConfig; this.streamConfig = streamConfig;
this.backoffTimeMillis = backoffTimeMillis; this.backoffTimeMillis = backoffTimeMillis;
this.throttlingReporter = throttlingReporter; this.throttlingReporter = throttlingReporter;
IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy();
this.getRecordsCache = getRecordsCache; this.processRecordsInput = processRecordsInput;
// If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for
// this ProcessTask. In this case, duplicate KPL user records in the event of resharding will // this ProcessTask. In this case, duplicate KPL user records in the event of resharding will
// not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if // not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if
@ -138,7 +137,6 @@ public class ProcessTask implements ITask {
/* /*
* (non-Javadoc) * (non-Javadoc)
*
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call() * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call()
*/ */
@Override @Override
@ -151,12 +149,11 @@ public class ProcessTask implements ITask {
Exception exception = null; Exception exception = null;
try { try {
if (dataFetcher.isShardEndReached()) { if (processRecordsInput.isAtShardEnd()) {
log.info("Reached end of shard {}", shardInfo.getShardId()); log.info("Reached end of shard {}", shardInfo.getShardId());
return new TaskResult(null, true); return new TaskResult(null, true);
} }
final ProcessRecordsInput processRecordsInput = getRecordsResult();
throttlingReporter.success(); throttlingReporter.success();
List<Record> records = processRecordsInput.getRecords(); List<Record> records = processRecordsInput.getRecords();
@ -167,19 +164,13 @@ public class ProcessTask implements ITask {
} }
records = deaggregateRecords(records); records = deaggregateRecords(records);
recordProcessorCheckpointer.setLargestPermittedCheckpointValue( recordProcessorCheckpointer.setLargestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber(scope,
filterAndGetMaxExtendedSequenceNumber(scope, records, records, recordProcessorCheckpointer.getLastCheckpointValue(),
recordProcessorCheckpointer.getLastCheckpointValue(),
recordProcessorCheckpointer.getLargestPermittedCheckpointValue())); recordProcessorCheckpointer.getLargestPermittedCheckpointValue()));
if (shouldCallProcessRecords(records)) { if (shouldCallProcessRecords(records)) {
callProcessRecords(processRecordsInput, records); callProcessRecords(processRecordsInput, records);
} }
} catch (ProvisionedThroughputExceededException pte) {
throttlingReporter.throttled();
exception = pte;
backoff();
} catch (RuntimeException e) { } catch (RuntimeException e) {
log.error("ShardId {}: Caught exception: ", shardInfo.getShardId(), e); log.error("ShardId {}: Caught exception: ", shardInfo.getShardId(), e);
exception = e; exception = e;
@ -213,8 +204,7 @@ public class ProcessTask implements ITask {
log.debug("Calling application processRecords() with {} records from {}", records.size(), log.debug("Calling application processRecords() with {} records from {}", records.size(),
shardInfo.getShardId()); shardInfo.getShardId());
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records) final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records)
.withCheckpointer(recordProcessorCheckpointer) .withCheckpointer(recordProcessorCheckpointer).withMillisBehindLatest(input.getMillisBehindLatest());
.withMillisBehindLatest(input.getMillisBehindLatest());
final long recordProcessorStartTimeMillis = System.currentTimeMillis(); final long recordProcessorStartTimeMillis = System.currentTimeMillis();
try { try {
@ -292,13 +282,17 @@ public class ProcessTask implements ITask {
} }
/** /**
* Scans a list of records to filter out records up to and including the most recent checkpoint value and to get * Scans a list of records to filter out records up to and including the most recent checkpoint value and to get the
* the greatest extended sequence number from the retained records. Also emits metrics about the records. * greatest extended sequence number from the retained records. Also emits metrics about the records.
* *
* @param scope metrics scope to emit metrics into * @param scope
* @param records list of records to scan and change in-place as needed * metrics scope to emit metrics into
* @param lastCheckpointValue the most recent checkpoint value * @param records
* @param lastLargestPermittedCheckpointValue previous largest permitted checkpoint value * list of records to scan and change in-place as needed
* @param lastCheckpointValue
* the most recent checkpoint value
* @param lastLargestPermittedCheckpointValue
* previous largest permitted checkpoint value
* @return the largest extended sequence number among the retained records * @return the largest extended sequence number among the retained records
*/ */
private ExtendedSequenceNumber filterAndGetMaxExtendedSequenceNumber(IMetricsScope scope, List<Record> records, private ExtendedSequenceNumber filterAndGetMaxExtendedSequenceNumber(IMetricsScope scope, List<Record> records,
@ -308,11 +302,8 @@ public class ProcessTask implements ITask {
ListIterator<Record> recordIterator = records.listIterator(); ListIterator<Record> recordIterator = records.listIterator();
while (recordIterator.hasNext()) { while (recordIterator.hasNext()) {
Record record = recordIterator.next(); Record record = recordIterator.next();
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber( ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(record.getSequenceNumber(),
record.getSequenceNumber(), record instanceof UserRecord ? ((UserRecord) record).getSubSequenceNumber() : null);
record instanceof UserRecord
? ((UserRecord) record).getSubSequenceNumber()
: null);
if (extendedSequenceNumber.compareTo(lastCheckpointValue) <= 0) { if (extendedSequenceNumber.compareTo(lastCheckpointValue) <= 0) {
recordIterator.remove(); recordIterator.remove();
@ -332,58 +323,4 @@ public class ProcessTask implements ITask {
return largestExtendedSequenceNumber; return largestExtendedSequenceNumber;
} }
/**
* Gets records from Kinesis and retries once in the event of an ExpiredIteratorException.
*
* @return list of data records from Kinesis
*/
private ProcessRecordsInput getRecordsResult() {
try {
return getRecordsResultAndRecordMillisBehindLatest();
} catch (ExpiredIteratorException e) {
// If we see a ExpiredIteratorException, try once to restart from the greatest remembered sequence number
log.info("ShardId {}"
+ ": getRecords threw ExpiredIteratorException - restarting after greatest seqNum "
+ "passed to customer", shardInfo.getShardId(), e);
MetricsHelper.getMetricsScope().addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.Count,
MetricsLevel.SUMMARY);
/*
* Advance the iterator to after the greatest processed sequence number (remembered by
* recordProcessorCheckpointer).
*/
dataFetcher.advanceIteratorTo(recordProcessorCheckpointer.getLargestPermittedCheckpointValue()
.getSequenceNumber(), streamConfig.getInitialPositionInStream());
// Try a second time - if we fail this time, expose the failure.
try {
return getRecordsResultAndRecordMillisBehindLatest();
} catch (ExpiredIteratorException ex) {
String msg =
"Shard " + shardInfo.getShardId()
+ ": getRecords threw ExpiredIteratorException with a fresh iterator.";
log.error(msg, ex);
throw ex;
}
}
}
/**
* Gets records from Kinesis and records the MillisBehindLatest metric if present.
*
* @return list of data records from Kinesis
*/
private ProcessRecordsInput getRecordsResultAndRecordMillisBehindLatest() {
final ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult();
if (processRecordsInput.getMillisBehindLatest() != null) {
MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC,
processRecordsInput.getMillisBehindLatest(),
StandardUnit.Milliseconds,
MetricsLevel.SUMMARY);
}
return processRecordsInput;
}
} }

View file

@ -0,0 +1,32 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
import software.amazon.kinesis.lifecycle.events.LeaseLost;
import software.amazon.kinesis.lifecycle.events.RecordsReceived;
import software.amazon.kinesis.lifecycle.events.ShardCompleted;
import software.amazon.kinesis.lifecycle.events.ShutdownRequested;
import software.amazon.kinesis.lifecycle.events.Started;
/**
 * Event-driven lifecycle contract for a record processor. Each method corresponds to one
 * lifecycle event delivered to the processor; implementations react to the event payload.
 */
public interface RecordProcessorLifecycle {
// Invoked once processing for the shard has started; carries initialization context.
void started(Started started);
// Invoked when a batch of records has been received for processing.
void recordsReceived(RecordsReceived records);
// Invoked when the lease for the shard has been lost to another worker.
void leaseLost(LeaseLost leaseLost);
// Invoked when the shard has been fully consumed (shard end reached).
void shardCompleted(ShardCompleted shardCompletedInput);
// Invoked when a graceful shutdown of the processor has been requested.
void shutdownRequested(ShutdownRequested shutdownRequested);
}

View file

@ -0,0 +1,54 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
import lombok.AllArgsConstructor;
import software.amazon.kinesis.lifecycle.events.LeaseLost;
import software.amazon.kinesis.lifecycle.events.RecordsReceived;
import software.amazon.kinesis.lifecycle.events.ShardCompleted;
import software.amazon.kinesis.lifecycle.events.ShutdownRequested;
import software.amazon.kinesis.lifecycle.events.Started;
import software.amazon.kinesis.processor.IRecordProcessor;
/**
 * Adapts a legacy {@link IRecordProcessor} to the event-based {@link RecordProcessorLifecycle}
 * interface by translating each lifecycle event into the corresponding legacy callback.
 */
@AllArgsConstructor
public class RecordProcessorShim implements RecordProcessorLifecycle {
    /** The legacy record processor every lifecycle event is delegated to. */
    private final IRecordProcessor delegate;

    @Override
    public void started(Started started) {
        // Translate the new Started event into the legacy InitializationInput contract.
        InitializationInput initializationInput = started.toInitializationInput();
        delegate.initialize(initializationInput);
    }

    @Override
    public void recordsReceived(RecordsReceived records) {
        // Fixed: parameter must be RecordsReceived to actually override
        // RecordProcessorLifecycle#recordsReceived; the previous ProcessRecordsInput
        // parameter made @Override (and the class itself) fail to compile.
        // TODO: translate RecordsReceived into ProcessRecordsInput and delegate.processRecords(...)
    }

    @Override
    public void leaseLost(LeaseLost leaseLost) {
        // TODO: delegate the lease-lost shutdown once the event carries the required context.
    }

    @Override
    public void shardCompleted(ShardCompleted shardCompletedInput) {
        // TODO: delegate the shard-end shutdown once the event carries the required context.
    }

    @Override
    public void shutdownRequested(ShutdownRequested shutdownRequested) {
        // TODO: delegate the requested-shutdown notification once supported.
    }
}

View file

@ -16,13 +16,20 @@ package software.amazon.kinesis.lifecycle;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.lifecycle.events.LeaseLost;
import software.amazon.kinesis.lifecycle.events.ShardCompleted;
import software.amazon.kinesis.lifecycle.events.ShutdownRequested;
import software.amazon.kinesis.lifecycle.events.Started;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer; import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
@ -48,9 +55,11 @@ import software.amazon.kinesis.retrieval.SynchronousGetRecordsRetrievalStrategy;
* A new instance should be created if the primary responsibility is reassigned back to this process. * A new instance should be created if the primary responsibility is reassigned back to this process.
*/ */
@Slf4j @Slf4j
public class ShardConsumer { public class ShardConsumer implements RecordProcessorLifecycle {
//<editor-fold desc="Class Variables">
private final StreamConfig streamConfig; private final StreamConfig streamConfig;
private final IRecordProcessor recordProcessor; private final IRecordProcessor recordProcessor;
private RecordProcessorLifecycle recordProcessorLifecycle;
private final KinesisClientLibConfiguration config; private final KinesisClientLibConfiguration config;
private final RecordProcessorCheckpointer recordProcessorCheckpointer; private final RecordProcessorCheckpointer recordProcessorCheckpointer;
private final ExecutorService executorService; private final ExecutorService executorService;
@ -68,7 +77,9 @@ public class ShardConsumer {
private ITask currentTask; private ITask currentTask;
private long currentTaskSubmitTime; private long currentTaskSubmitTime;
private Future<TaskResult> future; private Future<TaskResult> future;
//</editor-fold>
//<editor-fold desc="Cache Management">
@Getter @Getter
private final GetRecordsCache getRecordsCache; private final GetRecordsCache getRecordsCache;
@ -82,6 +93,7 @@ public class ShardConsumer {
return getRecordsRetrievalStrategy.orElse(new SynchronousGetRecordsRetrievalStrategy(dataFetcher)); return getRecordsRetrievalStrategy.orElse(new SynchronousGetRecordsRetrievalStrategy(dataFetcher));
} }
//</editor-fold>
/* /*
* Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do * Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do
@ -95,6 +107,7 @@ public class ShardConsumer {
private volatile ShutdownReason shutdownReason; private volatile ShutdownReason shutdownReason;
private volatile ShutdownNotification shutdownNotification; private volatile ShutdownNotification shutdownNotification;
//<editor-fold desc="Constructors">
/** /**
* @param shardInfo Shard information * @param shardInfo Shard information
* @param streamConfig Stream configuration to use * @param streamConfig Stream configuration to use
@ -245,7 +258,9 @@ public class ShardConsumer {
makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo), makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo),
this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords()); this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords());
} }
//</editor-fold>
//<editor-fold desc="Dispatch">
/** /**
* No-op if current task is pending, otherwise submits next task for this shard. * No-op if current task is pending, otherwise submits next task for this shard.
* This method should NOT be called if the ShardConsumer is already in SHUTDOWN_COMPLETED state. * This method should NOT be called if the ShardConsumer is already in SHUTDOWN_COMPLETED state.
@ -345,6 +360,51 @@ public class ShardConsumer {
} }
} }
/**
* Figure out next task to run based on current state, task, and shutdown context.
*
* @return Return next task to run
*/
private ITask getNextTask() {
ITask nextTask = currentState.createTask(this);
if (nextTask == null) {
return null;
} else {
return new MetricsCollectingTaskDecorator(nextTask, metricsFactory);
}
}
/**
* Note: This is a private/internal method with package level access solely for testing purposes.
* Update state based on information about: task success, current state, and shutdown info.
*
* @param taskOutcome The outcome of the last task
*/
void updateState(TaskOutcome taskOutcome) {
if (taskOutcome == TaskOutcome.END_OF_SHARD) {
markForShutdown(ShutdownReason.TERMINATE);
}
if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) {
currentState = currentState.shutdownTransition(shutdownReason);
} else if (taskOutcome == TaskOutcome.SUCCESSFUL) {
if (currentState.getTaskType() == currentTask.getTaskType()) {
currentState = currentState.successTransition();
} else {
log.error("Current State task type of '{}' doesn't match the current tasks type of '{}'. This"
+ " shouldn't happen, and indicates a programming error. Unable to safely transition to the"
+ " next state.", currentState.getTaskType(), currentTask.getTaskType());
}
}
//
// Don't change state otherwise
//
}
//</editor-fold>
//<editor-fold desc="Shutdown">
/** /**
* Requests the shutdown of the this ShardConsumer. This should give the record processor a chance to checkpoint * Requests the shutdown of the this ShardConsumer. This should give the record processor a chance to checkpoint
* before being shutdown. * before being shutdown.
@ -393,53 +453,14 @@ public class ShardConsumer {
return shutdownReason; return shutdownReason;
} }
/**
* Figure out next task to run based on current state, task, and shutdown context.
*
* @return Return next task to run
*/
private ITask getNextTask() {
ITask nextTask = currentState.createTask(this);
if (nextTask == null) {
return null;
} else {
return new MetricsCollectingTaskDecorator(nextTask, metricsFactory);
}
}
/**
* Note: This is a private/internal method with package level access solely for testing purposes.
* Update state based on information about: task success, current state, and shutdown info.
*
* @param taskOutcome The outcome of the last task
*/
void updateState(TaskOutcome taskOutcome) {
if (taskOutcome == TaskOutcome.END_OF_SHARD) {
markForShutdown(ShutdownReason.TERMINATE);
}
if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) {
currentState = currentState.shutdownTransition(shutdownReason);
} else if (taskOutcome == TaskOutcome.SUCCESSFUL) {
if (currentState.getTaskType() == currentTask.getTaskType()) {
currentState = currentState.successTransition();
} else {
log.error("Current State task type of '{}' doesn't match the current tasks type of '{}'. This"
+ " shouldn't happen, and indicates a programming error. Unable to safely transition to the"
+ " next state.", currentState.getTaskType(), currentTask.getTaskType());
}
}
//
// Don't change state otherwise
//
}
@VisibleForTesting @VisibleForTesting
public boolean isShutdownRequested() { public boolean isShutdownRequested() {
return shutdownReason != null; return shutdownReason != null;
} }
//</editor-fold>
//<editor-fold desc="State Creation Accessors">
/** /**
* Private/Internal method - has package level access solely for testing purposes. * Private/Internal method - has package level access solely for testing purposes.
* *
@ -504,4 +525,46 @@ public class ShardConsumer {
ShutdownNotification getShutdownNotification() { ShutdownNotification getShutdownNotification() {
return shutdownNotification; return shutdownNotification;
} }
//</editor-fold>
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> taskResult = null;
//<editor-fold desc="RecordProcessorLifecycle">
/**
 * Dispatches the {@link Started} lifecycle event to the delegate lifecycle on the
 * single-threaded lifecycle executor, first draining any previously submitted event so
 * that events reach the record processor strictly in submission order.
 */
@Override
public void started(Started started) {
    if (taskResult != null) {
        try {
            // Block until the previous lifecycle dispatch completes to preserve ordering.
            taskResult.get();
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it (printStackTrace dropped it).
            Thread.currentThread().interrupt();
            log.error("Interrupted while awaiting completion of the previous lifecycle task.", e);
        } catch (ExecutionException e) {
            // Log through slf4j rather than printStackTrace, surfacing the real cause.
            log.error("Previous lifecycle task failed.", e.getCause());
        }
    }
    taskResult = executor.submit(() -> recordProcessorLifecycle.started(started));
}
// NOTE(review): this parameter type does not match RecordProcessorLifecycle, which declares
// recordsReceived(RecordsReceived records); with @Override this will not compile — confirm
// and align the signature with the interface.
@Override
public void recordsReceived(ProcessRecordsInput records) {
// TODO: not yet implemented — dispatch the received records to the record processor.
}
@Override
public void leaseLost(LeaseLost leaseLost) {
// TODO: not yet implemented — handle loss of the shard lease.
}
@Override
public void shardCompleted(ShardCompleted shardCompletedInput) {
// TODO: not yet implemented — handle shard-end completion.
}
@Override
public void shutdownRequested(ShutdownRequested shutdownRequested) {
// TODO: not yet implemented — handle a requested graceful shutdown.
}
//</editor-fold>
} }

View file

@ -0,0 +1,18 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle.events;
/**
 * Lifecycle event passed to the record-processor lifecycle callbacks.
 * Currently carries no payload; presumably signals that the lease for the
 * shard was lost — confirm against the dispatch sites as the API evolves.
 */
public class LeaseLost {
}

View file

@ -0,0 +1,22 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle.events;
import lombok.Data;
/**
 * Lifecycle event for the record-processor lifecycle callbacks.
 * Currently empty; {@code @Data} generates nothing useful until fields are
 * added — presumably a records payload will follow. TODO(review): confirm.
 */
@Data
public class RecordsReceived {
}

View file

@ -0,0 +1,18 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle.events;
/**
 * Lifecycle event passed to the record-processor lifecycle callbacks.
 * Currently carries no payload; presumably signals that the shard has been
 * fully consumed — confirm against the dispatch sites as the API evolves.
 */
public class ShardCompleted {
}

View file

@ -0,0 +1,18 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle.events;
/**
 * Lifecycle event passed to the record-processor lifecycle callbacks.
 * Currently carries no payload; presumably signals that a graceful shutdown
 * was requested — confirm against the dispatch sites as the API evolves.
 */
public class ShutdownRequested {
}

View file

@ -0,0 +1,33 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle.events;
import lombok.Data;
import software.amazon.kinesis.lifecycle.InitializationInput;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/**
 * Lifecycle event signalling that processing of a shard has started.
 * Carries the shard id plus the checkpointed and pending-checkpoint sequence
 * numbers needed to build an {@link InitializationInput}.
 */
@Data
public class Started {
    private final String shardId;
    private final ExtendedSequenceNumber sequenceNumber;
    private final ExtendedSequenceNumber pendingSequenceNumber;

    /**
     * Converts this event to the legacy {@link InitializationInput} form.
     *
     * @return an InitializationInput populated with the shard id, checkpoint
     *         sequence number, and pending checkpoint sequence number.
     */
    public InitializationInput toInitializationInput() {
        // BUG FIX: the original called withExtendedSequenceNumber(sequenceNumber)
        // twice and never used pendingSequenceNumber; the second call must set
        // the pending checkpoint sequence number.
        return new InitializationInput().withShardId(shardId)
                .withExtendedSequenceNumber(sequenceNumber)
                .withPendingCheckpointSequenceNumber(pendingSequenceNumber);
    }
}

View file

@ -1,26 +1,22 @@
/* /*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Amazon Software License
* * (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at
* Licensed under the Amazon Software License (the "License"). * http://aws.amazon.com/asl/ or in the "license" file accompanying this file. This file is distributed on an "AS IS"
* You may not use this file except in compliance with the License. * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
* A copy of the License is located at * language governing permissions and limitations under the License.
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/ */
package software.amazon.kinesis.lifecycle; package software.amazon.kinesis.lifecycle;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.beans.HasPropertyWithValue.hasProperty;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -29,7 +25,6 @@ import java.math.BigInteger;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -37,34 +32,47 @@ import java.util.Random;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import org.hamcrest.Description;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import org.hamcrest.Matcher;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; import org.hamcrest.TypeSafeDiagnosingMatcher;
import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.coordinator.StreamConfig;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.MockitoAnnotations; import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import com.amazonaws.services.kinesis.model.Record;
import com.google.protobuf.ByteString;
import lombok.Data;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer;
import software.amazon.kinesis.coordinator.StreamConfig;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.processor.IRecordProcessor; import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.KinesisDataFetcher; import software.amazon.kinesis.retrieval.KinesisDataFetcher;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.retrieval.kpl.Messages; import software.amazon.kinesis.retrieval.kpl.Messages;
import software.amazon.kinesis.retrieval.kpl.Messages.AggregatedRecord; import software.amazon.kinesis.retrieval.kpl.Messages.AggregatedRecord;
import software.amazon.kinesis.retrieval.kpl.UserRecord; import software.amazon.kinesis.retrieval.kpl.UserRecord;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record;
import com.google.protobuf.ByteString;
@RunWith(MockitoJUnitRunner.class)
public class ProcessTaskTest { public class ProcessTaskTest {
private StreamConfig config;
private ShardInfo shardInfo;
@Mock
private ProcessRecordsInput processRecordsInput;
@SuppressWarnings("serial") @SuppressWarnings("serial")
private static class RecordSubclass extends Record {} private static class RecordSubclass extends Record {
}
private static final byte[] TEST_DATA = new byte[] { 1, 2, 3, 4 }; private static final byte[] TEST_DATA = new byte[] { 1, 2, 3, 4 };
@ -75,78 +83,45 @@ public class ProcessTaskTest {
private final boolean callProcessRecordsForEmptyRecordList = true; private final boolean callProcessRecordsForEmptyRecordList = true;
// We don't want any of these tests to run checkpoint validation // We don't want any of these tests to run checkpoint validation
private final boolean skipCheckpointValidationValue = false; private final boolean skipCheckpointValidationValue = false;
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = InitialPositionInStreamExtended
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST); .newInitialPosition(InitialPositionInStream.LATEST);
private @Mock @Mock
KinesisDataFetcher mockDataFetcher; private KinesisDataFetcher mockDataFetcher;
private @Mock IRecordProcessor mockRecordProcessor; @Mock
private @Mock private IRecordProcessor mockRecordProcessor;
RecordProcessorCheckpointer mockCheckpointer; @Mock
private RecordProcessorCheckpointer mockCheckpointer;
@Mock @Mock
private ThrottlingReporter throttlingReporter; private ThrottlingReporter throttlingReporter;
@Mock @Mock
private GetRecordsCache getRecordsCache; private GetRecordsCache getRecordsCache;
private List<Record> processedRecords;
private ExtendedSequenceNumber newLargestPermittedCheckpointValue;
private ProcessTask processTask; private ProcessTask processTask;
@Before @Before
public void setUpProcessTask() { public void setUpProcessTask() {
// Initialize the annotation
MockitoAnnotations.initMocks(this);
// Set up process task // Set up process task
final StreamConfig config = config = new StreamConfig(null, maxRecords, idleTimeMillis, callProcessRecordsForEmptyRecordList,
new StreamConfig(null, maxRecords, idleTimeMillis, callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
skipCheckpointValidationValue, shardInfo = new ShardInfo(shardId, null, null, null);
INITIAL_POSITION_LATEST);
final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null);
processTask = new ProcessTask(
shardInfo,
config,
mockRecordProcessor,
mockCheckpointer,
mockDataFetcher,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
throttlingReporter,
getRecordsCache);
} }
@Test private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput) {
public void testProcessTaskWithProvisionedThroughputExceededException() { return new ProcessTask(shardInfo, config, mockRecordProcessor, mockCheckpointer, taskBackoffTimeMillis,
// Set data fetcher to throw exception KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter,
doReturn(false).when(mockDataFetcher).isShardEndReached(); processRecordsInput);
doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(getRecordsCache)
.getNextResult();
TaskResult result = processTask.call();
verify(throttlingReporter).throttled();
verify(throttlingReporter, never()).success();
verify(getRecordsCache).getNextResult();
assertTrue("Result should contain ProvisionedThroughputExceededException",
result.getException() instanceof ProvisionedThroughputExceededException);
}
@Test
public void testProcessTaskWithNonExistentStream() {
// Data fetcher returns a null Result ` the stream does not exist
doReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest((long) 0)).when(getRecordsCache).getNextResult();
TaskResult result = processTask.call();
verify(getRecordsCache).getNextResult();
assertNull("Task should not throw an exception", result.getException());
} }
@Test @Test
public void testProcessTaskWithShardEndReached() { public void testProcessTaskWithShardEndReached() {
// Set data fetcher to return true for shard end reached
doReturn(true).when(mockDataFetcher).isShardEndReached(); processTask = makeProcessTask(processRecordsInput);
when(processRecordsInput.isAtShardEnd()).thenReturn(true);
TaskResult result = processTask.call(); TaskResult result = processTask.call();
assertTrue("Result should contain shardEndReached true", result.isShardEndReached()); assertThat(result, shardEndTaskResult(true));
} }
@Test @Test
@ -154,41 +129,42 @@ public class ProcessTaskTest {
final String sqn = new BigInteger(128, new Random()).toString(); final String sqn = new BigInteger(128, new Random()).toString();
final String pk = UUID.randomUUID().toString(); final String pk = UUID.randomUUID().toString();
final Date ts = new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(4, TimeUnit.HOURS)); final Date ts = new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(4, TimeUnit.HOURS));
final Record r = new Record() final Record r = new Record().withPartitionKey(pk).withData(ByteBuffer.wrap(TEST_DATA)).withSequenceNumber(sqn)
.withPartitionKey(pk)
.withData(ByteBuffer.wrap(TEST_DATA))
.withSequenceNumber(sqn)
.withApproximateArrivalTimestamp(ts); .withApproximateArrivalTimestamp(ts);
testWithRecord(r); RecordProcessorOutcome outcome = testWithRecord(r);
assertEquals(1, processedRecords.size()); assertEquals(1, outcome.getProcessRecordsCall().getRecords().size());
Record pr = processedRecords.get(0); Record pr = outcome.getProcessRecordsCall().getRecords().get(0);
assertEquals(pk, pr.getPartitionKey()); assertEquals(pk, pr.getPartitionKey());
assertEquals(ts, pr.getApproximateArrivalTimestamp()); assertEquals(ts, pr.getApproximateArrivalTimestamp());
byte[] b = new byte[pr.getData().remaining()]; byte[] b = pr.getData().array();
pr.getData().get(b); assertThat(b, equalTo(TEST_DATA));
assertTrue(Arrays.equals(TEST_DATA, b));
assertEquals(sqn, newLargestPermittedCheckpointValue.getSequenceNumber()); assertEquals(sqn, outcome.getCheckpointCall().getSequenceNumber());
assertEquals(0, newLargestPermittedCheckpointValue.getSubSequenceNumber()); assertEquals(0, outcome.getCheckpointCall().getSubSequenceNumber());
}
@Data
static class RecordProcessorOutcome {
final ProcessRecordsInput processRecordsCall;
final ExtendedSequenceNumber checkpointCall;
} }
@Test @Test
public void testDoesNotDeaggregateSubclassOfRecord() { public void testDoesNotDeaggregateSubclassOfRecord() {
final String sqn = new BigInteger(128, new Random()).toString(); final String sqn = new BigInteger(128, new Random()).toString();
final Record r = new RecordSubclass() final Record r = new RecordSubclass().withSequenceNumber(sqn).withData(ByteBuffer.wrap(new byte[0]));
.withSequenceNumber(sqn)
.withData(ByteBuffer.wrap(new byte[0]));
testWithRecord(r); processTask = makeProcessTask(processRecordsInput);
RecordProcessorOutcome outcome = testWithRecord(r);
assertEquals(1, processedRecords.size(), 1); assertEquals(1, outcome.getProcessRecordsCall().getRecords().size(), 1);
assertSame(r, processedRecords.get(0)); assertSame(r, outcome.getProcessRecordsCall().getRecords().get(0));
assertEquals(sqn, newLargestPermittedCheckpointValue.getSequenceNumber()); assertEquals(sqn, outcome.getCheckpointCall().getSequenceNumber());
assertEquals(0, newLargestPermittedCheckpointValue.getSubSequenceNumber()); assertEquals(0, outcome.getCheckpointCall().getSubSequenceNumber());
} }
@Test @Test
@ -196,44 +172,44 @@ public class ProcessTaskTest {
final String sqn = new BigInteger(128, new Random()).toString(); final String sqn = new BigInteger(128, new Random()).toString();
final String pk = UUID.randomUUID().toString(); final String pk = UUID.randomUUID().toString();
final Date ts = new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(4, TimeUnit.HOURS)); final Date ts = new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(4, TimeUnit.HOURS));
final Record r = new Record() final Record r = new Record().withPartitionKey("-").withData(generateAggregatedRecord(pk))
.withPartitionKey("-") .withSequenceNumber(sqn).withApproximateArrivalTimestamp(ts);
.withData(generateAggregatedRecord(pk))
.withSequenceNumber(sqn)
.withApproximateArrivalTimestamp(ts);
testWithRecord(r); processTask = makeProcessTask(processRecordsInput);
RecordProcessorOutcome outcome = testWithRecord(r);
assertEquals(3, processedRecords.size()); List<Record> actualRecords = outcome.getProcessRecordsCall().getRecords();
for (Record pr : processedRecords) {
assertTrue(pr instanceof UserRecord); assertEquals(3, actualRecords.size());
for (Record pr : actualRecords) {
assertThat(pr, instanceOf(UserRecord.class));
assertEquals(pk, pr.getPartitionKey()); assertEquals(pk, pr.getPartitionKey());
assertEquals(ts, pr.getApproximateArrivalTimestamp()); assertEquals(ts, pr.getApproximateArrivalTimestamp());
byte[] b = new byte[pr.getData().remaining()]; byte[] b = pr.getData().array();
pr.getData().get(b); assertThat(b, equalTo(TEST_DATA));
assertTrue(Arrays.equals(TEST_DATA, b));
} }
assertEquals(sqn, newLargestPermittedCheckpointValue.getSequenceNumber()); assertEquals(sqn, outcome.getCheckpointCall().getSequenceNumber());
assertEquals(processedRecords.size() - 1, newLargestPermittedCheckpointValue.getSubSequenceNumber()); assertEquals(actualRecords.size() - 1, outcome.getCheckpointCall().getSubSequenceNumber());
} }
@Test @Test
public void testDeaggregatesRecordWithNoArrivalTimestamp() { public void testDeaggregatesRecordWithNoArrivalTimestamp() {
final String sqn = new BigInteger(128, new Random()).toString(); final String sqn = new BigInteger(128, new Random()).toString();
final String pk = UUID.randomUUID().toString(); final String pk = UUID.randomUUID().toString();
final Record r = new Record() final Record r = new Record().withPartitionKey("-").withData(generateAggregatedRecord(pk))
.withPartitionKey("-")
.withData(generateAggregatedRecord(pk))
.withSequenceNumber(sqn); .withSequenceNumber(sqn);
testWithRecord(r); processTask = makeProcessTask(processRecordsInput);
RecordProcessorOutcome outcome = testWithRecord(r);
assertEquals(3, processedRecords.size()); List<Record> actualRecords = outcome.getProcessRecordsCall().getRecords();
for (Record pr : processedRecords) {
assertTrue(pr instanceof UserRecord); assertEquals(3, actualRecords.size());
for (Record pr : actualRecords) {
assertThat(pr, instanceOf(UserRecord.class));
assertEquals(pk, pr.getPartitionKey()); assertEquals(pk, pr.getPartitionKey());
assertNull(pr.getApproximateArrivalTimestamp()); assertThat(pr.getApproximateArrivalTimestamp(), nullValue());
} }
} }
@ -246,15 +222,17 @@ public class ProcessTaskTest {
final int numberOfRecords = 104; final int numberOfRecords = 104;
// Start these batch of records's sequence number that is greater than previous checkpoint value. // Start these batch of records's sequence number that is greater than previous checkpoint value.
final BigInteger startingSqn = previousCheckpointSqn.add(BigInteger.valueOf(10)); final BigInteger startingSqn = previousCheckpointSqn.add(BigInteger.valueOf(10));
final List<Record> records = generateConsecutiveRecords( final List<Record> records = generateConsecutiveRecords(numberOfRecords, "-", ByteBuffer.wrap(TEST_DATA),
numberOfRecords, "-", ByteBuffer.wrap(TEST_DATA), new Date(), startingSqn); new Date(), startingSqn);
testWithRecords(records, new ExtendedSequenceNumber(previousCheckpointSqn.toString()), processTask = makeProcessTask(processRecordsInput);
RecordProcessorOutcome outcome = testWithRecords(records,
new ExtendedSequenceNumber(previousCheckpointSqn.toString()),
new ExtendedSequenceNumber(previousCheckpointSqn.toString())); new ExtendedSequenceNumber(previousCheckpointSqn.toString()));
final ExtendedSequenceNumber expectedLargestPermittedEsqn = new ExtendedSequenceNumber( final ExtendedSequenceNumber expectedLargestPermittedEsqn = new ExtendedSequenceNumber(
startingSqn.add(BigInteger.valueOf(numberOfRecords - 1)).toString()); startingSqn.add(BigInteger.valueOf(numberOfRecords - 1)).toString());
assertEquals(expectedLargestPermittedEsqn, newLargestPermittedCheckpointValue); assertEquals(expectedLargestPermittedEsqn, outcome.getCheckpointCall());
} }
@Test @Test
@ -265,10 +243,12 @@ public class ProcessTaskTest {
final ExtendedSequenceNumber largestPermittedEsqn = new ExtendedSequenceNumber( final ExtendedSequenceNumber largestPermittedEsqn = new ExtendedSequenceNumber(
baseSqn.add(BigInteger.valueOf(100)).toString()); baseSqn.add(BigInteger.valueOf(100)).toString());
testWithRecords(Collections.<Record>emptyList(), lastCheckpointEspn, largestPermittedEsqn); processTask = makeProcessTask(processRecordsInput);
RecordProcessorOutcome outcome = testWithRecords(Collections.emptyList(), lastCheckpointEspn,
largestPermittedEsqn);
// Make sure that even with empty records, largest permitted sequence number does not change. // Make sure that even with empty records, largest permitted sequence number does not change.
assertEquals(largestPermittedEsqn, newLargestPermittedCheckpointValue); assertEquals(largestPermittedEsqn, outcome.getCheckpointCall());
} }
@Test @Test
@ -286,21 +266,22 @@ public class ProcessTaskTest {
// Values for this processRecords call. // Values for this processRecords call.
final String startingSqn = previousCheckpointSqn.toString(); final String startingSqn = previousCheckpointSqn.toString();
final String pk = UUID.randomUUID().toString(); final String pk = UUID.randomUUID().toString();
final Record r = new Record() final Record r = new Record().withPartitionKey("-").withData(generateAggregatedRecord(pk))
.withPartitionKey("-")
.withData(generateAggregatedRecord(pk))
.withSequenceNumber(startingSqn); .withSequenceNumber(startingSqn);
testWithRecords(Collections.singletonList(r), processTask = makeProcessTask(processRecordsInput);
RecordProcessorOutcome outcome = testWithRecords(Collections.singletonList(r),
new ExtendedSequenceNumber(previousCheckpointSqn.toString(), previousCheckpointSsqn), new ExtendedSequenceNumber(previousCheckpointSqn.toString(), previousCheckpointSsqn),
new ExtendedSequenceNumber(previousCheckpointSqn.toString(), previousCheckpointSsqn)); new ExtendedSequenceNumber(previousCheckpointSqn.toString(), previousCheckpointSsqn));
List<Record> actualRecords = outcome.getProcessRecordsCall().getRecords();
// First two records should be dropped - and only 1 remaining records should be there. // First two records should be dropped - and only 1 remaining records should be there.
assertEquals(1, processedRecords.size()); assertEquals(1, actualRecords.size());
assertTrue(processedRecords.get(0) instanceof UserRecord); assertThat(actualRecords.get(0), instanceOf(UserRecord.class));
// Verify user record's extended sequence number and other fields. // Verify user record's extended sequence number and other fields.
final UserRecord pr = (UserRecord)processedRecords.get(0); final UserRecord pr = (UserRecord) actualRecords.get(0);
assertEquals(pk, pr.getPartitionKey()); assertEquals(pk, pr.getPartitionKey());
assertEquals(startingSqn, pr.getSequenceNumber()); assertEquals(startingSqn, pr.getSequenceNumber());
assertEquals(previousCheckpointSsqn + 1, pr.getSubSequenceNumber()); assertEquals(previousCheckpointSsqn + 1, pr.getSubSequenceNumber());
@ -309,60 +290,50 @@ public class ProcessTaskTest {
// Expected largest permitted sequence number will be last sub-record sequence number. // Expected largest permitted sequence number will be last sub-record sequence number.
final ExtendedSequenceNumber expectedLargestPermittedEsqn = new ExtendedSequenceNumber( final ExtendedSequenceNumber expectedLargestPermittedEsqn = new ExtendedSequenceNumber(
previousCheckpointSqn.toString(), 2L); previousCheckpointSqn.toString(), 2L);
assertEquals(expectedLargestPermittedEsqn, newLargestPermittedCheckpointValue); assertEquals(expectedLargestPermittedEsqn, outcome.getCheckpointCall());
} }
private void testWithRecord(Record record) { private RecordProcessorOutcome testWithRecord(Record record) {
testWithRecords(Collections.singletonList(record), return testWithRecords(Collections.singletonList(record), ExtendedSequenceNumber.TRIM_HORIZON,
ExtendedSequenceNumber.TRIM_HORIZON, ExtendedSequenceNumber.TRIM_HORIZON); ExtendedSequenceNumber.TRIM_HORIZON);
} }
private void testWithRecords(List<Record> records, private RecordProcessorOutcome testWithRecords(List<Record> records, ExtendedSequenceNumber lastCheckpointValue,
ExtendedSequenceNumber lastCheckpointValue,
ExtendedSequenceNumber largestPermittedCheckpointValue) { ExtendedSequenceNumber largestPermittedCheckpointValue) {
when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(records).withMillisBehindLatest((long) 1000 * 50));
when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue); when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue);
when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue); when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue);
when(processRecordsInput.getRecords()).thenReturn(records);
processTask = makeProcessTask(processRecordsInput);
processTask.call(); processTask.call();
verify(throttlingReporter).success(); verify(throttlingReporter).success();
verify(throttlingReporter, never()).throttled(); verify(throttlingReporter, never()).throttled();
verify(getRecordsCache).getNextResult(); ArgumentCaptor<ProcessRecordsInput> recordsCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class);
ArgumentCaptor<ProcessRecordsInput> priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); verify(mockRecordProcessor).processRecords(recordsCaptor.capture());
verify(mockRecordProcessor).processRecords(priCaptor.capture());
processedRecords = priCaptor.getValue().getRecords();
ArgumentCaptor<ExtendedSequenceNumber> esnCaptor = ArgumentCaptor.forClass(ExtendedSequenceNumber.class); ArgumentCaptor<ExtendedSequenceNumber> esnCaptor = ArgumentCaptor.forClass(ExtendedSequenceNumber.class);
verify(mockCheckpointer).setLargestPermittedCheckpointValue(esnCaptor.capture()); verify(mockCheckpointer).setLargestPermittedCheckpointValue(esnCaptor.capture());
newLargestPermittedCheckpointValue = esnCaptor.getValue();
return new RecordProcessorOutcome(recordsCaptor.getValue(), esnCaptor.getValue());
} }
/** /**
* See the KPL documentation on GitHub for more details about the binary * See the KPL documentation on GitHub for more details about the binary format.
* format.
* *
* @param pk * @param pk
* Partition key to use. All the records will have the same * Partition key to use. All the records will have the same partition key.
* partition key. * @return ByteBuffer containing the serialized form of the aggregated record, along with the necessary header and
* @return ByteBuffer containing the serialized form of the aggregated * footer.
* record, along with the necessary header and footer.
*/ */
private static ByteBuffer generateAggregatedRecord(String pk) { private static ByteBuffer generateAggregatedRecord(String pk) {
ByteBuffer bb = ByteBuffer.allocate(1024); ByteBuffer bb = ByteBuffer.allocate(1024);
bb.put(new byte[] { -13, -119, -102, -62 }); bb.put(new byte[] { -13, -119, -102, -62 });
Messages.Record r = Messages.Record r = Messages.Record.newBuilder().setData(ByteString.copyFrom(TEST_DATA)).setPartitionKeyIndex(0)
Messages.Record.newBuilder()
.setData(ByteString.copyFrom(TEST_DATA))
.setPartitionKeyIndex(0)
.build(); .build();
byte[] payload = AggregatedRecord.newBuilder() byte[] payload = AggregatedRecord.newBuilder().addPartitionKeyTable(pk).addRecords(r).addRecords(r)
.addPartitionKeyTable(pk) .addRecords(r).build().toByteArray();
.addRecords(r)
.addRecords(r)
.addRecords(r)
.build()
.toByteArray();
bb.put(payload); bb.put(payload);
bb.put(md5(payload)); bb.put(md5(payload));
@ -371,14 +342,11 @@ public class ProcessTaskTest {
return bb; return bb;
} }
private static List<Record> generateConsecutiveRecords( private static List<Record> generateConsecutiveRecords(int numberOfRecords, String partitionKey, ByteBuffer data,
int numberOfRecords, String partitionKey, ByteBuffer data,
Date arrivalTimestamp, BigInteger startSequenceNumber) { Date arrivalTimestamp, BigInteger startSequenceNumber) {
List<Record> records = new ArrayList<>(); List<Record> records = new ArrayList<>();
for (int i = 0; i < numberOfRecords; ++i) { for (int i = 0; i < numberOfRecords; ++i) {
records.add(new Record() records.add(new Record().withPartitionKey(partitionKey).withData(data)
.withPartitionKey(partitionKey)
.withData(data)
.withSequenceNumber(startSequenceNumber.add(BigInteger.valueOf(i)).toString()) .withSequenceNumber(startSequenceNumber.add(BigInteger.valueOf(i)).toString())
.withApproximateArrivalTimestamp(arrivalTimestamp)); .withApproximateArrivalTimestamp(arrivalTimestamp));
} }
@ -393,4 +361,48 @@ public class ProcessTaskTest {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
/**
 * Matcher for a TaskResult with no exception and the given shard-end flag.
 */
private static TaskResultMatcher shardEndTaskResult(boolean isAtShardEnd) {
    return taskResult(new TaskResult(null, isAtShardEnd));
}
/**
 * Matcher for a TaskResult carrying the given exception with shard end not reached.
 */
private static TaskResultMatcher exceptionTaskResult(Exception ex) {
    return taskResult(new TaskResult(ex, false));
}
/**
 * Factory for a matcher that compares a TaskResult against {@code expected}.
 */
private static TaskResultMatcher taskResult(TaskResult expected) {
    return new TaskResultMatcher(expected);
}
/**
 * Hamcrest matcher that compares a {@link TaskResult} against an expected
 * instance by its {@code shardEndReached} and {@code exception} properties.
 * A null expected value matches only a null actual value.
 */
private static class TaskResultMatcher extends TypeSafeDiagnosingMatcher<TaskResult> {

    private final Matcher<TaskResult> delegate;

    TaskResultMatcher(TaskResult expected) {
        // Build the delegate once up front; matching and description both defer to it.
        delegate = expected == null
                ? nullValue(TaskResult.class)
                : allOf(notNullValue(TaskResult.class),
                        hasProperty("shardEndReached", equalTo(expected.isShardEndReached())),
                        hasProperty("exception", equalTo(expected.getException())));
    }

    @Override
    protected boolean matchesSafely(TaskResult item, Description mismatchDescription) {
        boolean matched = delegate.matches(item);
        if (!matched) {
            delegate.describeMismatch(item, mismatchDescription);
        }
        return matched;
    }

    @Override
    public void describeTo(Description description) {
        description.appendDescriptionOf(delegate);
    }
}
} }