From f130e4c79cb6726069863535f644d72c13fb10f4 Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Tue, 27 Mar 2018 05:53:14 -0700 Subject: [PATCH] kcl-2.0 --- amazon-kinesis-client/pom.xml | 5 + .../lifecycle/RecordProcessorLifecycle.java | 2 +- .../lifecycle/RecordProcessorShim.java | 28 ++++-- .../kinesis/lifecycle/ShardConsumer.java | 97 +++++++++++++++---- .../amazon/kinesis/lifecycle/TaskFailed.java | 22 +++++ .../kinesis/lifecycle/TaskFailedListener.java | 20 ++++ .../lifecycle/TaskFailureHandling.java | 19 ++++ .../lifecycle/events/RecordsReceived.java | 20 ++++ .../lifecycle/events/ShardCompleted.java | 5 + .../lifecycle/events/ShutdownRequested.java | 5 + 10 files changed, 194 insertions(+), 29 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailed.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailedListener.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailureHandling.java diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index d1a4f020..536aa87e 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -83,6 +83,11 @@ commons-lang 2.6 + + org.apache.commons + commons-lang3 + 3.7 + org.slf4j slf4j-api diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorLifecycle.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorLifecycle.java index db63f88b..bc24b553 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorLifecycle.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorLifecycle.java @@ -25,7 +25,7 @@ public interface RecordProcessorLifecycle { void started(Started started); void recordsReceived(RecordsReceived records); void leaseLost(LeaseLost leaseLost); - void shardCompleted(ShardCompleted shardCompletedInput); + void shardCompleted(ShardCompleted shardCompleted); void shutdownRequested(ShutdownRequested shutdownRequested); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorShim.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorShim.java index 7d906991..9c55e048 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorShim.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorShim.java @@ -16,10 +16,13 @@ package software.amazon.kinesis.lifecycle; import lombok.AllArgsConstructor; import software.amazon.kinesis.lifecycle.events.LeaseLost; +import software.amazon.kinesis.lifecycle.events.RecordsReceived; import software.amazon.kinesis.lifecycle.events.ShardCompleted; import software.amazon.kinesis.lifecycle.events.ShutdownRequested; import software.amazon.kinesis.lifecycle.events.Started; import software.amazon.kinesis.processor.IRecordProcessor; +import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; +import software.amazon.kinesis.processor.IShutdownNotificationAware; @AllArgsConstructor public class RecordProcessorShim implements RecordProcessorLifecycle { @@ -28,27 +31,38 @@ public class RecordProcessorShim implements RecordProcessorLifecycle { @Override public void started(Started started) { - InitializationInput initializationInput = started.toInitializationInput(); - delegate.initialize(initializationInput); + delegate.initialize(started.toInitializationInput()); } @Override - public void recordsReceived(ProcessRecordsInput records) { - + public void recordsReceived(RecordsReceived records) { + delegate.processRecords(records.toProcessRecordsInput()); } @Override public void leaseLost(LeaseLost leaseLost) { + ShutdownInput shutdownInput = new ShutdownInput() { + @Override + public IRecordProcessorCheckpointer getCheckpointer() { + throw new UnsupportedOperationException("Cannot checkpoint when the lease is lost"); + } + }.withShutdownReason(ShutdownReason.ZOMBIE); + delegate.shutdown(shutdownInput); } @Override - public void shardCompleted(ShardCompleted shardCompletedInput) { - + public void shardCompleted(ShardCompleted shardCompleted) { + ShutdownInput shutdownInput = new ShutdownInput().withCheckpointer(shardCompleted.getCheckpointer()) + .withShutdownReason(ShutdownReason.TERMINATE); + delegate.shutdown(shutdownInput); } @Override public void shutdownRequested(ShutdownRequested shutdownRequested) { - + if (delegate instanceof IShutdownNotificationAware) { + IShutdownNotificationAware aware = (IShutdownNotificationAware)delegate; + aware.shutdownRequested(shutdownRequested.getCheckpointer()); + } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index 6fcd2a82..d72e15a4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -15,18 +15,22 @@ package software.amazon.kinesis.lifecycle; +import java.time.Instant; +import java.util.EnumSet; import java.util.Optional; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadPoolExecutor; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; +import lombok.Data; +import lombok.Synchronized; +import org.apache.commons.lang3.StringUtils; import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.lifecycle.events.LeaseLost; +import software.amazon.kinesis.lifecycle.events.RecordsReceived; import software.amazon.kinesis.lifecycle.events.ShardCompleted; import software.amazon.kinesis.lifecycle.events.ShutdownRequested; import software.amazon.kinesis.lifecycle.events.Started; @@ -528,41 +532,92 @@ public class ShardConsumer implements RecordProcessorLifecycle { // - ExecutorService executor = Executors.newSingleThreadExecutor(); - Future taskResult = null; + private enum LifecycleStates { + STARTED, PROCESSING, SHUTDOWN, FAILED + } - // - @Override - public void started(Started started) { - if (taskResult != null) { + private EnumSet allowedStates = EnumSet.of(LifecycleStates.STARTED); + private TaskFailedListener listener; + public void addTaskFailedListener(TaskFailedListener listener) { + this.listener = listener; + } + + private TaskFailureHandling taskFailed(Throwable t) { + // + // TODO: What should we do if there is no listener. I intend to require the scheduler to always register + // + if (listener != null) { + return listener.taskFailed(new TaskFailed(t)); + } + return TaskFailureHandling.STOP; + } + + @Data + private class TaskExecution { + private final Instant started; + private final Future future; + } + + ExecutorService executor = Executors.newSingleThreadExecutor(); + TaskExecution taskExecution = null; + + private void awaitAvailable() { + if (taskExecution != null) { try { - taskResult.get(); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (ExecutionException e) { - e.printStackTrace(); + taskExecution.getFuture().get(); + } catch (Throwable t) { + TaskFailureHandling handling = taskFailed(t); + if (handling == TaskFailureHandling.STOP) { + allowedStates = EnumSet.of(LifecycleStates.FAILED); + } } } + } - taskResult = executor.submit(() -> recordProcessorLifecycle.started(started)); + private void checkState(LifecycleStates current) { + if (!allowedStates.contains(current)) { + throw new IllegalStateException("State " + current + " isn't allowed. Allowed: (" + StringUtils.join(allowedStates) + ")"); + } + } + + // + + private void executeTask(LifecycleStates current, Runnable task) { + awaitAvailable(); + checkState(current); + Future future = executor.submit(task); + taskExecution = new TaskExecution(Instant.now(), future); } @Override - public void recordsReceived(ProcessRecordsInput records) { - + @Synchronized + public void started(Started started) { + executeTask(LifecycleStates.STARTED, () -> recordProcessorLifecycle.started(started)); + allowedStates = EnumSet.of(LifecycleStates.PROCESSING, LifecycleStates.SHUTDOWN); } @Override + @Synchronized + public void recordsReceived(RecordsReceived records) { + executeTask(LifecycleStates.PROCESSING, () -> recordProcessorLifecycle.recordsReceived(records)); + } + + @Override + @Synchronized public void leaseLost(LeaseLost leaseLost) { + executeTask(LifecycleStates.SHUTDOWN, () -> recordProcessorLifecycle.leaseLost(leaseLost)); + allowedStates = EnumSet.of(LifecycleStates.SHUTDOWN); + } + + @Override + @Synchronized + public void shardCompleted(ShardCompleted shardCompleted) { + executeTask(LifecycleStates.SHUTDOWN, () -> recordProcessorLifecycle.shardCompleted(shardCompleted)); } @Override - public void shardCompleted(ShardCompleted shardCompletedInput) { - - } - - @Override + @Synchronized public void shutdownRequested(ShutdownRequested shutdownRequested) { } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailed.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailed.java new file mode 100644 index 00000000..c35128ff --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailed.java @@ -0,0 +1,22 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.lifecycle; + +import lombok.Data; + +@Data +public class TaskFailed { + private final Throwable throwable; +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailedListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailedListener.java new file mode 100644 index 00000000..47851fcb --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailedListener.java @@ -0,0 +1,20 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.lifecycle; + +@FunctionalInterface +public interface TaskFailedListener { + TaskFailureHandling taskFailed(TaskFailed result); +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailureHandling.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailureHandling.java new file mode 100644 index 00000000..b5dacac1 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailureHandling.java @@ -0,0 +1,19 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.lifecycle; + +public enum TaskFailureHandling { + STOP, CONTINUE +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/RecordsReceived.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/RecordsReceived.java index 15dc0cc6..9d190616 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/RecordsReceived.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/RecordsReceived.java @@ -14,9 +14,29 @@ */ package software.amazon.kinesis.lifecycle.events; +import java.time.Duration; +import java.time.Instant; +import java.util.List; + +import com.amazonaws.services.kinesis.model.Record; + import lombok.Data; +import software.amazon.kinesis.lifecycle.ProcessRecordsInput; +import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; @Data public class RecordsReceived { + private final Instant cacheEntryTime; + private final Instant cacheExitTime; + private final boolean isAtShardEnd; + private final List records; + private final IRecordProcessorCheckpointer checkpointer; + private Duration timeBehindLatest; + + public ProcessRecordsInput toProcessRecordsInput() { + return new ProcessRecordsInput(cacheEntryTime, cacheExitTime, isAtShardEnd, records, checkpointer, + timeBehindLatest.toMillis()); + } + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShardCompleted.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShardCompleted.java index 1df45a56..3d9fdef8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShardCompleted.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShardCompleted.java @@ -14,5 +14,10 @@ */ package software.amazon.kinesis.lifecycle.events; +import lombok.Data; +import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; + +@Data public class ShardCompleted { + private final IRecordProcessorCheckpointer checkpointer; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShutdownRequested.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShutdownRequested.java index aa9074bd..a4d9eae3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShutdownRequested.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShutdownRequested.java @@ -14,5 +14,10 @@ */ package software.amazon.kinesis.lifecycle.events; +import lombok.Data; +import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; + +@Data public class ShutdownRequested { + private final IRecordProcessorCheckpointer checkpointer; }