diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml
index d1a4f020..536aa87e 100644
--- a/amazon-kinesis-client/pom.xml
+++ b/amazon-kinesis-client/pom.xml
@@ -83,6 +83,11 @@
commons-lang
2.6
+
+ org.apache.commons
+ commons-lang3
+ 3.7
+
org.slf4j
slf4j-api
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorLifecycle.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorLifecycle.java
index db63f88b..bc24b553 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorLifecycle.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorLifecycle.java
@@ -25,7 +25,7 @@ public interface RecordProcessorLifecycle {
void started(Started started);
void recordsReceived(RecordsReceived records);
void leaseLost(LeaseLost leaseLost);
- void shardCompleted(ShardCompleted shardCompletedInput);
+ void shardCompleted(ShardCompleted shardCompleted);
void shutdownRequested(ShutdownRequested shutdownRequested);
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorShim.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorShim.java
index 7d906991..9c55e048 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorShim.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/RecordProcessorShim.java
@@ -16,10 +16,13 @@ package software.amazon.kinesis.lifecycle;
import lombok.AllArgsConstructor;
import software.amazon.kinesis.lifecycle.events.LeaseLost;
+import software.amazon.kinesis.lifecycle.events.RecordsReceived;
import software.amazon.kinesis.lifecycle.events.ShardCompleted;
import software.amazon.kinesis.lifecycle.events.ShutdownRequested;
import software.amazon.kinesis.lifecycle.events.Started;
import software.amazon.kinesis.processor.IRecordProcessor;
+import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
+import software.amazon.kinesis.processor.IShutdownNotificationAware;
@AllArgsConstructor
public class RecordProcessorShim implements RecordProcessorLifecycle {
@@ -28,27 +31,38 @@ public class RecordProcessorShim implements RecordProcessorLifecycle {
@Override
public void started(Started started) {
- InitializationInput initializationInput = started.toInitializationInput();
- delegate.initialize(initializationInput);
+ delegate.initialize(started.toInitializationInput());
}
@Override
- public void recordsReceived(ProcessRecordsInput records) {
-
+ public void recordsReceived(RecordsReceived records) {
+ delegate.processRecords(records.toProcessRecordsInput());
}
@Override
public void leaseLost(LeaseLost leaseLost) {
+ ShutdownInput shutdownInput = new ShutdownInput() {
+ @Override
+ public IRecordProcessorCheckpointer getCheckpointer() {
+ throw new UnsupportedOperationException("Cannot checkpoint when the lease is lost");
+ }
+ }.withShutdownReason(ShutdownReason.ZOMBIE);
+ delegate.shutdown(shutdownInput);
}
@Override
- public void shardCompleted(ShardCompleted shardCompletedInput) {
-
+ public void shardCompleted(ShardCompleted shardCompleted) {
+ ShutdownInput shutdownInput = new ShutdownInput().withCheckpointer(shardCompleted.getCheckpointer())
+ .withShutdownReason(ShutdownReason.TERMINATE);
+ delegate.shutdown(shutdownInput);
}
@Override
public void shutdownRequested(ShutdownRequested shutdownRequested) {
-
+ if (delegate instanceof IShutdownNotificationAware) {
+ IShutdownNotificationAware aware = (IShutdownNotificationAware)delegate;
+ aware.shutdownRequested(shutdownRequested.getCheckpointer());
+ }
}
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java
index 6fcd2a82..d72e15a4 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java
@@ -15,18 +15,22 @@
package software.amazon.kinesis.lifecycle;
+import java.time.Instant;
+import java.util.EnumSet;
import java.util.Optional;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
+import lombok.Data;
+import lombok.Synchronized;
+import org.apache.commons.lang3.StringUtils;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.lifecycle.events.LeaseLost;
+import software.amazon.kinesis.lifecycle.events.RecordsReceived;
import software.amazon.kinesis.lifecycle.events.ShardCompleted;
import software.amazon.kinesis.lifecycle.events.ShutdownRequested;
import software.amazon.kinesis.lifecycle.events.Started;
@@ -528,41 +532,92 @@ public class ShardConsumer implements RecordProcessorLifecycle {
//
- ExecutorService executor = Executors.newSingleThreadExecutor();
- Future> taskResult = null;
+ private enum LifecycleStates {
+ STARTED, PROCESSING, SHUTDOWN, FAILED
+ }
- //
- @Override
- public void started(Started started) {
- if (taskResult != null) {
+ private EnumSet allowedStates = EnumSet.of(LifecycleStates.STARTED);
+ private TaskFailedListener listener;
+ public void addTaskFailedListener(TaskFailedListener listener) {
+ this.listener = listener;
+ }
+
+ private TaskFailureHandling taskFailed(Throwable t) {
+ //
+ // TODO: What should we do if there is no listener. I intend to require the scheduler to always register
+ //
+ if (listener != null) {
+ return listener.taskFailed(new TaskFailed(t));
+ }
+ return TaskFailureHandling.STOP;
+ }
+
+ @Data
+ private class TaskExecution {
+ private final Instant started;
+ private final Future> future;
+ }
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ TaskExecution taskExecution = null;
+
+ private void awaitAvailable() {
+ if (taskExecution != null) {
try {
- taskResult.get();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
+ taskExecution.getFuture().get();
+ } catch (Throwable t) {
+ TaskFailureHandling handling = taskFailed(t);
+ if (handling == TaskFailureHandling.STOP) {
+ allowedStates = EnumSet.of(LifecycleStates.FAILED);
+ }
}
}
+ }
- taskResult = executor.submit(() -> recordProcessorLifecycle.started(started));
+ private void checkState(LifecycleStates current) {
+ if (!allowedStates.contains(current)) {
+ throw new IllegalStateException("State " + current + " isn't allowed. Allowed: (" + StringUtils.join(allowedStates) + ")");
+ }
+ }
+
+ //
+
+ private void executeTask(LifecycleStates current, Runnable task) {
+ awaitAvailable();
+ checkState(current);
+ Future> future = executor.submit(task);
+ taskExecution = new TaskExecution(Instant.now(), future);
}
@Override
- public void recordsReceived(ProcessRecordsInput records) {
-
+ @Synchronized
+ public void started(Started started) {
+ executeTask(LifecycleStates.STARTED, () -> recordProcessorLifecycle.started(started));
+ allowedStates = EnumSet.of(LifecycleStates.PROCESSING, LifecycleStates.SHUTDOWN);
}
@Override
+ @Synchronized
+ public void recordsReceived(RecordsReceived records) {
+ executeTask(LifecycleStates.PROCESSING, () -> recordProcessorLifecycle.recordsReceived(records));
+ }
+
+ @Override
+ @Synchronized
public void leaseLost(LeaseLost leaseLost) {
+ executeTask(LifecycleStates.SHUTDOWN, () -> recordProcessorLifecycle.leaseLost(leaseLost));
+ allowedStates = EnumSet.of(LifecycleStates.SHUTDOWN);
+ }
+
+ @Override
+ @Synchronized
+ public void shardCompleted(ShardCompleted shardCompleted) {
+ executeTask(LifecycleStates.SHUTDOWN, () -> recordProcessorLifecycle.shardCompleted(shardCompleted));
}
@Override
- public void shardCompleted(ShardCompleted shardCompletedInput) {
-
- }
-
- @Override
+ @Synchronized
public void shutdownRequested(ShutdownRequested shutdownRequested) {
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailed.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailed.java
new file mode 100644
index 00000000..c35128ff
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailed.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/asl/
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package software.amazon.kinesis.lifecycle;
+
+import lombok.Data;
+
+@Data
+public class TaskFailed {
+ private final Throwable throwable;
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailedListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailedListener.java
new file mode 100644
index 00000000..47851fcb
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailedListener.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/asl/
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package software.amazon.kinesis.lifecycle;
+
+@FunctionalInterface
+public interface TaskFailedListener {
+ TaskFailureHandling taskFailed(TaskFailed result);
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailureHandling.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailureHandling.java
new file mode 100644
index 00000000..b5dacac1
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailureHandling.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/asl/
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package software.amazon.kinesis.lifecycle;
+
+public enum TaskFailureHandling {
+ STOP, CONTINUE
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/RecordsReceived.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/RecordsReceived.java
index 15dc0cc6..9d190616 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/RecordsReceived.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/RecordsReceived.java
@@ -14,9 +14,29 @@
*/
package software.amazon.kinesis.lifecycle.events;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+
+import com.amazonaws.services.kinesis.model.Record;
+
import lombok.Data;
+import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
+import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
@Data
public class RecordsReceived {
+ private final Instant cacheEntryTime;
+ private final Instant cacheExitTime;
+ private final boolean isAtShardEnd;
+ private final List records;
+ private final IRecordProcessorCheckpointer checkpointer;
+ private Duration timeBehindLatest;
+
+ public ProcessRecordsInput toProcessRecordsInput() {
+ return new ProcessRecordsInput(cacheEntryTime, cacheExitTime, isAtShardEnd, records, checkpointer,
+ timeBehindLatest.toMillis());
+ }
+
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShardCompleted.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShardCompleted.java
index 1df45a56..3d9fdef8 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShardCompleted.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShardCompleted.java
@@ -14,5 +14,10 @@
*/
package software.amazon.kinesis.lifecycle.events;
+import lombok.Data;
+import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
+
+@Data
public class ShardCompleted {
+ private final IRecordProcessorCheckpointer checkpointer;
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShutdownRequested.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShutdownRequested.java
index aa9074bd..a4d9eae3 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShutdownRequested.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ShutdownRequested.java
@@ -14,5 +14,10 @@
*/
package software.amazon.kinesis.lifecycle.events;
+import lombok.Data;
+import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
+
+@Data
public class ShutdownRequested {
+ private final IRecordProcessorCheckpointer checkpointer;
}