This commit is contained in:
Pfifer, Justin 2018-03-27 05:53:14 -07:00
parent 7b026f8a19
commit f130e4c79c
10 changed files with 194 additions and 29 deletions

View file

@ -83,6 +83,11 @@
<artifactId>commons-lang</artifactId> <artifactId>commons-lang</artifactId>
<version>2.6</version> <version>2.6</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
</dependency>
<dependency> <dependency>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>

View file

@ -25,7 +25,7 @@ public interface RecordProcessorLifecycle {
void started(Started started); void started(Started started);
void recordsReceived(RecordsReceived records); void recordsReceived(RecordsReceived records);
void leaseLost(LeaseLost leaseLost); void leaseLost(LeaseLost leaseLost);
void shardCompleted(ShardCompleted shardCompletedInput); void shardCompleted(ShardCompleted shardCompleted);
void shutdownRequested(ShutdownRequested shutdownRequested); void shutdownRequested(ShutdownRequested shutdownRequested);

View file

@ -16,10 +16,13 @@ package software.amazon.kinesis.lifecycle;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import software.amazon.kinesis.lifecycle.events.LeaseLost; import software.amazon.kinesis.lifecycle.events.LeaseLost;
import software.amazon.kinesis.lifecycle.events.RecordsReceived;
import software.amazon.kinesis.lifecycle.events.ShardCompleted; import software.amazon.kinesis.lifecycle.events.ShardCompleted;
import software.amazon.kinesis.lifecycle.events.ShutdownRequested; import software.amazon.kinesis.lifecycle.events.ShutdownRequested;
import software.amazon.kinesis.lifecycle.events.Started; import software.amazon.kinesis.lifecycle.events.Started;
import software.amazon.kinesis.processor.IRecordProcessor; import software.amazon.kinesis.processor.IRecordProcessor;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import software.amazon.kinesis.processor.IShutdownNotificationAware;
@AllArgsConstructor @AllArgsConstructor
public class RecordProcessorShim implements RecordProcessorLifecycle { public class RecordProcessorShim implements RecordProcessorLifecycle {
@ -28,27 +31,38 @@ public class RecordProcessorShim implements RecordProcessorLifecycle {
@Override @Override
public void started(Started started) { public void started(Started started) {
InitializationInput initializationInput = started.toInitializationInput(); delegate.initialize(started.toInitializationInput());
delegate.initialize(initializationInput);
} }
@Override @Override
public void recordsReceived(ProcessRecordsInput records) { public void recordsReceived(RecordsReceived records) {
delegate.processRecords(records.toProcessRecordsInput());
} }
@Override @Override
public void leaseLost(LeaseLost leaseLost) { public void leaseLost(LeaseLost leaseLost) {
ShutdownInput shutdownInput = new ShutdownInput() {
@Override
public IRecordProcessorCheckpointer getCheckpointer() {
throw new UnsupportedOperationException("Cannot checkpoint when the lease is lost");
}
}.withShutdownReason(ShutdownReason.ZOMBIE);
delegate.shutdown(shutdownInput);
} }
@Override @Override
public void shardCompleted(ShardCompleted shardCompletedInput) { public void shardCompleted(ShardCompleted shardCompleted) {
ShutdownInput shutdownInput = new ShutdownInput().withCheckpointer(shardCompleted.getCheckpointer())
.withShutdownReason(ShutdownReason.TERMINATE);
delegate.shutdown(shutdownInput);
} }
@Override @Override
public void shutdownRequested(ShutdownRequested shutdownRequested) { public void shutdownRequested(ShutdownRequested shutdownRequested) {
if (delegate instanceof IShutdownNotificationAware) {
IShutdownNotificationAware aware = (IShutdownNotificationAware)delegate;
aware.shutdownRequested(shutdownRequested.getCheckpointer());
}
} }
} }

View file

@ -15,18 +15,22 @@
package software.amazon.kinesis.lifecycle; package software.amazon.kinesis.lifecycle;
import java.time.Instant;
import java.util.EnumSet;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import lombok.Data;
import lombok.Synchronized;
import org.apache.commons.lang3.StringUtils;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.lifecycle.events.LeaseLost; import software.amazon.kinesis.lifecycle.events.LeaseLost;
import software.amazon.kinesis.lifecycle.events.RecordsReceived;
import software.amazon.kinesis.lifecycle.events.ShardCompleted; import software.amazon.kinesis.lifecycle.events.ShardCompleted;
import software.amazon.kinesis.lifecycle.events.ShutdownRequested; import software.amazon.kinesis.lifecycle.events.ShutdownRequested;
import software.amazon.kinesis.lifecycle.events.Started; import software.amazon.kinesis.lifecycle.events.Started;
@ -528,41 +532,92 @@ public class ShardConsumer implements RecordProcessorLifecycle {
//</editor-fold> //</editor-fold>
private enum LifecycleStates {
STARTED, PROCESSING, SHUTDOWN, FAILED
}
private EnumSet<LifecycleStates> allowedStates = EnumSet.of(LifecycleStates.STARTED);
private TaskFailedListener listener;
public void addTaskFailedListener(TaskFailedListener listener) {
this.listener = listener;
}
private TaskFailureHandling taskFailed(Throwable t) {
//
// TODO: What should we do if there is no listener. I intend to require the scheduler to always register
//
if (listener != null) {
return listener.taskFailed(new TaskFailed(t));
}
return TaskFailureHandling.STOP;
}
@Data
private class TaskExecution {
private final Instant started;
private final Future<?> future;
}
ExecutorService executor = Executors.newSingleThreadExecutor(); ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> taskResult = null; TaskExecution taskExecution = null;
private void awaitAvailable() {
if (taskExecution != null) {
try {
taskExecution.getFuture().get();
} catch (Throwable t) {
TaskFailureHandling handling = taskFailed(t);
if (handling == TaskFailureHandling.STOP) {
allowedStates = EnumSet.of(LifecycleStates.FAILED);
}
}
}
}
private void checkState(LifecycleStates current) {
if (!allowedStates.contains(current)) {
throw new IllegalStateException("State " + current + " isn't allowed. Allowed: (" + StringUtils.join(allowedStates) + ")");
}
}
//<editor-fold desc="RecordProcessorLifecycle"> //<editor-fold desc="RecordProcessorLifecycle">
private void executeTask(LifecycleStates current, Runnable task) {
awaitAvailable();
checkState(current);
Future<?> future = executor.submit(task);
taskExecution = new TaskExecution(Instant.now(), future);
}
@Override @Override
@Synchronized
public void started(Started started) { public void started(Started started) {
if (taskResult != null) { executeTask(LifecycleStates.STARTED, () -> recordProcessorLifecycle.started(started));
try { allowedStates = EnumSet.of(LifecycleStates.PROCESSING, LifecycleStates.SHUTDOWN);
taskResult.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
taskResult = executor.submit(() -> recordProcessorLifecycle.started(started));
} }
@Override @Override
public void recordsReceived(ProcessRecordsInput records) { @Synchronized
public void recordsReceived(RecordsReceived records) {
executeTask(LifecycleStates.PROCESSING, () -> recordProcessorLifecycle.recordsReceived(records));
} }
@Override @Override
@Synchronized
public void leaseLost(LeaseLost leaseLost) { public void leaseLost(LeaseLost leaseLost) {
executeTask(LifecycleStates.SHUTDOWN, () -> recordProcessorLifecycle.leaseLost(leaseLost));
allowedStates = EnumSet.of(LifecycleStates.SHUTDOWN);
}
@Override
@Synchronized
public void shardCompleted(ShardCompleted shardCompleted) {
executeTask(LifecycleStates.SHUTDOWN, () -> recordProcessorLifecycle.shardCompleted(shardCompleted));
} }
@Override @Override
public void shardCompleted(ShardCompleted shardCompletedInput) { @Synchronized
}
@Override
public void shutdownRequested(ShutdownRequested shutdownRequested) { public void shutdownRequested(ShutdownRequested shutdownRequested) {
} }

View file

@ -0,0 +1,22 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
import lombok.Data;
@Data
public class TaskFailed {
private final Throwable throwable;
}

View file

@ -0,0 +1,20 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
@FunctionalInterface
public interface TaskFailedListener {
TaskFailureHandling taskFailed(TaskFailed result);
}

View file

@ -0,0 +1,19 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
public enum TaskFailureHandling {
STOP, CONTINUE
}

View file

@ -14,9 +14,29 @@
*/ */
package software.amazon.kinesis.lifecycle.events; package software.amazon.kinesis.lifecycle.events;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import com.amazonaws.services.kinesis.model.Record;
import lombok.Data; import lombok.Data;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
@Data @Data
public class RecordsReceived { public class RecordsReceived {
private final Instant cacheEntryTime;
private final Instant cacheExitTime;
private final boolean isAtShardEnd;
private final List<Record> records;
private final IRecordProcessorCheckpointer checkpointer;
private Duration timeBehindLatest;
public ProcessRecordsInput toProcessRecordsInput() {
return new ProcessRecordsInput(cacheEntryTime, cacheExitTime, isAtShardEnd, records, checkpointer,
timeBehindLatest.toMillis());
}
} }

View file

@ -14,5 +14,10 @@
*/ */
package software.amazon.kinesis.lifecycle.events; package software.amazon.kinesis.lifecycle.events;
import lombok.Data;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
@Data
public class ShardCompleted { public class ShardCompleted {
private final IRecordProcessorCheckpointer checkpointer;
} }

View file

@ -14,5 +14,10 @@
*/ */
package software.amazon.kinesis.lifecycle.events; package software.amazon.kinesis.lifecycle.events;
import lombok.Data;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
@Data
public class ShutdownRequested { public class ShutdownRequested {
private final IRecordProcessorCheckpointer checkpointer;
} }