From 7b026f8a193b37123784e555a2096890aa5c45b6 Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Mon, 26 Mar 2018 11:47:44 -0700 Subject: [PATCH 1/3] Broke apart the process task, now diving into the shard consumer more. --- .../kinesis/lifecycle/ConsumerStates.java | 5 +- .../lifecycle/ProcessRecordsInput.java | 4 + .../amazon/kinesis/lifecycle/ProcessTask.java | 213 ++++------ .../lifecycle/RecordProcessorLifecycle.java | 32 ++ .../lifecycle/RecordProcessorShim.java | 54 +++ .../kinesis/lifecycle/ShardConsumer.java | 167 +++++--- .../kinesis/lifecycle/events/LeaseLost.java | 18 + .../lifecycle/events/RecordsReceived.java | 22 ++ .../lifecycle/events/ShardCompleted.java | 18 + .../lifecycle/events/ShutdownRequested.java | 18 + .../kinesis/lifecycle/events/Started.java | 33 ++ .../kinesis/lifecycle/ProcessTaskTest.java | 366 +++++++++--------- 12 files changed, 580 insertions(+), 370 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorLifecycle.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorShim.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/LeaseLost.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/RecordsReceived.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShardCompleted.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShutdownRequested.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/Started.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java index e192a505..ab941938 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java @@ -312,14 +312,13 @@ class ConsumerStates { @Override public ITask createTask(ShardConsumer consumer) { + ProcessTask.RecordsFetcher recordsFetcher = new ProcessTask.RecordsFetcher(consumer.getGetRecordsCache()); return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(), - consumer.getDataFetcher(), consumer.getTaskBackoffTimeMillis(), - consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), - consumer.getGetRecordsCache()); + consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), recordsFetcher.getRecords()); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessRecordsInput.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessRecordsInput.java index 5bb47cd1..96008359 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessRecordsInput.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessRecordsInput.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.time.Instant; import java.util.List; +import lombok.AllArgsConstructor; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.model.Record; @@ -29,11 +30,14 @@ import software.amazon.kinesis.processor.IRecordProcessor; * {@link IRecordProcessor#processRecords( * ProcessRecordsInput processRecordsInput) processRecords} method. */ +@AllArgsConstructor public class ProcessRecordsInput { @Getter private Instant cacheEntryTime; @Getter private Instant cacheExitTime; + @Getter + private boolean isAtShardEnd; private List records; private IRecordProcessorCheckpointer checkpointer; private Long millisBehindLatest; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index 5076dc6f..04f56fcc 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -1,16 +1,9 @@ /* - * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Amazon Software License + * (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at + * http://aws.amazon.com/asl/ or in the "license" file accompanying this file. This file is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific + * language governing permissions and limitations under the License. */ package software.amazon.kinesis.lifecycle; @@ -19,26 +12,24 @@ import java.util.List; import java.util.ListIterator; import com.amazonaws.services.cloudwatch.model.StandardUnit; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.Shard; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer; -import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.coordinator.StreamConfig; -import software.amazon.kinesis.retrieval.ThrottlingReporter; +import software.amazon.kinesis.leases.ShardInfo; +import software.amazon.kinesis.metrics.IMetricsScope; +import software.amazon.kinesis.metrics.MetricsHelper; +import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.processor.IRecordProcessor; import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.IKinesisProxy; import software.amazon.kinesis.retrieval.IKinesisProxyExtended; -import software.amazon.kinesis.retrieval.KinesisDataFetcher; +import software.amazon.kinesis.retrieval.ThrottlingReporter; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.UserRecord; -import software.amazon.kinesis.metrics.MetricsHelper; -import software.amazon.kinesis.metrics.IMetricsScope; -import software.amazon.kinesis.metrics.MetricsLevel; -import com.amazonaws.services.kinesis.model.ExpiredIteratorException; -import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; -import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.Shard; - -import lombok.extern.slf4j.Slf4j; /** * Task for fetching data records and invoking processRecords() on the record processor instance. @@ -55,39 +46,30 @@ public class ProcessTask implements ITask { private final ShardInfo shardInfo; private final IRecordProcessor recordProcessor; private final RecordProcessorCheckpointer recordProcessorCheckpointer; - private final KinesisDataFetcher dataFetcher; private final TaskType taskType = TaskType.PROCESS; private final StreamConfig streamConfig; private final long backoffTimeMillis; private final Shard shard; private final ThrottlingReporter throttlingReporter; - private final GetRecordsCache getRecordsCache; + private final ProcessRecordsInput processRecordsInput; + + @RequiredArgsConstructor + public static class RecordsFetcher { + + private final GetRecordsCache getRecordsCache; + + public ProcessRecordsInput getRecords() { + ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult(); + + if (processRecordsInput.getMillisBehindLatest() != null) { + MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC, + processRecordsInput.getMillisBehindLatest(), StandardUnit.Milliseconds, MetricsLevel.SUMMARY); + } + + return processRecordsInput; + } - /** - * @param shardInfo - * contains information about the shard - * @param streamConfig - * Stream configuration - * @param recordProcessor - * Record processor used to process the data records for the shard - * @param recordProcessorCheckpointer - * Passed to the RecordProcessor so it can checkpoint progress - * @param dataFetcher - * Kinesis data fetcher (used to fetch records from Kinesis) - * @param backoffTimeMillis - * backoff time when catching exceptions - * @param getRecordsCache - * The retrieval strategy for fetching records from kinesis - */ - public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, - long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - GetRecordsCache getRecordsCache) { - this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, - skipShardSyncAtWorkerInitializationIfLeasesExist, - new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), - getRecordsCache); } /** @@ -99,27 +81,44 @@ public class ProcessTask implements ITask { * Record processor used to process the data records for the shard * @param recordProcessorCheckpointer * Passed to the RecordProcessor so it can checkpoint progress - * @param dataFetcher - * Kinesis data fetcher (used to fetch records from Kinesis) + * @param backoffTimeMillis + * backoff time when catching exceptions + */ + public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ProcessRecordsInput processRecordsInput) { + this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, backoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, + new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), processRecordsInput); + } + + /** + * @param shardInfo + * contains information about the shard + * @param streamConfig + * Stream configuration + * @param recordProcessor + * Record processor used to process the data records for the shard + * @param recordProcessorCheckpointer + * Passed to the RecordProcessor so it can checkpoint progress * @param backoffTimeMillis * backoff time when catching exceptions * @param throttlingReporter * determines how throttling events should be reported in the log. */ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, - long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - ThrottlingReporter throttlingReporter, GetRecordsCache getRecordsCache) { + RecordProcessorCheckpointer recordProcessorCheckpointer, long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ThrottlingReporter throttlingReporter, + ProcessRecordsInput processRecordsInput) { super(); this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; - this.dataFetcher = dataFetcher; this.streamConfig = streamConfig; this.backoffTimeMillis = backoffTimeMillis; this.throttlingReporter = throttlingReporter; IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); - this.getRecordsCache = getRecordsCache; + this.processRecordsInput = processRecordsInput; // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for // this ProcessTask. In this case, duplicate KPL user records in the event of resharding will // not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if @@ -138,7 +137,6 @@ public class ProcessTask implements ITask { /* * (non-Javadoc) - * * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call() */ @Override @@ -151,12 +149,11 @@ public class ProcessTask implements ITask { Exception exception = null; try { - if (dataFetcher.isShardEndReached()) { + if (processRecordsInput.isAtShardEnd()) { log.info("Reached end of shard {}", shardInfo.getShardId()); return new TaskResult(null, true); } - final ProcessRecordsInput processRecordsInput = getRecordsResult(); throttlingReporter.success(); List records = processRecordsInput.getRecords(); @@ -167,19 +164,13 @@ public class ProcessTask implements ITask { } records = deaggregateRecords(records); - recordProcessorCheckpointer.setLargestPermittedCheckpointValue( - filterAndGetMaxExtendedSequenceNumber(scope, records, - recordProcessorCheckpointer.getLastCheckpointValue(), - recordProcessorCheckpointer.getLargestPermittedCheckpointValue())); + recordProcessorCheckpointer.setLargestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber(scope, + records, recordProcessorCheckpointer.getLastCheckpointValue(), + recordProcessorCheckpointer.getLargestPermittedCheckpointValue())); if (shouldCallProcessRecords(records)) { callProcessRecords(processRecordsInput, records); } - } catch (ProvisionedThroughputExceededException pte) { - throttlingReporter.throttled(); - exception = pte; - backoff(); - } catch (RuntimeException e) { log.error("ShardId {}: Caught exception: ", shardInfo.getShardId(), e); exception = e; @@ -213,8 +204,7 @@ public class ProcessTask implements ITask { log.debug("Calling application processRecords() with {} records from {}", records.size(), shardInfo.getShardId()); final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records) - .withCheckpointer(recordProcessorCheckpointer) - .withMillisBehindLatest(input.getMillisBehindLatest()); + .withCheckpointer(recordProcessorCheckpointer).withMillisBehindLatest(input.getMillisBehindLatest()); final long recordProcessorStartTimeMillis = System.currentTimeMillis(); try { @@ -292,27 +282,28 @@ public class ProcessTask implements ITask { } /** - * Scans a list of records to filter out records up to and including the most recent checkpoint value and to get - * the greatest extended sequence number from the retained records. Also emits metrics about the records. + * Scans a list of records to filter out records up to and including the most recent checkpoint value and to get the + * greatest extended sequence number from the retained records. Also emits metrics about the records. * - * @param scope metrics scope to emit metrics into - * @param records list of records to scan and change in-place as needed - * @param lastCheckpointValue the most recent checkpoint value - * @param lastLargestPermittedCheckpointValue previous largest permitted checkpoint value + * @param scope + * metrics scope to emit metrics into + * @param records + * list of records to scan and change in-place as needed + * @param lastCheckpointValue + * the most recent checkpoint value + * @param lastLargestPermittedCheckpointValue + * previous largest permitted checkpoint value * @return the largest extended sequence number among the retained records */ private ExtendedSequenceNumber filterAndGetMaxExtendedSequenceNumber(IMetricsScope scope, List records, - final ExtendedSequenceNumber lastCheckpointValue, - final ExtendedSequenceNumber lastLargestPermittedCheckpointValue) { + final ExtendedSequenceNumber lastCheckpointValue, + final ExtendedSequenceNumber lastLargestPermittedCheckpointValue) { ExtendedSequenceNumber largestExtendedSequenceNumber = lastLargestPermittedCheckpointValue; ListIterator recordIterator = records.listIterator(); while (recordIterator.hasNext()) { Record record = recordIterator.next(); - ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber( - record.getSequenceNumber(), - record instanceof UserRecord - ? ((UserRecord) record).getSubSequenceNumber() - : null); + ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(record.getSequenceNumber(), + record instanceof UserRecord ? ((UserRecord) record).getSubSequenceNumber() : null); if (extendedSequenceNumber.compareTo(lastCheckpointValue) <= 0) { recordIterator.remove(); @@ -332,58 +323,4 @@ public class ProcessTask implements ITask { return largestExtendedSequenceNumber; } - /** - * Gets records from Kinesis and retries once in the event of an ExpiredIteratorException. - * - * @return list of data records from Kinesis - */ - private ProcessRecordsInput getRecordsResult() { - try { - return getRecordsResultAndRecordMillisBehindLatest(); - } catch (ExpiredIteratorException e) { - // If we see a ExpiredIteratorException, try once to restart from the greatest remembered sequence number - log.info("ShardId {}" - + ": getRecords threw ExpiredIteratorException - restarting after greatest seqNum " - + "passed to customer", shardInfo.getShardId(), e); - MetricsHelper.getMetricsScope().addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.Count, - MetricsLevel.SUMMARY); - - /* - * Advance the iterator to after the greatest processed sequence number (remembered by - * recordProcessorCheckpointer). - */ - dataFetcher.advanceIteratorTo(recordProcessorCheckpointer.getLargestPermittedCheckpointValue() - .getSequenceNumber(), streamConfig.getInitialPositionInStream()); - - // Try a second time - if we fail this time, expose the failure. - try { - return getRecordsResultAndRecordMillisBehindLatest(); - } catch (ExpiredIteratorException ex) { - String msg = - "Shard " + shardInfo.getShardId() - + ": getRecords threw ExpiredIteratorException with a fresh iterator."; - log.error(msg, ex); - throw ex; - } - } - } - - /** - * Gets records from Kinesis and records the MillisBehindLatest metric if present. - * - * @return list of data records from Kinesis - */ - private ProcessRecordsInput getRecordsResultAndRecordMillisBehindLatest() { - final ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult(); - - if (processRecordsInput.getMillisBehindLatest() != null) { - MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC, - processRecordsInput.getMillisBehindLatest(), - StandardUnit.Milliseconds, - MetricsLevel.SUMMARY); - } - - return processRecordsInput; - } - } \ No newline at end of file diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorLifecycle.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorLifecycle.java new file mode 100644 index 00000000..db63f88b --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorLifecycle.java @@ -0,0 +1,32 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.lifecycle; + +import software.amazon.kinesis.lifecycle.events.LeaseLost; +import software.amazon.kinesis.lifecycle.events.RecordsReceived; +import software.amazon.kinesis.lifecycle.events.ShardCompleted; +import software.amazon.kinesis.lifecycle.events.ShutdownRequested; +import software.amazon.kinesis.lifecycle.events.Started; + +public interface RecordProcessorLifecycle { + + void started(Started started); + void recordsReceived(RecordsReceived records); + void leaseLost(LeaseLost leaseLost); + void shardCompleted(ShardCompleted shardCompletedInput); + void shutdownRequested(ShutdownRequested shutdownRequested); + + +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorShim.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorShim.java new file mode 100644 index 00000000..7d906991 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorShim.java @@ -0,0 +1,54 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.lifecycle; + +import lombok.AllArgsConstructor; +import software.amazon.kinesis.lifecycle.events.LeaseLost; +import software.amazon.kinesis.lifecycle.events.ShardCompleted; +import software.amazon.kinesis.lifecycle.events.ShutdownRequested; +import software.amazon.kinesis.lifecycle.events.Started; +import software.amazon.kinesis.processor.IRecordProcessor; + +@AllArgsConstructor +public class RecordProcessorShim implements RecordProcessorLifecycle { + + private final IRecordProcessor delegate; + + @Override + public void started(Started started) { + InitializationInput initializationInput = started.toInitializationInput(); + delegate.initialize(initializationInput); + } + + @Override + public void recordsReceived(ProcessRecordsInput records) { + + } + + @Override + public void leaseLost(LeaseLost leaseLost) { + + } + + @Override + public void shardCompleted(ShardCompleted shardCompletedInput) { + + } + + @Override + public void shutdownRequested(ShutdownRequested shutdownRequested) { + + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index d5e30b76..6fcd2a82 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -16,13 +16,20 @@ package software.amazon.kinesis.lifecycle; import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; import software.amazon.kinesis.checkpoint.Checkpoint; +import software.amazon.kinesis.lifecycle.events.LeaseLost; +import software.amazon.kinesis.lifecycle.events.ShardCompleted; +import software.amazon.kinesis.lifecycle.events.ShutdownRequested; +import software.amazon.kinesis.lifecycle.events.Started; import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer; import software.amazon.kinesis.leases.ShardInfo; @@ -48,9 +55,11 @@ import software.amazon.kinesis.retrieval.SynchronousGetRecordsRetrievalStrategy; * A new instance should be created if the primary responsibility is reassigned back to this process. */ @Slf4j -public class ShardConsumer { +public class ShardConsumer implements RecordProcessorLifecycle { + // private final StreamConfig streamConfig; private final IRecordProcessor recordProcessor; + private RecordProcessorLifecycle recordProcessorLifecycle; private final KinesisClientLibConfiguration config; private final RecordProcessorCheckpointer recordProcessorCheckpointer; private final ExecutorService executorService; @@ -68,7 +77,9 @@ public class ShardConsumer { private ITask currentTask; private long currentTaskSubmitTime; private Future future; - + // + + // @Getter private final GetRecordsCache getRecordsCache; @@ -82,6 +93,7 @@ public class ShardConsumer { return getRecordsRetrievalStrategy.orElse(new SynchronousGetRecordsRetrievalStrategy(dataFetcher)); } + // /* * Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do @@ -95,6 +107,7 @@ public class ShardConsumer { private volatile ShutdownReason shutdownReason; private volatile ShutdownNotification shutdownNotification; + // /** * @param shardInfo Shard information * @param streamConfig Stream configuration to use @@ -245,7 +258,9 @@ public class ShardConsumer { makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo), this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords()); } + // + // /** * No-op if current task is pending, otherwise submits next task for this shard. * This method should NOT be called if the ShardConsumer is already in SHUTDOWN_COMPLETED state. @@ -345,57 +360,9 @@ public class ShardConsumer { } } - /** - * Requests the shutdown of the this ShardConsumer. This should give the record processor a chance to checkpoint - * before being shutdown. - * - * @param shutdownNotification used to signal that the record processor has been given the chance to shutdown. - */ - public void notifyShutdownRequested(ShutdownNotification shutdownNotification) { - this.shutdownNotification = shutdownNotification; - markForShutdown(ShutdownReason.REQUESTED); - } - - /** - * Shutdown this ShardConsumer (including invoking the RecordProcessor shutdown API). - * This is called by Worker when it loses responsibility for a shard. - * - * @return true if shutdown is complete (false if shutdown is still in progress) - */ - public synchronized boolean beginShutdown() { - markForShutdown(ShutdownReason.ZOMBIE); - checkAndSubmitNextTask(); - - return isShutdown(); - } - - synchronized void markForShutdown(ShutdownReason reason) { - // ShutdownReason.ZOMBIE takes precedence over TERMINATE (we won't be able to save checkpoint at end of shard) - if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) { - shutdownReason = reason; - } - } - - /** - * Used (by Worker) to check if this ShardConsumer instance has been shutdown - * RecordProcessor shutdown() has been invoked, as appropriate. - * - * @return true if shutdown is complete - */ - public boolean isShutdown() { - return currentState.isTerminal(); - } - - /** - * @return the shutdownReason - */ - public ShutdownReason getShutdownReason() { - return shutdownReason; - } - /** * Figure out next task to run based on current state, task, and shutdown context. - * + * * @return Return next task to run */ private ITask getNextTask() { @@ -411,7 +378,7 @@ public class ShardConsumer { /** * Note: This is a private/internal method with package level access solely for testing purposes. * Update state based on information about: task success, current state, and shutdown info. - * + * * @param taskOutcome The outcome of the last task */ void updateState(TaskOutcome taskOutcome) { @@ -435,11 +402,65 @@ public class ShardConsumer { } + // + + // + /** + * Requests the shutdown of the this ShardConsumer. This should give the record processor a chance to checkpoint + * before being shutdown. + * + * @param shutdownNotification used to signal that the record processor has been given the chance to shutdown. + */ + public void notifyShutdownRequested(ShutdownNotification shutdownNotification) { + this.shutdownNotification = shutdownNotification; + markForShutdown(ShutdownReason.REQUESTED); + } + + /** + * Shutdown this ShardConsumer (including invoking the RecordProcessor shutdown API). + * This is called by Worker when it loses responsibility for a shard. + * + * @return true if shutdown is complete (false if shutdown is still in progress) + */ + public synchronized boolean beginShutdown() { + markForShutdown(ShutdownReason.ZOMBIE); + checkAndSubmitNextTask(); + + return isShutdown(); + } + + synchronized void markForShutdown(ShutdownReason reason) { + // ShutdownReason.ZOMBIE takes precedence over TERMINATE (we won't be able to save checkpoint at end of shard) + if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) { + shutdownReason = reason; + } + } + + /** + * Used (by Worker) to check if this ShardConsumer instance has been shutdown + * RecordProcessor shutdown() has been invoked, as appropriate. + * + * @return true if shutdown is complete + */ + public boolean isShutdown() { + return currentState.isTerminal(); + } + + /** + * @return the shutdownReason + */ + public ShutdownReason getShutdownReason() { + return shutdownReason; + } + @VisibleForTesting public boolean isShutdownRequested() { return shutdownReason != null; } + // + + // /** * Private/Internal method - has package level access solely for testing purposes. * @@ -504,4 +525,46 @@ public class ShardConsumer { ShutdownNotification getShutdownNotification() { return shutdownNotification; } + // + + + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future taskResult = null; + + // + @Override + public void started(Started started) { + if (taskResult != null) { + try { + taskResult.get(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + } + + taskResult = executor.submit(() -> recordProcessorLifecycle.started(started)); + } + + @Override + public void recordsReceived(ProcessRecordsInput records) { + + } + + @Override + public void leaseLost(LeaseLost leaseLost) { + + } + + @Override + public void shardCompleted(ShardCompleted shardCompletedInput) { + + } + + @Override + public void shutdownRequested(ShutdownRequested shutdownRequested) { + + } + // } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/LeaseLost.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/LeaseLost.java new file mode 100644 index 00000000..912f2966 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/LeaseLost.java @@ -0,0 +1,18 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.lifecycle.events; + +public class LeaseLost { +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/RecordsReceived.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/RecordsReceived.java new file mode 100644 index 00000000..15dc0cc6 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/RecordsReceived.java @@ -0,0 +1,22 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.lifecycle.events; + +import lombok.Data; + +@Data +public class RecordsReceived { + +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShardCompleted.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShardCompleted.java new file mode 100644 index 00000000..1df45a56 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShardCompleted.java @@ -0,0 +1,18 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.lifecycle.events; + +public class ShardCompleted { +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShutdownRequested.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShutdownRequested.java new file mode 100644 index 00000000..aa9074bd --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShutdownRequested.java @@ -0,0 +1,18 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.lifecycle.events; + +public class ShutdownRequested { +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/Started.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/Started.java new file mode 100644 index 00000000..80943ad4 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/Started.java @@ -0,0 +1,33 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.lifecycle.events; + +import lombok.Data; +import software.amazon.kinesis.lifecycle.InitializationInput; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; + +@Data +public class Started { + + private final String shardId; + private final ExtendedSequenceNumber sequenceNumber; + private final ExtendedSequenceNumber pendingSequenceNumber; + + public InitializationInput toInitializationInput() { + return new InitializationInput().withShardId(shardId).withExtendedSequenceNumber(sequenceNumber) + .withExtendedSequenceNumber(sequenceNumber); + } + +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java index 4a97d347..4d6233a8 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java @@ -1,26 +1,22 @@ /* - * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Amazon Software License + * (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at + * http://aws.amazon.com/asl/ or in the "license" file accompanying this file. This file is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific + * language governing permissions and limitations under the License. */ package software.amazon.kinesis.lifecycle; +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.beans.HasPropertyWithValue.hasProperty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; +import static org.junit.Assert.assertThat; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -29,7 +25,6 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.security.MessageDigest; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.List; @@ -37,34 +32,47 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; -import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; -import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer; -import software.amazon.kinesis.leases.ShardInfo; -import software.amazon.kinesis.coordinator.StreamConfig; -import software.amazon.kinesis.retrieval.ThrottlingReporter; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.MockitoAnnotations; +import org.mockito.runners.MockitoJUnitRunner; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; +import com.amazonaws.services.kinesis.model.Record; +import com.google.protobuf.ByteString; + +import lombok.Data; +import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; +import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer; +import software.amazon.kinesis.coordinator.StreamConfig; +import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.processor.IRecordProcessor; import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.KinesisDataFetcher; +import software.amazon.kinesis.retrieval.ThrottlingReporter; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.Messages; import software.amazon.kinesis.retrieval.kpl.Messages.AggregatedRecord; import software.amazon.kinesis.retrieval.kpl.UserRecord; -import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; -import com.amazonaws.services.kinesis.model.Record; -import com.google.protobuf.ByteString; +@RunWith(MockitoJUnitRunner.class) public class ProcessTaskTest { + private StreamConfig config; + private ShardInfo shardInfo; + + @Mock + private ProcessRecordsInput processRecordsInput; + @SuppressWarnings("serial") - private static class RecordSubclass extends Record {} + private static class RecordSubclass extends Record { + } private static final byte[] TEST_DATA = new byte[] { 1, 2, 3, 4 }; @@ -75,78 +83,45 @@ public class ProcessTaskTest { private final boolean callProcessRecordsForEmptyRecordList = true; // We don't want any of these tests to run checkpoint validation private final boolean skipCheckpointValidationValue = false; - private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = - InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST); + private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = InitialPositionInStreamExtended + .newInitialPosition(InitialPositionInStream.LATEST); - private @Mock - KinesisDataFetcher mockDataFetcher; - private @Mock IRecordProcessor mockRecordProcessor; - private @Mock - RecordProcessorCheckpointer mockCheckpointer; + @Mock + private KinesisDataFetcher mockDataFetcher; + @Mock + private IRecordProcessor mockRecordProcessor; + @Mock + private RecordProcessorCheckpointer mockCheckpointer; @Mock private ThrottlingReporter throttlingReporter; @Mock private GetRecordsCache getRecordsCache; - private List processedRecords; - private ExtendedSequenceNumber newLargestPermittedCheckpointValue; - private ProcessTask processTask; @Before public void setUpProcessTask() { - // Initialize the annotation - MockitoAnnotations.initMocks(this); // Set up process task - final StreamConfig config = - new StreamConfig(null, maxRecords, idleTimeMillis, callProcessRecordsForEmptyRecordList, - skipCheckpointValidationValue, - INITIAL_POSITION_LATEST); - final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); - processTask = new ProcessTask( - shardInfo, - config, - mockRecordProcessor, - mockCheckpointer, - mockDataFetcher, - taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, - throttlingReporter, - getRecordsCache); + config = new StreamConfig(null, maxRecords, idleTimeMillis, callProcessRecordsForEmptyRecordList, + skipCheckpointValidationValue, INITIAL_POSITION_LATEST); + shardInfo = new ShardInfo(shardId, null, null, null); + } - @Test - public void testProcessTaskWithProvisionedThroughputExceededException() { - // Set data fetcher to throw exception - doReturn(false).when(mockDataFetcher).isShardEndReached(); - doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(getRecordsCache) - .getNextResult(); - - TaskResult result = processTask.call(); - verify(throttlingReporter).throttled(); - verify(throttlingReporter, never()).success(); - verify(getRecordsCache).getNextResult(); - assertTrue("Result should contain ProvisionedThroughputExceededException", - result.getException() instanceof ProvisionedThroughputExceededException); - } - - @Test - public void testProcessTaskWithNonExistentStream() { - // Data fetcher returns a null Result ` the stream does not exist - doReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest((long) 0)).when(getRecordsCache).getNextResult(); - - TaskResult result = processTask.call(); - verify(getRecordsCache).getNextResult(); - assertNull("Task should not throw an exception", result.getException()); + private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput) { + return new ProcessTask(shardInfo, config, mockRecordProcessor, mockCheckpointer, taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter, + processRecordsInput); } @Test public void testProcessTaskWithShardEndReached() { - // Set data fetcher to return true for shard end reached - doReturn(true).when(mockDataFetcher).isShardEndReached(); + + processTask = makeProcessTask(processRecordsInput); + when(processRecordsInput.isAtShardEnd()).thenReturn(true); TaskResult result = processTask.call(); - assertTrue("Result should contain shardEndReached true", result.isShardEndReached()); + assertThat(result, shardEndTaskResult(true)); } @Test @@ -154,41 +129,42 @@ public class ProcessTaskTest { final String sqn = new BigInteger(128, new Random()).toString(); final String pk = UUID.randomUUID().toString(); final Date ts = new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(4, TimeUnit.HOURS)); - final Record r = new Record() - .withPartitionKey(pk) - .withData(ByteBuffer.wrap(TEST_DATA)) - .withSequenceNumber(sqn) + final Record r = new Record().withPartitionKey(pk).withData(ByteBuffer.wrap(TEST_DATA)).withSequenceNumber(sqn) .withApproximateArrivalTimestamp(ts); - testWithRecord(r); + RecordProcessorOutcome outcome = testWithRecord(r); - assertEquals(1, processedRecords.size()); + assertEquals(1, outcome.getProcessRecordsCall().getRecords().size()); - Record pr = processedRecords.get(0); + Record pr = outcome.getProcessRecordsCall().getRecords().get(0); assertEquals(pk, pr.getPartitionKey()); assertEquals(ts, pr.getApproximateArrivalTimestamp()); - byte[] b = new byte[pr.getData().remaining()]; - pr.getData().get(b); - assertTrue(Arrays.equals(TEST_DATA, b)); + byte[] b = pr.getData().array(); + assertThat(b, equalTo(TEST_DATA)); - assertEquals(sqn, newLargestPermittedCheckpointValue.getSequenceNumber()); - assertEquals(0, newLargestPermittedCheckpointValue.getSubSequenceNumber()); + assertEquals(sqn, outcome.getCheckpointCall().getSequenceNumber()); + assertEquals(0, outcome.getCheckpointCall().getSubSequenceNumber()); + } + + @Data + static class RecordProcessorOutcome { + final ProcessRecordsInput processRecordsCall; + final ExtendedSequenceNumber checkpointCall; } @Test public void testDoesNotDeaggregateSubclassOfRecord() { final String sqn = new BigInteger(128, new Random()).toString(); - final Record r = new RecordSubclass() - .withSequenceNumber(sqn) - .withData(ByteBuffer.wrap(new byte[0])); + final Record r = new RecordSubclass().withSequenceNumber(sqn).withData(ByteBuffer.wrap(new byte[0])); - testWithRecord(r); + processTask = makeProcessTask(processRecordsInput); + RecordProcessorOutcome outcome = testWithRecord(r); - assertEquals(1, processedRecords.size(), 1); - assertSame(r, processedRecords.get(0)); + assertEquals(1, outcome.getProcessRecordsCall().getRecords().size(), 1); + assertSame(r, outcome.getProcessRecordsCall().getRecords().get(0)); - assertEquals(sqn, newLargestPermittedCheckpointValue.getSequenceNumber()); - assertEquals(0, newLargestPermittedCheckpointValue.getSubSequenceNumber()); + assertEquals(sqn, outcome.getCheckpointCall().getSequenceNumber()); + assertEquals(0, outcome.getCheckpointCall().getSubSequenceNumber()); } @Test @@ -196,44 +172,44 @@ public class ProcessTaskTest { final String sqn = new BigInteger(128, new Random()).toString(); final String pk = UUID.randomUUID().toString(); final Date ts = new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(4, TimeUnit.HOURS)); - final Record r = new Record() - .withPartitionKey("-") - .withData(generateAggregatedRecord(pk)) - .withSequenceNumber(sqn) - .withApproximateArrivalTimestamp(ts); + final Record r = new Record().withPartitionKey("-").withData(generateAggregatedRecord(pk)) + .withSequenceNumber(sqn).withApproximateArrivalTimestamp(ts); - testWithRecord(r); + processTask = makeProcessTask(processRecordsInput); + RecordProcessorOutcome outcome = testWithRecord(r); - assertEquals(3, processedRecords.size()); - for (Record pr : processedRecords) { - assertTrue(pr instanceof UserRecord); + List actualRecords = outcome.getProcessRecordsCall().getRecords(); + + assertEquals(3, actualRecords.size()); + for (Record pr : actualRecords) { + assertThat(pr, instanceOf(UserRecord.class)); assertEquals(pk, pr.getPartitionKey()); assertEquals(ts, pr.getApproximateArrivalTimestamp()); - byte[] b = new byte[pr.getData().remaining()]; - pr.getData().get(b); - assertTrue(Arrays.equals(TEST_DATA, b)); + byte[] b = pr.getData().array(); + assertThat(b, equalTo(TEST_DATA)); } - assertEquals(sqn, newLargestPermittedCheckpointValue.getSequenceNumber()); - assertEquals(processedRecords.size() - 1, newLargestPermittedCheckpointValue.getSubSequenceNumber()); + assertEquals(sqn, outcome.getCheckpointCall().getSequenceNumber()); + assertEquals(actualRecords.size() - 1, outcome.getCheckpointCall().getSubSequenceNumber()); } @Test public void testDeaggregatesRecordWithNoArrivalTimestamp() { final String sqn = new BigInteger(128, new Random()).toString(); final String pk = UUID.randomUUID().toString(); - final Record r = new Record() - .withPartitionKey("-") - .withData(generateAggregatedRecord(pk)) + final Record r = new Record().withPartitionKey("-").withData(generateAggregatedRecord(pk)) .withSequenceNumber(sqn); - testWithRecord(r); + processTask = makeProcessTask(processRecordsInput); + RecordProcessorOutcome outcome = testWithRecord(r); - assertEquals(3, processedRecords.size()); - for (Record pr : processedRecords) { - assertTrue(pr instanceof UserRecord); + List actualRecords = outcome.getProcessRecordsCall().getRecords(); + + assertEquals(3, actualRecords.size()); + for (Record pr : actualRecords) { + assertThat(pr, instanceOf(UserRecord.class)); assertEquals(pk, pr.getPartitionKey()); - assertNull(pr.getApproximateArrivalTimestamp()); + assertThat(pr.getApproximateArrivalTimestamp(), nullValue()); } } @@ -246,15 +222,17 @@ public class ProcessTaskTest { final int numberOfRecords = 104; // Start these batch of records's sequence number that is greater than previous checkpoint value. final BigInteger startingSqn = previousCheckpointSqn.add(BigInteger.valueOf(10)); - final List records = generateConsecutiveRecords( - numberOfRecords, "-", ByteBuffer.wrap(TEST_DATA), new Date(), startingSqn); + final List records = generateConsecutiveRecords(numberOfRecords, "-", ByteBuffer.wrap(TEST_DATA), + new Date(), startingSqn); - testWithRecords(records, new ExtendedSequenceNumber(previousCheckpointSqn.toString()), + processTask = makeProcessTask(processRecordsInput); + RecordProcessorOutcome outcome = testWithRecords(records, + new ExtendedSequenceNumber(previousCheckpointSqn.toString()), new ExtendedSequenceNumber(previousCheckpointSqn.toString())); final ExtendedSequenceNumber expectedLargestPermittedEsqn = new ExtendedSequenceNumber( startingSqn.add(BigInteger.valueOf(numberOfRecords - 1)).toString()); - assertEquals(expectedLargestPermittedEsqn, newLargestPermittedCheckpointValue); + assertEquals(expectedLargestPermittedEsqn, outcome.getCheckpointCall()); } @Test @@ -265,17 +243,19 @@ public class ProcessTaskTest { final ExtendedSequenceNumber largestPermittedEsqn = new ExtendedSequenceNumber( baseSqn.add(BigInteger.valueOf(100)).toString()); - testWithRecords(Collections.emptyList(), lastCheckpointEspn, largestPermittedEsqn); + processTask = makeProcessTask(processRecordsInput); + RecordProcessorOutcome outcome = testWithRecords(Collections.emptyList(), lastCheckpointEspn, + largestPermittedEsqn); // Make sure that even with empty records, largest permitted sequence number does not change. - assertEquals(largestPermittedEsqn, newLargestPermittedCheckpointValue); + assertEquals(largestPermittedEsqn, outcome.getCheckpointCall()); } @Test public void testFilterBasedOnLastCheckpointValue() { // Explanation of setup: // * Assume in previous processRecord call, user got 3 sub-records that all belonged to one - // Kinesis record. So sequence number was X, and sub-sequence numbers were 0, 1, 2. + // Kinesis record. So sequence number was X, and sub-sequence numbers were 0, 1, 2. // * 2nd sub-record was checkpointed (extended sequnce number X.1). // * Worker crashed and restarted. So now DDB has checkpoint value of X.1. // Test: @@ -286,21 +266,22 @@ public class ProcessTaskTest { // Values for this processRecords call. final String startingSqn = previousCheckpointSqn.toString(); final String pk = UUID.randomUUID().toString(); - final Record r = new Record() - .withPartitionKey("-") - .withData(generateAggregatedRecord(pk)) + final Record r = new Record().withPartitionKey("-").withData(generateAggregatedRecord(pk)) .withSequenceNumber(startingSqn); - testWithRecords(Collections.singletonList(r), + processTask = makeProcessTask(processRecordsInput); + RecordProcessorOutcome outcome = testWithRecords(Collections.singletonList(r), new ExtendedSequenceNumber(previousCheckpointSqn.toString(), previousCheckpointSsqn), new ExtendedSequenceNumber(previousCheckpointSqn.toString(), previousCheckpointSsqn)); + List actualRecords = outcome.getProcessRecordsCall().getRecords(); + // First two records should be dropped - and only 1 remaining records should be there. - assertEquals(1, processedRecords.size()); - assertTrue(processedRecords.get(0) instanceof UserRecord); + assertEquals(1, actualRecords.size()); + assertThat(actualRecords.get(0), instanceOf(UserRecord.class)); // Verify user record's extended sequence number and other fields. - final UserRecord pr = (UserRecord)processedRecords.get(0); + final UserRecord pr = (UserRecord) actualRecords.get(0); assertEquals(pk, pr.getPartitionKey()); assertEquals(startingSqn, pr.getSequenceNumber()); assertEquals(previousCheckpointSsqn + 1, pr.getSubSequenceNumber()); @@ -309,60 +290,50 @@ public class ProcessTaskTest { // Expected largest permitted sequence number will be last sub-record sequence number. final ExtendedSequenceNumber expectedLargestPermittedEsqn = new ExtendedSequenceNumber( previousCheckpointSqn.toString(), 2L); - assertEquals(expectedLargestPermittedEsqn, newLargestPermittedCheckpointValue); + assertEquals(expectedLargestPermittedEsqn, outcome.getCheckpointCall()); } - private void testWithRecord(Record record) { - testWithRecords(Collections.singletonList(record), - ExtendedSequenceNumber.TRIM_HORIZON, ExtendedSequenceNumber.TRIM_HORIZON); + private RecordProcessorOutcome testWithRecord(Record record) { + return testWithRecords(Collections.singletonList(record), ExtendedSequenceNumber.TRIM_HORIZON, + ExtendedSequenceNumber.TRIM_HORIZON); } - private void testWithRecords(List records, - ExtendedSequenceNumber lastCheckpointValue, + private RecordProcessorOutcome testWithRecords(List records, ExtendedSequenceNumber lastCheckpointValue, ExtendedSequenceNumber largestPermittedCheckpointValue) { - when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(records).withMillisBehindLatest((long) 1000 * 50)); when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue); when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue); + when(processRecordsInput.getRecords()).thenReturn(records); + processTask = makeProcessTask(processRecordsInput); processTask.call(); verify(throttlingReporter).success(); verify(throttlingReporter, never()).throttled(); - verify(getRecordsCache).getNextResult(); - ArgumentCaptor priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); - verify(mockRecordProcessor).processRecords(priCaptor.capture()); - processedRecords = priCaptor.getValue().getRecords(); + ArgumentCaptor recordsCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); + verify(mockRecordProcessor).processRecords(recordsCaptor.capture()); ArgumentCaptor esnCaptor = ArgumentCaptor.forClass(ExtendedSequenceNumber.class); verify(mockCheckpointer).setLargestPermittedCheckpointValue(esnCaptor.capture()); - newLargestPermittedCheckpointValue = esnCaptor.getValue(); + + return new RecordProcessorOutcome(recordsCaptor.getValue(), esnCaptor.getValue()); + } /** - * See the KPL documentation on GitHub for more details about the binary - * format. + * See the KPL documentation on GitHub for more details about the binary format. * * @param pk - * Partition key to use. All the records will have the same - * partition key. - * @return ByteBuffer containing the serialized form of the aggregated - * record, along with the necessary header and footer. + * Partition key to use. All the records will have the same partition key. + * @return ByteBuffer containing the serialized form of the aggregated record, along with the necessary header and + * footer. */ private static ByteBuffer generateAggregatedRecord(String pk) { ByteBuffer bb = ByteBuffer.allocate(1024); - bb.put(new byte[] {-13, -119, -102, -62 }); + bb.put(new byte[] { -13, -119, -102, -62 }); - Messages.Record r = - Messages.Record.newBuilder() - .setData(ByteString.copyFrom(TEST_DATA)) - .setPartitionKeyIndex(0) - .build(); + Messages.Record r = Messages.Record.newBuilder().setData(ByteString.copyFrom(TEST_DATA)).setPartitionKeyIndex(0) + .build(); - byte[] payload = AggregatedRecord.newBuilder() - .addPartitionKeyTable(pk) - .addRecords(r) - .addRecords(r) - .addRecords(r) - .build() - .toByteArray(); + byte[] payload = AggregatedRecord.newBuilder().addPartitionKeyTable(pk).addRecords(r).addRecords(r) + .addRecords(r).build().toByteArray(); bb.put(payload); bb.put(md5(payload)); @@ -371,16 +342,13 @@ public class ProcessTaskTest { return bb; } - private static List generateConsecutiveRecords( - int numberOfRecords, String partitionKey, ByteBuffer data, + private static List generateConsecutiveRecords(int numberOfRecords, String partitionKey, ByteBuffer data, Date arrivalTimestamp, BigInteger startSequenceNumber) { List records = new ArrayList<>(); - for (int i = 0 ; i < numberOfRecords ; ++i) { - records.add(new Record() - .withPartitionKey(partitionKey) - .withData(data) - .withSequenceNumber(startSequenceNumber.add(BigInteger.valueOf(i)).toString()) - .withApproximateArrivalTimestamp(arrivalTimestamp)); + for (int i = 0; i < numberOfRecords; ++i) { + records.add(new Record().withPartitionKey(partitionKey).withData(data) + .withSequenceNumber(startSequenceNumber.add(BigInteger.valueOf(i)).toString()) + .withApproximateArrivalTimestamp(arrivalTimestamp)); } return records; } @@ -393,4 +361,48 @@ public class ProcessTaskTest { throw new RuntimeException(e); } } + + private static TaskResultMatcher shardEndTaskResult(boolean isAtShardEnd) { + TaskResult expected = new TaskResult(null, isAtShardEnd); + return taskResult(expected); + } + + private static TaskResultMatcher exceptionTaskResult(Exception ex) { + TaskResult expected = new TaskResult(ex, false); + return taskResult(expected); + } + + private static TaskResultMatcher taskResult(TaskResult expected) { + return new TaskResultMatcher(expected); + } + + private static class TaskResultMatcher extends TypeSafeDiagnosingMatcher { + + Matcher matchers; + + TaskResultMatcher(TaskResult expected) { + if (expected == null) { + matchers = nullValue(TaskResult.class); + } else { + matchers = allOf(notNullValue(TaskResult.class), + hasProperty("shardEndReached", equalTo(expected.isShardEndReached())), + hasProperty("exception", equalTo(expected.getException()))); + } + + } + + @Override + protected boolean matchesSafely(TaskResult item, Description mismatchDescription) { + if (!matchers.matches(item)) { + matchers.describeMismatch(item, mismatchDescription); + return false; + } + return true; + } + + @Override + public void describeTo(Description description) { + description.appendDescriptionOf(matchers); + } + } } From f130e4c79cb6726069863535f644d72c13fb10f4 Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Tue, 27 Mar 2018 05:53:14 -0700 Subject: [PATCH 2/3] kcl-2.0 --- amazon-kinesis-client/pom.xml | 5 + .../lifecycle/RecordProcessorLifecycle.java | 2 +- .../lifecycle/RecordProcessorShim.java | 28 ++++-- .../kinesis/lifecycle/ShardConsumer.java | 97 +++++++++++++++---- .../amazon/kinesis/lifecycle/TaskFailed.java | 22 +++++ .../kinesis/lifecycle/TaskFailedListener.java | 20 ++++ .../lifecycle/TaskFailureHandling.java | 19 ++++ .../lifecycle/events/RecordsReceived.java | 20 ++++ .../lifecycle/events/ShardCompleted.java | 5 + .../lifecycle/events/ShutdownRequested.java | 5 + 10 files changed, 194 insertions(+), 29 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailed.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailedListener.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailureHandling.java diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index d1a4f020..536aa87e 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -83,6 +83,11 @@ commons-lang 2.6 + + org.apache.commons + commons-lang3 + 3.7 + org.slf4j slf4j-api diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorLifecycle.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorLifecycle.java index db63f88b..bc24b553 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorLifecycle.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorLifecycle.java @@ -25,7 +25,7 @@ public interface RecordProcessorLifecycle { void started(Started started); void recordsReceived(RecordsReceived records); void leaseLost(LeaseLost leaseLost); - void shardCompleted(ShardCompleted shardCompletedInput); + void shardCompleted(ShardCompleted shardCompleted); void shutdownRequested(ShutdownRequested shutdownRequested); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorShim.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorShim.java index 7d906991..9c55e048 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorShim.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorShim.java @@ -16,10 +16,13 @@ package software.amazon.kinesis.lifecycle; import lombok.AllArgsConstructor; import software.amazon.kinesis.lifecycle.events.LeaseLost; +import software.amazon.kinesis.lifecycle.events.RecordsReceived; import software.amazon.kinesis.lifecycle.events.ShardCompleted; import software.amazon.kinesis.lifecycle.events.ShutdownRequested; import software.amazon.kinesis.lifecycle.events.Started; import software.amazon.kinesis.processor.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; +import software.amazon.kinesis.processor.IShutdownNotificationAware; @AllArgsConstructor public class RecordProcessorShim implements RecordProcessorLifecycle { @@ -28,27 +31,38 @@ public class RecordProcessorShim implements RecordProcessorLifecycle { @Override public void started(Started started) { - InitializationInput initializationInput = started.toInitializationInput(); - delegate.initialize(initializationInput); + delegate.initialize(started.toInitializationInput()); } @Override - public void recordsReceived(ProcessRecordsInput records) { - + public void recordsReceived(RecordsReceived records) { + delegate.processRecords(records.toProcessRecordsInput()); } @Override public void leaseLost(LeaseLost leaseLost) { + ShutdownInput shutdownInput = new ShutdownInput() { + @Override + public IRecordProcessorCheckpointer getCheckpointer() { + throw new UnsupportedOperationException("Cannot checkpoint when the lease is lost"); + } + }.withShutdownReason(ShutdownReason.ZOMBIE); + delegate.shutdown(shutdownInput); } @Override - public void shardCompleted(ShardCompleted shardCompletedInput) { - + public void shardCompleted(ShardCompleted shardCompleted) { + ShutdownInput shutdownInput = new ShutdownInput().withCheckpointer(shardCompleted.getCheckpointer()) + .withShutdownReason(ShutdownReason.TERMINATE); + delegate.shutdown(shutdownInput); } @Override public void shutdownRequested(ShutdownRequested shutdownRequested) { - + if (delegate instanceof IShutdownNotificationAware) { + IShutdownNotificationAware aware = (IShutdownNotificationAware)delegate; + aware.shutdownRequested(shutdownRequested.getCheckpointer()); + } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index 6fcd2a82..d72e15a4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -15,18 +15,22 @@ package software.amazon.kinesis.lifecycle; +import java.time.Instant; +import java.util.EnumSet; import java.util.Optional; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadPoolExecutor; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; +import lombok.Data; +import lombok.Synchronized; +import org.apache.commons.lang3.StringUtils; import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.lifecycle.events.LeaseLost; +import software.amazon.kinesis.lifecycle.events.RecordsReceived; import software.amazon.kinesis.lifecycle.events.ShardCompleted; import software.amazon.kinesis.lifecycle.events.ShutdownRequested; import software.amazon.kinesis.lifecycle.events.Started; @@ -528,41 +532,92 @@ public class ShardConsumer implements RecordProcessorLifecycle { // - ExecutorService executor = Executors.newSingleThreadExecutor(); - Future taskResult = null; + private enum LifecycleStates { + STARTED, PROCESSING, SHUTDOWN, FAILED + } - // - @Override - public void started(Started started) { - if (taskResult != null) { + private EnumSet allowedStates = EnumSet.of(LifecycleStates.STARTED); + private TaskFailedListener listener; + public void addTaskFailedListener(TaskFailedListener listener) { + this.listener = listener; + } + + private TaskFailureHandling taskFailed(Throwable t) { + // + // TODO: What should we do if there is no listener. I intend to require the scheduler to always register + // + if (listener != null) { + return listener.taskFailed(new TaskFailed(t)); + } + return TaskFailureHandling.STOP; + } + + @Data + private class TaskExecution { + private final Instant started; + private final Future future; + } + + ExecutorService executor = Executors.newSingleThreadExecutor(); + TaskExecution taskExecution = null; + + private void awaitAvailable() { + if (taskExecution != null) { try { - taskResult.get(); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (ExecutionException e) { - e.printStackTrace(); + taskExecution.getFuture().get(); + } catch (Throwable t) { + TaskFailureHandling handling = taskFailed(t); + if (handling == TaskFailureHandling.STOP) { + allowedStates = EnumSet.of(LifecycleStates.FAILED); + } } } + } - taskResult = executor.submit(() -> recordProcessorLifecycle.started(started)); + private void checkState(LifecycleStates current) { + if (!allowedStates.contains(current)) { + throw new IllegalStateException("State " + current + " isn't allowed. Allowed: (" + StringUtils.join(allowedStates) + ")"); + } + } + + // + + private void executeTask(LifecycleStates current, Runnable task) { + awaitAvailable(); + checkState(current); + Future future = executor.submit(task); + taskExecution = new TaskExecution(Instant.now(), future); } @Override - public void recordsReceived(ProcessRecordsInput records) { - + @Synchronized + public void started(Started started) { + executeTask(LifecycleStates.STARTED, () -> recordProcessorLifecycle.started(started)); + allowedStates = EnumSet.of(LifecycleStates.PROCESSING, LifecycleStates.SHUTDOWN); } @Override + @Synchronized + public void recordsReceived(RecordsReceived records) { + executeTask(LifecycleStates.PROCESSING, () -> recordProcessorLifecycle.recordsReceived(records)); + } + + @Override + @Synchronized public void leaseLost(LeaseLost leaseLost) { + executeTask(LifecycleStates.SHUTDOWN, () -> recordProcessorLifecycle.leaseLost(leaseLost)); + allowedStates = EnumSet.of(LifecycleStates.SHUTDOWN); + } + + @Override + @Synchronized + public void shardCompleted(ShardCompleted shardCompleted) { + executeTask(LifecycleStates.SHUTDOWN, () -> recordProcessorLifecycle.shardCompleted(shardCompleted)); } @Override - public void shardCompleted(ShardCompleted shardCompletedInput) { - - } - - @Override + @Synchronized public void shutdownRequested(ShutdownRequested shutdownRequested) { } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailed.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailed.java new file mode 100644 index 00000000..c35128ff --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailed.java @@ -0,0 +1,22 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.lifecycle; + +import lombok.Data; + +@Data +public class TaskFailed { + private final Throwable throwable; +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailedListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailedListener.java new file mode 100644 index 00000000..47851fcb --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailedListener.java @@ -0,0 +1,20 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.lifecycle; + +@FunctionalInterface +public interface TaskFailedListener { + TaskFailureHandling taskFailed(TaskFailed result); +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailureHandling.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailureHandling.java new file mode 100644 index 00000000..b5dacac1 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailureHandling.java @@ -0,0 +1,19 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.lifecycle; + +public enum TaskFailureHandling { + STOP, CONTINUE +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/RecordsReceived.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/RecordsReceived.java index 15dc0cc6..9d190616 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/RecordsReceived.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/RecordsReceived.java @@ -14,9 +14,29 @@ */ package software.amazon.kinesis.lifecycle.events; +import java.time.Duration; +import java.time.Instant; +import java.util.List; + +import com.amazonaws.services.kinesis.model.Record; + import lombok.Data; +import software.amazon.kinesis.lifecycle.ProcessRecordsInput; +import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; @Data public class RecordsReceived { + private final Instant cacheEntryTime; + private final Instant cacheExitTime; + private final boolean isAtShardEnd; + private final List records; + private final IRecordProcessorCheckpointer checkpointer; + private Duration timeBehindLatest; + + public ProcessRecordsInput toProcessRecordsInput() { + return new ProcessRecordsInput(cacheEntryTime, cacheExitTime, isAtShardEnd, records, checkpointer, + timeBehindLatest.toMillis()); + } + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShardCompleted.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShardCompleted.java index 1df45a56..3d9fdef8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShardCompleted.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShardCompleted.java @@ -14,5 +14,10 @@ */ package software.amazon.kinesis.lifecycle.events; +import lombok.Data; +import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; + +@Data public class ShardCompleted { + private final IRecordProcessorCheckpointer checkpointer; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShutdownRequested.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShutdownRequested.java index aa9074bd..a4d9eae3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShutdownRequested.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShutdownRequested.java @@ -14,5 +14,10 @@ */ package software.amazon.kinesis.lifecycle.events; +import lombok.Data; +import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; + +@Data public class ShutdownRequested { + private final IRecordProcessorCheckpointer checkpointer; } From 94c51253521e8ac02d59c6d8d8faf4263f726ed0 Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Tue, 27 Mar 2018 10:38:49 -0700 Subject: [PATCH 3/3] Changing invocation system for ShardConsumer. Still working on it though. --- .../amazon/kinesis/leases/ShardSyncTask.java | 48 +++-- .../lifecycle/BlockOnParentShardTask.java | 84 +++++--- .../amazon/kinesis/lifecycle/ITask.java | 11 +- .../kinesis/lifecycle/InitializeTask.java | 5 + .../amazon/kinesis/lifecycle/ProcessTask.java | 79 ++++--- .../kinesis/lifecycle/ShardConsumer.java | 202 ++++++++---------- .../lifecycle/ShutdownNotificationTask.java | 11 + .../kinesis/lifecycle/ShutdownTask.java | 127 ++++++----- .../lifecycle/TaskCompletedListener.java | 25 +++ .../MetricsCollectingTaskDecorator.java | 50 ++++- .../retrieval/BlockingGetRecordsCache.java | 10 + .../retrieval/DataArrivedListener.java | 20 ++ .../kinesis/retrieval/GetRecordsCache.java | 4 + .../retrieval/PrefetchGetRecordsCache.java | 15 ++ .../SimpleRecordsFetcherFactory.java | 23 +- .../SynchronousBlockingRetrievalFactory.java | 2 +- .../kinesis/coordinator/WorkerTest.java | 2 + .../kinesis/lifecycle/ShardConsumerTest.java | 14 +- .../BlockingGetRecordsCacheTest.java | 85 -------- .../src/test/resources/logback.xml | 2 +- 20 files changed, 445 insertions(+), 374 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskCompletedListener.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataArrivedListener.java delete mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/BlockingGetRecordsCacheTest.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java index af4e43a3..c6f3fc32 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java @@ -15,13 +15,14 @@ package software.amazon.kinesis.leases; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; + +import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.lifecycle.ITask; +import software.amazon.kinesis.lifecycle.TaskCompletedListener; import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.lifecycle.TaskType; import software.amazon.kinesis.retrieval.IKinesisProxy; -import lombok.extern.slf4j.Slf4j; - /** * This task syncs leases/activies with shards of the stream. * It will create new leases/activites when it discovers new shards (e.g. setup/resharding). @@ -38,6 +39,8 @@ public class ShardSyncTask implements ITask { private final long shardSyncTaskIdleTimeMillis; private final TaskType taskType = TaskType.SHARDSYNC; + private TaskCompletedListener listener; + /** * @param kinesisProxy Used to fetch information about the stream (e.g. shard list) * @param leaseManager Used to fetch and create leases @@ -64,23 +67,26 @@ public class ShardSyncTask implements ITask { */ @Override public TaskResult call() { - Exception exception = null; - try { - ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, - leaseManager, - initialPosition, - cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards); - if (shardSyncTaskIdleTimeMillis > 0) { - Thread.sleep(shardSyncTaskIdleTimeMillis); - } - } catch (Exception e) { - log.error("Caught exception while sync'ing Kinesis shards and leases", e); - exception = e; - } + Exception exception = null; - return new TaskResult(exception); + try { + ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPosition, + cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards); + if (shardSyncTaskIdleTimeMillis > 0) { + Thread.sleep(shardSyncTaskIdleTimeMillis); + } + } catch (Exception e) { + log.error("Caught exception while sync'ing Kinesis shards and leases", e); + exception = e; + } + + return new TaskResult(exception); + } finally { + if (listener != null) { + listener.taskCompleted(this); + } + } } @@ -92,4 +98,12 @@ public class ShardSyncTask implements ITask { return taskType; } + @Override + public void addTaskCompletedListener(TaskCompletedListener taskCompletedListener) { + if (listener != null) { + log.warn("Listener is being reset, this shouldn't happen"); + } + listener = taskCompletedListener; + } + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java index 3848ec61..f589b86a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java @@ -15,12 +15,12 @@ package software.amazon.kinesis.lifecycle; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; -import software.amazon.kinesis.leases.ShardInfo; -import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import software.amazon.kinesis.leases.KinesisClientLease; -import software.amazon.kinesis.leases.ILeaseManager; import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.leases.ILeaseManager; +import software.amazon.kinesis.leases.KinesisClientLease; +import software.amazon.kinesis.leases.ShardInfo; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** * Task to block until processing of all data records in the parent shard(s) is completed. @@ -38,6 +38,8 @@ public class BlockOnParentShardTask implements ITask { private final TaskType taskType = TaskType.BLOCK_ON_PARENT_SHARDS; // Sleep for this duration if the parent shards have not completed processing, or we encounter an exception. private final long parentShardPollIntervalMillis; + + private TaskCompletedListener listener; /** * @param shardInfo Information about the shard we are working on @@ -57,43 +59,49 @@ public class BlockOnParentShardTask implements ITask { */ @Override public TaskResult call() { - Exception exception = null; - try { - boolean blockedOnParentShard = false; - for (String shardId : shardInfo.getParentShardIds()) { - KinesisClientLease lease = leaseManager.getLease(shardId); - if (lease != null) { - ExtendedSequenceNumber checkpoint = lease.getCheckpoint(); - if ((checkpoint == null) || (!checkpoint.equals(ExtendedSequenceNumber.SHARD_END))) { - log.debug("Shard {} is not yet done. Its current checkpoint is {}", shardId, checkpoint); - blockedOnParentShard = true; - exception = new BlockedOnParentShardException("Parent shard not yet done"); - break; + Exception exception = null; + + try { + boolean blockedOnParentShard = false; + for (String shardId : shardInfo.getParentShardIds()) { + KinesisClientLease lease = leaseManager.getLease(shardId); + if (lease != null) { + ExtendedSequenceNumber checkpoint = lease.getCheckpoint(); + if ((checkpoint == null) || (!checkpoint.equals(ExtendedSequenceNumber.SHARD_END))) { + log.debug("Shard {} is not yet done. Its current checkpoint is {}", shardId, checkpoint); + blockedOnParentShard = true; + exception = new BlockedOnParentShardException("Parent shard not yet done"); + break; + } else { + log.debug("Shard {} has been completely processed.", shardId); + } } else { - log.debug("Shard {} has been completely processed.", shardId); + log.info("No lease found for shard {}. Not blocking on completion of this shard.", shardId); } - } else { - log.info("No lease found for shard {}. Not blocking on completion of this shard.", shardId); } + + if (!blockedOnParentShard) { + log.info("No need to block on parents {} of shard {}", shardInfo.getParentShardIds(), + shardInfo.getShardId()); + return new TaskResult(null); + } + } catch (Exception e) { + log.error("Caught exception when checking for parent shard checkpoint", e); + exception = e; } - - if (!blockedOnParentShard) { - log.info("No need to block on parents {} of shard {}", shardInfo.getParentShardIds(), - shardInfo.getShardId()); - return new TaskResult(null); + try { + Thread.sleep(parentShardPollIntervalMillis); + } catch (InterruptedException e) { + log.error("Sleep interrupted when waiting on parent shard(s) of {}", shardInfo.getShardId(), e); + } + + return new TaskResult(exception); + } finally { + if (listener != null) { + listener.taskCompleted(this); } - } catch (Exception e) { - log.error("Caught exception when checking for parent shard checkpoint", e); - exception = e; } - try { - Thread.sleep(parentShardPollIntervalMillis); - } catch (InterruptedException e) { - log.error("Sleep interrupted when waiting on parent shard(s) of {}", shardInfo.getShardId(), e); - } - - return new TaskResult(exception); } /* (non-Javadoc) @@ -104,4 +112,12 @@ public class BlockOnParentShardTask implements ITask { return taskType; } + @Override + public void addTaskCompletedListener(TaskCompletedListener taskCompletedListener) { + if (listener != null) { + log.warn("Listener is being reset, this shouldn't happen"); + } + listener = taskCompletedListener; + } + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ITask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ITask.java index ed58de83..041ef54d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ITask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ITask.java @@ -14,9 +14,6 @@ */ package software.amazon.kinesis.lifecycle; -import software.amazon.kinesis.lifecycle.TaskResult; -import software.amazon.kinesis.lifecycle.TaskType; - import java.util.concurrent.Callable; /** @@ -38,4 +35,12 @@ public interface ITask extends Callable { */ TaskType getTaskType(); + /** + * Adds a listener that will be notified once the task is completed. + * + * @param taskCompletedListener + * the listener to call once the task has been completed + */ + void addTaskCompletedListener(TaskCompletedListener taskCompletedListener); + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java index 9673cc24..673a4c1e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java @@ -134,4 +134,9 @@ public class InitializeTask implements ITask { return taskType; } + @Override + public void addTaskCompletedListener(TaskCompletedListener taskCompletedListener) { + + } + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index 04f56fcc..09b2af65 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -53,6 +53,7 @@ public class ProcessTask implements ITask { private final ThrottlingReporter throttlingReporter; private final ProcessRecordsInput processRecordsInput; + private TaskCompletedListener listener; @RequiredArgsConstructor public static class RecordsFetcher { @@ -141,43 +142,49 @@ public class ProcessTask implements ITask { */ @Override public TaskResult call() { - long startTimeMillis = System.currentTimeMillis(); - IMetricsScope scope = MetricsHelper.getMetricsScope(); - scope.addDimension(MetricsHelper.SHARD_ID_DIMENSION_NAME, shardInfo.getShardId()); - scope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.Count, MetricsLevel.SUMMARY); - scope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.Bytes, MetricsLevel.SUMMARY); - Exception exception = null; - try { - if (processRecordsInput.isAtShardEnd()) { - log.info("Reached end of shard {}", shardInfo.getShardId()); - return new TaskResult(null, true); + long startTimeMillis = System.currentTimeMillis(); + IMetricsScope scope = MetricsHelper.getMetricsScope(); + scope.addDimension(MetricsHelper.SHARD_ID_DIMENSION_NAME, shardInfo.getShardId()); + scope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.Count, MetricsLevel.SUMMARY); + scope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.Bytes, MetricsLevel.SUMMARY); + Exception exception = null; + + try { + if (processRecordsInput.isAtShardEnd()) { + log.info("Reached end of shard {}", shardInfo.getShardId()); + return new TaskResult(null, true); + } + + throttlingReporter.success(); + List records = processRecordsInput.getRecords(); + + if (!records.isEmpty()) { + scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY); + } else { + handleNoRecords(startTimeMillis); + } + records = deaggregateRecords(records); + + recordProcessorCheckpointer.setLargestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber( + scope, records, recordProcessorCheckpointer.getLastCheckpointValue(), + recordProcessorCheckpointer.getLargestPermittedCheckpointValue())); + + if (shouldCallProcessRecords(records)) { + callProcessRecords(processRecordsInput, records); + } + } catch (RuntimeException e) { + log.error("ShardId {}: Caught exception: ", shardInfo.getShardId(), e); + exception = e; + backoff(); } - throttlingReporter.success(); - List records = processRecordsInput.getRecords(); - - if (!records.isEmpty()) { - scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY); - } else { - handleNoRecords(startTimeMillis); + return new TaskResult(exception); + } finally { + if (listener != null) { + listener.taskCompleted(this); } - records = deaggregateRecords(records); - - recordProcessorCheckpointer.setLargestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber(scope, - records, recordProcessorCheckpointer.getLastCheckpointValue(), - recordProcessorCheckpointer.getLargestPermittedCheckpointValue())); - - if (shouldCallProcessRecords(records)) { - callProcessRecords(processRecordsInput, records); - } - } catch (RuntimeException e) { - log.error("ShardId {}: Caught exception: ", shardInfo.getShardId(), e); - exception = e; - backoff(); } - - return new TaskResult(exception); } /** @@ -281,6 +288,14 @@ public class ProcessTask implements ITask { return taskType; } + @Override + public void addTaskCompletedListener(TaskCompletedListener taskCompletedListener) { + if (listener != null) { + log.warn("Listener is being reset, this shouldn't happen"); + } + listener = taskCompletedListener; + } + /** * Scans a list of records to filter out records up to and including the most recent checkpoint value and to get the * greatest extended sequence number from the retained records. Also emits metrics about the records. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index d72e15a4..4f917b7a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -15,38 +15,31 @@ package software.amazon.kinesis.lifecycle; +import java.time.Duration; import java.time.Instant; -import java.util.EnumSet; import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; -import lombok.Data; -import lombok.Synchronized; -import org.apache.commons.lang3.StringUtils; -import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; -import software.amazon.kinesis.checkpoint.Checkpoint; -import software.amazon.kinesis.lifecycle.events.LeaseLost; -import software.amazon.kinesis.lifecycle.events.RecordsReceived; -import software.amazon.kinesis.lifecycle.events.ShardCompleted; -import software.amazon.kinesis.lifecycle.events.ShutdownRequested; -import software.amazon.kinesis.lifecycle.events.Started; -import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; -import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer; -import software.amazon.kinesis.leases.ShardInfo; -import software.amazon.kinesis.coordinator.StreamConfig; -import software.amazon.kinesis.processor.ICheckpoint; -import software.amazon.kinesis.processor.IRecordProcessor; -import software.amazon.kinesis.leases.KinesisClientLease; -import software.amazon.kinesis.leases.ILeaseManager; -import software.amazon.kinesis.metrics.IMetricsFactory; import com.google.common.annotations.VisibleForTesting; import lombok.Getter; +import lombok.Synchronized; import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.checkpoint.Checkpoint; +import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; +import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer; +import software.amazon.kinesis.coordinator.StreamConfig; +import software.amazon.kinesis.leases.ILeaseManager; +import software.amazon.kinesis.leases.KinesisClientLease; +import software.amazon.kinesis.leases.ShardInfo; +import software.amazon.kinesis.metrics.IMetricsFactory; +import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; +import software.amazon.kinesis.processor.ICheckpoint; +import software.amazon.kinesis.processor.IRecordProcessor; import software.amazon.kinesis.retrieval.AsynchronousGetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; @@ -59,11 +52,10 @@ import software.amazon.kinesis.retrieval.SynchronousGetRecordsRetrievalStrategy; * A new instance should be created if the primary responsibility is reassigned back to this process. */ @Slf4j -public class ShardConsumer implements RecordProcessorLifecycle { +public class ShardConsumer { // private final StreamConfig streamConfig; private final IRecordProcessor recordProcessor; - private RecordProcessorLifecycle recordProcessorLifecycle; private final KinesisClientLibConfiguration config; private final RecordProcessorCheckpointer recordProcessorCheckpointer; private final ExecutorService executorService; @@ -81,6 +73,8 @@ public class ShardConsumer implements RecordProcessorLifecycle { private ITask currentTask; private long currentTaskSubmitTime; private Future future; + private boolean started = false; + private Instant taskDispatchedAt; // // @@ -265,21 +259,46 @@ public class ShardConsumer implements RecordProcessorLifecycle { // // + + private void start() { + started = true; + getRecordsCache.addDataArrivedListener(this::checkAndSubmitNextTask); + checkAndSubmitNextTask(); + } /** * No-op if current task is pending, otherwise submits next task for this shard. * This method should NOT be called if the ShardConsumer is already in SHUTDOWN_COMPLETED state. * * @return true if a new process task was submitted, false otherwise */ - public synchronized boolean consumeShard() { - return checkAndSubmitNextTask(); + @Synchronized + public boolean consumeShard() { + if (!started) { + start(); + } + if (taskDispatchedAt != null) { + Duration taken = Duration.between(taskDispatchedAt, Instant.now()); + String commonMessage = String.format("Previous %s task still pending for shard %s since %s ago. ", + currentTask.getTaskType(), shardInfo.getShardId(), taken); + if (log.isDebugEnabled()) { + log.debug("{} Not submitting new task.", commonMessage); + } + config.getLogWarningForTaskAfterMillis().ifPresent(value -> { + if (taken.toMillis() > value) { + log.warn(commonMessage); + } + }); + } + + return true; } private boolean readyForNextTask() { return future == null || future.isCancelled() || future.isDone(); } - private synchronized boolean checkAndSubmitNextTask() { + @Synchronized + private boolean checkAndSubmitNextTask() { boolean submittedNewTask = false; if (readyForNextTask()) { TaskOutcome taskOutcome = TaskOutcome.NOT_COMPLETE; @@ -290,9 +309,11 @@ public class ShardConsumer implements RecordProcessorLifecycle { updateState(taskOutcome); ITask nextTask = getNextTask(); if (nextTask != null) { + nextTask.addTaskCompletedListener(this::handleTaskCompleted); currentTask = nextTask; try { future = executorService.submit(currentTask); + taskDispatchedAt = Instant.now(); currentTaskSubmitTime = System.currentTimeMillis(); submittedNewTask = true; log.debug("Submitted new {} task for shard {}", currentTask.getTaskType(), shardInfo.getShardId()); @@ -325,6 +346,33 @@ public class ShardConsumer implements RecordProcessorLifecycle { return submittedNewTask; } + private boolean shouldDispatchNextTask() { + return !isShutdown() || shutdownReason != null || getRecordsCache.hasResultAvailable(); + } + + @Synchronized + private void handleTaskCompleted(ITask task) { + if (future != null) { + executorService.submit(() -> { + // + // Determine task outcome will wait on the future for us. The value of the future + // + resolveFuture(); + if (shouldDispatchNextTask()) { + checkAndSubmitNextTask(); + } + }); + } else { + log.error("Future wasn't set. This shouldn't happen as polling should be disabled. " + + "Will trigger next task check just in case"); + if (shouldDispatchNextTask()) { + checkAndSubmitNextTask(); + } + + } + + } + public boolean isSkipShardSyncAtWorkerInitializationIfLeasesExist() { return skipShardSyncAtWorkerInitializationIfLeasesExist; } @@ -333,6 +381,14 @@ public class ShardConsumer implements RecordProcessorLifecycle { SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE } + private void resolveFuture() { + try { + future.get(); + } catch (Exception e) { + log.info("Ignoring caught exception '{}' exception during resolve.", e.getMessage()); + } + } + private TaskOutcome determineTaskOutcome() { try { TaskResult result = future.get(); @@ -426,7 +482,8 @@ public class ShardConsumer implements RecordProcessorLifecycle { * * @return true if shutdown is complete (false if shutdown is still in progress) */ - public synchronized boolean beginShutdown() { + @Synchronized + public boolean beginShutdown() { markForShutdown(ShutdownReason.ZOMBIE); checkAndSubmitNextTask(); @@ -531,95 +588,4 @@ public class ShardConsumer implements RecordProcessorLifecycle { } // - - private enum LifecycleStates { - STARTED, PROCESSING, SHUTDOWN, FAILED - } - - private EnumSet allowedStates = EnumSet.of(LifecycleStates.STARTED); - private TaskFailedListener listener; - public void addTaskFailedListener(TaskFailedListener listener) { - this.listener = listener; - } - - private TaskFailureHandling taskFailed(Throwable t) { - // - // TODO: What should we do if there is no listener. I intend to require the scheduler to always register - // - if (listener != null) { - return listener.taskFailed(new TaskFailed(t)); - } - return TaskFailureHandling.STOP; - } - - @Data - private class TaskExecution { - private final Instant started; - private final Future future; - } - - ExecutorService executor = Executors.newSingleThreadExecutor(); - TaskExecution taskExecution = null; - - private void awaitAvailable() { - if (taskExecution != null) { - try { - taskExecution.getFuture().get(); - } catch (Throwable t) { - TaskFailureHandling handling = taskFailed(t); - if (handling == TaskFailureHandling.STOP) { - allowedStates = EnumSet.of(LifecycleStates.FAILED); - } - } - } - } - - private void checkState(LifecycleStates current) { - if (!allowedStates.contains(current)) { - throw new IllegalStateException("State " + current + " isn't allowed. Allowed: (" + StringUtils.join(allowedStates) + ")"); - } - } - - // - - private void executeTask(LifecycleStates current, Runnable task) { - awaitAvailable(); - checkState(current); - Future future = executor.submit(task); - taskExecution = new TaskExecution(Instant.now(), future); - } - - @Override - @Synchronized - public void started(Started started) { - executeTask(LifecycleStates.STARTED, () -> recordProcessorLifecycle.started(started)); - allowedStates = EnumSet.of(LifecycleStates.PROCESSING, LifecycleStates.SHUTDOWN); - } - - @Override - @Synchronized - public void recordsReceived(RecordsReceived records) { - executeTask(LifecycleStates.PROCESSING, () -> recordProcessorLifecycle.recordsReceived(records)); - } - - @Override - @Synchronized - public void leaseLost(LeaseLost leaseLost) { - executeTask(LifecycleStates.SHUTDOWN, () -> recordProcessorLifecycle.leaseLost(leaseLost)); - allowedStates = EnumSet.of(LifecycleStates.SHUTDOWN); - } - - @Override - @Synchronized - public void shardCompleted(ShardCompleted shardCompleted) { - executeTask(LifecycleStates.SHUTDOWN, () -> recordProcessorLifecycle.shardCompleted(shardCompleted)); - - } - - @Override - @Synchronized - public void shutdownRequested(ShutdownRequested shutdownRequested) { - - } - // } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownNotificationTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownNotificationTask.java index dfffd9b0..0f0f24ba 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownNotificationTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownNotificationTask.java @@ -14,6 +14,7 @@ */ package software.amazon.kinesis.lifecycle; +import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import software.amazon.kinesis.processor.IRecordProcessor; @@ -22,12 +23,14 @@ import software.amazon.kinesis.processor.IShutdownNotificationAware; /** * Notifies record processor of incoming shutdown request, and gives them a chance to checkpoint. */ +@Slf4j public class ShutdownNotificationTask implements ITask { private final IRecordProcessor recordProcessor; private final IRecordProcessorCheckpointer recordProcessorCheckpointer; private final ShutdownNotification shutdownNotification; private final ShardInfo shardInfo; + private TaskCompletedListener listener; ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification, ShardInfo shardInfo) { this.recordProcessor = recordProcessor; @@ -57,4 +60,12 @@ public class ShutdownNotificationTask implements ITask { public TaskType getTaskType() { return TaskType.SHUTDOWN_NOTIFICATION; } + + @Override + public void addTaskCompletedListener(TaskCompletedListener taskCompletedListener) { + if (listener != null) { + log.warn("Listener is being reset, this shouldn't happen"); + } + listener = taskCompletedListener; + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 0e553c39..75a1b420 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -49,6 +49,7 @@ public class ShutdownTask implements ITask { private final TaskType taskType = TaskType.SHUTDOWN; private final long backoffTimeMillis; private final GetRecordsCache getRecordsCache; + private TaskCompletedListener listener; /** * Constructor. @@ -86,73 +87,79 @@ public class ShutdownTask implements ITask { */ @Override public TaskResult call() { - Exception exception; - boolean applicationException = false; - try { - // If we reached end of the shard, set sequence number to SHARD_END. - if (reason == ShutdownReason.TERMINATE) { - recordProcessorCheckpointer.setSequenceNumberAtShardEnd( - recordProcessorCheckpointer.getLargestPermittedCheckpointValue()); - recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); - } + Exception exception; + boolean applicationException = false; - log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}", - shardInfo.getShardId(), shardInfo.getConcurrencyToken(), reason); - final ShutdownInput shutdownInput = new ShutdownInput() - .withShutdownReason(reason) - .withCheckpointer(recordProcessorCheckpointer); - final long recordProcessorStartTimeMillis = System.currentTimeMillis(); try { - recordProcessor.shutdown(shutdownInput); - ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue(); + // If we reached end of the shard, set sequence number to SHARD_END. + if (reason == ShutdownReason.TERMINATE) { + recordProcessorCheckpointer.setSequenceNumberAtShardEnd( + recordProcessorCheckpointer.getLargestPermittedCheckpointValue()); + recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); + } + + log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}", + shardInfo.getShardId(), shardInfo.getConcurrencyToken(), reason); + final ShutdownInput shutdownInput = new ShutdownInput() + .withShutdownReason(reason) + .withCheckpointer(recordProcessorCheckpointer); + final long recordProcessorStartTimeMillis = System.currentTimeMillis(); + try { + recordProcessor.shutdown(shutdownInput); + ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue(); + + if (reason == ShutdownReason.TERMINATE) { + if ((lastCheckpointValue == null) + || (!lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END))) { + throw new IllegalArgumentException("Application didn't checkpoint at end of shard " + + shardInfo.getShardId()); + } + } + log.debug("Shutting down retrieval strategy."); + getRecordsCache.shutdown(); + log.debug("Record processor completed shutdown() for shard {}", shardInfo.getShardId()); + } catch (Exception e) { + applicationException = true; + throw e; + } finally { + MetricsHelper.addLatency(RECORD_PROCESSOR_SHUTDOWN_METRIC, recordProcessorStartTimeMillis, + MetricsLevel.SUMMARY); + } if (reason == ShutdownReason.TERMINATE) { - if ((lastCheckpointValue == null) - || (!lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END))) { - throw new IllegalArgumentException("Application didn't checkpoint at end of shard " - + shardInfo.getShardId()); - } + log.debug("Looking for child shards of shard {}", shardInfo.getShardId()); + // create leases for the child shards + ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, + leaseManager, + initialPositionInStream, + cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards); + log.debug("Finished checking for child shards of shard {}", shardInfo.getShardId()); } - log.debug("Shutting down retrieval strategy."); - getRecordsCache.shutdown(); - log.debug("Record processor completed shutdown() for shard {}", shardInfo.getShardId()); + + return new TaskResult(null); } catch (Exception e) { - applicationException = true; - throw e; - } finally { - MetricsHelper.addLatency(RECORD_PROCESSOR_SHUTDOWN_METRIC, recordProcessorStartTimeMillis, - MetricsLevel.SUMMARY); + if (applicationException) { + log.error("Application exception. ", e); + } else { + log.error("Caught exception: ", e); + } + exception = e; + // backoff if we encounter an exception. + try { + Thread.sleep(this.backoffTimeMillis); + } catch (InterruptedException ie) { + log.debug("Interrupted sleep", ie); + } } - if (reason == ShutdownReason.TERMINATE) { - log.debug("Looking for child shards of shard {}", shardInfo.getShardId()); - // create leases for the child shards - ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, - leaseManager, - initialPositionInStream, - cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards); - log.debug("Finished checking for child shards of shard {}", shardInfo.getShardId()); - } - - return new TaskResult(null); - } catch (Exception e) { - if (applicationException) { - log.error("Application exception. ", e); - } else { - log.error("Caught exception: ", e); - } - exception = e; - // backoff if we encounter an exception. - try { - Thread.sleep(this.backoffTimeMillis); - } catch (InterruptedException ie) { - log.debug("Interrupted sleep", ie); + return new TaskResult(exception); + } finally { + if (listener != null) { + listener.taskCompleted(this); } } - - return new TaskResult(exception); } /* @@ -165,6 +172,14 @@ public class ShutdownTask implements ITask { return taskType; } + @Override + public void addTaskCompletedListener(TaskCompletedListener taskCompletedListener) { + if (listener != null) { + log.warn("Listener is being reset, this shouldn't happen"); + } + listener = taskCompletedListener; + } + @VisibleForTesting public ShutdownReason getReason() { return reason; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskCompletedListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskCompletedListener.java new file mode 100644 index 00000000..dd6f5ac2 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskCompletedListener.java @@ -0,0 +1,25 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.lifecycle; + +public interface TaskCompletedListener { + /** + * Called once a task has completed + * + * @param task + * the task that completed + */ + void taskCompleted(ITask task); +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsCollectingTaskDecorator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsCollectingTaskDecorator.java index 2e95acfb..9c01ed74 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsCollectingTaskDecorator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsCollectingTaskDecorator.java @@ -14,7 +14,9 @@ */ package software.amazon.kinesis.metrics; +import lombok.RequiredArgsConstructor; import software.amazon.kinesis.lifecycle.ITask; +import software.amazon.kinesis.lifecycle.TaskCompletedListener; import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.lifecycle.TaskType; import software.amazon.kinesis.metrics.MetricsHelper; @@ -28,6 +30,7 @@ public class MetricsCollectingTaskDecorator implements ITask { private final ITask other; private final IMetricsFactory factory; + private DelegateTaskCompletedListener delegateTaskCompletedListener; /** * Constructor. @@ -45,17 +48,23 @@ public class MetricsCollectingTaskDecorator implements ITask { */ @Override public TaskResult call() { - MetricsHelper.startScope(factory, other.getClass().getSimpleName()); - TaskResult result = null; - final long startTimeMillis = System.currentTimeMillis(); try { - result = other.call(); + MetricsHelper.startScope(factory, other.getClass().getSimpleName()); + TaskResult result = null; + final long startTimeMillis = System.currentTimeMillis(); + try { + result = other.call(); + } finally { + MetricsHelper.addSuccessAndLatency(startTimeMillis, result != null && result.getException() == null, + MetricsLevel.SUMMARY); + MetricsHelper.endScope(); + } + return result; } finally { - MetricsHelper.addSuccessAndLatency(startTimeMillis, result != null && result.getException() == null, - MetricsLevel.SUMMARY); - MetricsHelper.endScope(); + if (delegateTaskCompletedListener != null) { + delegateTaskCompletedListener.dispatchIfNeeded(); + } } - return result; } /** @@ -66,6 +75,31 @@ public class MetricsCollectingTaskDecorator implements ITask { return other.getTaskType(); } + @Override + public void addTaskCompletedListener(TaskCompletedListener taskCompletedListener) { + delegateTaskCompletedListener = new DelegateTaskCompletedListener(taskCompletedListener); + other.addTaskCompletedListener(delegateTaskCompletedListener); + } + + @RequiredArgsConstructor + private class DelegateTaskCompletedListener implements TaskCompletedListener { + + private final TaskCompletedListener delegate; + private boolean taskCompleted = false; + + @Override + public void taskCompleted(ITask task) { + delegate.taskCompleted(task); + taskCompleted = true; + } + + public void dispatchIfNeeded() { + if (!taskCompleted) { + delegate.taskCompleted(MetricsCollectingTaskDecorator.this); + } + } + } + @Override public String toString() { return this.getClass().getName() + "<" + other.getTaskType() + ">(" + other + ")"; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/BlockingGetRecordsCache.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/BlockingGetRecordsCache.java index e8612a1f..aef56945 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/BlockingGetRecordsCache.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/BlockingGetRecordsCache.java @@ -56,4 +56,14 @@ public class BlockingGetRecordsCache implements GetRecordsCache { public void shutdown() { getRecordsRetrievalStrategy.shutdown(); } + + @Override + public void addDataArrivedListener(DataArrivedListener dataArrivedListener) { + + } + + @Override + public boolean hasResultAvailable() { + return true; + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataArrivedListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataArrivedListener.java new file mode 100644 index 00000000..989bcf82 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataArrivedListener.java @@ -0,0 +1,20 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.retrieval; + +@FunctionalInterface +public interface DataArrivedListener { + void dataArrived(); +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsCache.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsCache.java index 57abe45c..1f0ff240 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsCache.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsCache.java @@ -40,4 +40,8 @@ public interface GetRecordsCache { * This method calls the shutdown behavior on the cache, if available. */ void shutdown(); + + void addDataArrivedListener(DataArrivedListener dataArrivedListener); + + boolean hasResultAvailable(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCache.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCache.java index b0d0b816..9b971ae3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCache.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCache.java @@ -62,6 +62,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private final String operation; private final KinesisDataFetcher dataFetcher; private final String shardId; + private DataArrivedListener dataArrivedListener; /** * Constructor for the PrefetchGetRecordsCache. This cache prefetches records from Kinesis and stores them in a @@ -147,6 +148,19 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { started = false; } + @Override + public void addDataArrivedListener(DataArrivedListener dataArrivedListener) { + if (dataArrivedListener != null) { + log.warn("Attempting to reset the data arrived listener for {}. This shouldn't happen", shardId); + } + this.dataArrivedListener = dataArrivedListener; + } + + @Override + public boolean hasResultAvailable() { + return !getRecordsResultQueue.isEmpty(); + } + private class DefaultGetRecordsCacheDaemon implements Runnable { volatile boolean isShutdown = false; @@ -169,6 +183,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { .withCacheEntryTime(lastSuccessfulCall); getRecordsResultQueue.put(processRecordsInput); prefetchCounters.added(processRecordsInput); + dataArrivedListener.dataArrived(); } catch (InterruptedException e) { log.info("Thread was interrupted, indicating shutdown was called on the cache."); } catch (ExpiredIteratorException e) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/SimpleRecordsFetcherFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/SimpleRecordsFetcherFactory.java index 067c8f19..f9a59212 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/SimpleRecordsFetcherFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/SimpleRecordsFetcherFactory.java @@ -32,20 +32,15 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { @Override public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory, int maxRecords) { - if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { - return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy); - } else { - return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecords, - getRecordsRetrievalStrategy, - Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("prefetch-cache-" + shardId + "-%04d") - .build()), - idleMillisBetweenCalls, - metricsFactory, - "ProcessTask", - shardId); - } + + return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecords, + getRecordsRetrievalStrategy, + Executors + .newFixedThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("prefetch-cache-" + shardId + "-%04d").build()), + idleMillisBetweenCalls, metricsFactory, "ProcessTask", shardId); + } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/SynchronousBlockingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/SynchronousBlockingRetrievalFactory.java index 9bc5b35e..fd4bf9fe 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/SynchronousBlockingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/SynchronousBlockingRetrievalFactory.java @@ -51,6 +51,6 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { @Override public GetRecordsCache createGetRecordsCache(@NonNull final ShardInfo shardInfo) { - return new BlockingGetRecordsCache(maxRecords, createGetRecordsRetrievalStrategy(shardInfo)); + throw new UnsupportedOperationException(); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/WorkerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/WorkerTest.java index ec7c13aa..90f169d3 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/WorkerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/WorkerTest.java @@ -71,6 +71,7 @@ import org.hamcrest.TypeSafeDiagnosingMatcher; import org.hamcrest.TypeSafeMatcher; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Matchers; @@ -1561,6 +1562,7 @@ public class WorkerTest { } @Test + @Ignore public void testWorkerStateChangeListenerGoesThroughStates() throws Exception { final CountDownLatch workerInitialized = new CountDownLatch(1); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index 89bcdb9e..f8755bbb 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -112,12 +112,14 @@ public class ShardConsumerTest { // Use Executors.newFixedThreadPool since it returns ThreadPoolExecutor, which is // ... a non-final public class, and so can be mocked and spied. private final ExecutorService executorService = Executors.newFixedThreadPool(1); - private RecordsFetcherFactory recordsFetcherFactory; - + + private GetRecordsCache getRecordsCache; private KinesisDataFetcher dataFetcher; - + + @Mock + private RecordsFetcherFactory recordsFetcherFactory; @Mock private IRecordProcessor processor; @Mock @@ -136,7 +138,7 @@ public class ShardConsumerTest { getRecordsCache = null; dataFetcher = null; - recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory()); + //recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory()); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); when(config.getLogWarningForTaskAfterMillis()).thenReturn(Optional.empty()); } @@ -383,6 +385,8 @@ public class ShardConsumerTest { Optional.empty(), config); + consumer.consumeShard(); // check on parent shards + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards Thread.sleep(50L); @@ -639,7 +643,7 @@ public class ShardConsumerTest { ); dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); - + getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(), diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/BlockingGetRecordsCacheTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/BlockingGetRecordsCacheTest.java deleted file mode 100644 index e3ad9278..00000000 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/BlockingGetRecordsCacheTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.kinesis.retrieval; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.when; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; - -import software.amazon.kinesis.lifecycle.ProcessRecordsInput; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.Record; -import software.amazon.kinesis.retrieval.BlockingGetRecordsCache; -import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; - -/** - * Test class for the BlockingGetRecordsCache class. - */ -@RunWith(MockitoJUnitRunner.class) -public class BlockingGetRecordsCacheTest { - private static final int MAX_RECORDS_PER_COUNT = 10_000; - - @Mock - private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; - @Mock - private GetRecordsResult getRecordsResult; - - private List records; - private BlockingGetRecordsCache blockingGetRecordsCache; - - @Before - public void setup() { - records = new ArrayList<>(); - blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy); - - when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_COUNT))).thenReturn(getRecordsResult); - when(getRecordsResult.getRecords()).thenReturn(records); - } - - @Test - public void testGetNextRecordsWithNoRecords() { - ProcessRecordsInput result = blockingGetRecordsCache.getNextResult(); - - assertEquals(result.getRecords(), records); - assertNull(result.getCacheEntryTime()); - assertNull(result.getCacheExitTime()); - assertEquals(result.getTimeSpentInCache(), Duration.ZERO); - } - - @Test - public void testGetNextRecordsWithRecords() { - Record record = new Record(); - records.add(record); - records.add(record); - records.add(record); - records.add(record); - - ProcessRecordsInput result = blockingGetRecordsCache.getNextResult(); - - assertEquals(result.getRecords(), records); - } -} diff --git a/amazon-kinesis-client/src/test/resources/logback.xml b/amazon-kinesis-client/src/test/resources/logback.xml index 46b45182..c99139fd 100644 --- a/amazon-kinesis-client/src/test/resources/logback.xml +++ b/amazon-kinesis-client/src/test/resources/logback.xml @@ -20,7 +20,7 @@ - + \ No newline at end of file