From 64469f419988740c6a30a54a1d5e56fddf25094a Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Fri, 10 Apr 2020 05:55:45 -0400 Subject: [PATCH] PR feedback --- .../amazon/kinesis/checkpoint/Checkpoint.java | 5 + .../ShardRecordProcessorCheckpointer.java | 9 ++ .../software/amazon/kinesis/leases/Lease.java | 13 ++- .../kinesis/lifecycle/InitializeTask.java | 1 + .../lifecycle/events/InitializationInput.java | 8 ++ .../kinesis/processor/Checkpointer.java | 13 +++ .../RecordProcessorCheckpointer.java | 105 +++++++++++++++++- 7 files changed, 152 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java index 91f01d0b..f5af81e3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java @@ -28,6 +28,11 @@ public class Checkpoint { private final ExtendedSequenceNumber pendingCheckpoint; private final byte[] pendingCheckpointState; + @Deprecated + public Checkpoint(final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint) { + this(checkpoint, pendingCheckpoint, null); + } + /** * Constructor. * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java index ec9ff11d..4e831c4b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java @@ -140,6 +140,9 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi this.largestPermittedCheckpointValue.subSequenceNumber()); } + /** + * {@inheritDoc} + */ @Override public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { return prepareCheckpoint(largestPermittedCheckpointValue.sequenceNumber(), applicationState); @@ -177,6 +180,9 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi return prepareCheckpoint(sequenceNumber, 0); } + /** + * {@inheritDoc} + */ @Override public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { @@ -192,6 +198,9 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi return prepareCheckpoint(sequenceNumber, subSequenceNumber, null); } + /** + * {@inheritDoc} + */ @Override public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index a7ed666c..15f79739 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit; @NoArgsConstructor @Getter @Accessors(fluent = true) -@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos"}) +@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "pendingCheckpointState"}) @ToString public class Lease { /* @@ -84,6 +84,8 @@ public class Lease { private ExtendedSequenceNumber pendingCheckpoint; /** + * Last pending application state. Deliberately excluded from hashCode and equals. + * * @return pending checkpoint state, possibly null. */ private byte[] pendingCheckpointState; @@ -105,6 +107,15 @@ public class Lease { lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.pendingCheckpointState()); } + @Deprecated + public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter, + final UUID concurrencyToken, final Long lastCounterIncrementNanos, + final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, + final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds) { + this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint, + ownerSwitchesSinceCheckpoint, parentShardIds, null); + } + public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter, final UUID concurrencyToken, final Long lastCounterIncrementNanos, final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java index e11eebfa..4108dd9b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java @@ -92,6 +92,7 @@ public class InitializeTask implements ConsumerTask { .shardId(shardInfo.shardId()) .extendedSequenceNumber(initialCheckpoint) .pendingCheckpointSequenceNumber(initialCheckpointObject.pendingCheckpoint()) + .pendingCheckpointState(initialCheckpointObject.pendingCheckpointState()) .build(); final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/InitializationInput.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/InitializationInput.java index d6c586aa..3717a805 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/InitializationInput.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/InitializationInput.java @@ -47,4 +47,12 @@ public class InitializationInput { * completing the checkpoint. */ private final ExtendedSequenceNumber pendingCheckpointSequenceNumber; + + /** + * The last pending application state of the previous record processor. May be null. + * + * This will only be set if the previous record processor had prepared a checkpoint, but lost its lease before + * completing the checkpoint. + */ + private final byte[] pendingCheckpointState; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java index 7f3c947e..2ffadc06 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java @@ -73,6 +73,19 @@ public interface Checkpointer { void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) throws KinesisClientLibException; + /** + * Record intent to checkpoint for a shard. Upon failover, the pendingCheckpoint and pendingCheckpointState will be + * passed to the new ShardRecordProcessor's initialize() method. + * + * @param leaseKey Checkpoint is specified for this shard. + * @param pendingCheckpoint Value of the pending checkpoint (e.g. Kinesis sequence number and subsequence number) + * @param concurrencyToken Used with conditional writes to prevent stale updates + * (e.g. if there was a fail over to a different record processor, we don't want to + * overwrite it's checkpoint) + * @param pendingCheckpointState Serialized application state at the pending checkpoint. + * + * @throws KinesisClientLibException Thrown if we were unable to save the checkpoint + */ void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java index d42cbb9e..34b2930c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java @@ -144,6 +144,29 @@ public interface RecordProcessorCheckpointer { PreparedCheckpointer prepareCheckpoint() throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; + /** + * This method will record a pending checkpoint at the last data record that was delivered to the record processor. + * If the application fails over between calling prepareCheckpoint() and checkpoint(), the init() method of the next + * IRecordProcessor for this shard will be informed of the prepared sequence number and application state. + * + * Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having + * side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete. + * Use the sequence number and application state passed in to init() to behave idempotently. + * + * @param applicationState arbitrary application state that will be passed to the next record processor that + * processes the shard. + * @return an PreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @throws ShutdownException The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this ShardRecordProcessor instance. + * @throws InvalidStateException Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + */ PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; @@ -176,7 +199,35 @@ public interface RecordProcessorCheckpointer { PreparedCheckpointer prepareCheckpoint(Record record) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; - + /** + * This method will record a pending checkpoint at the at the provided record. This method is analogous to + * {@link #prepareCheckpoint()} but provides the ability to specify the record and application state at which to + * prepare the checkpoint. + * + * @param record A record at which to prepare checkpoint in this shard. + * @param applicationState arbitrary application state that will be passed to the next record processor that + * processes the shard. + * + * Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having + * side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete. + * Use the sequence number and application state passed in to init() to behave idempotently. + * + * @return an PreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @throws ShutdownException The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this ShardRecordProcessor instance. + * @throws InvalidStateException Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + * @throws IllegalArgumentException The sequence number is invalid for one of the following reasons: + * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the + * greatest sequence number seen by the associated record processor. + * 2.) It is not a valid sequence number for a record in this shard. + */ PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; @@ -206,6 +257,31 @@ public interface RecordProcessorCheckpointer { throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException; + /** + * This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to + * {@link #prepareCheckpoint()} but provides the ability to specify the sequence number and application state + * at which to checkpoint. + * + * @param sequenceNumber A sequence number at which to prepare checkpoint in this shard. + * @param applicationState arbitrary application state that will be passed to the next record processor that + * processes the shard. + * + * @return an PreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @throws ShutdownException The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this ShardRecordProcessor instance. + * @throws InvalidStateException Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + * @throws IllegalArgumentException The sequence number is invalid for one of the following reasons: + * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the + * greatest sequence number seen by the associated record processor. + * 2.) It is not a valid sequence number for a record in this shard. + */ PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException; @@ -238,6 +314,33 @@ public interface RecordProcessorCheckpointer { throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException; + /** + * This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for + * aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()} + * but provides the ability to specify the sequence number, subsequence number, and application state at which to + * checkpoint. + * + * @param sequenceNumber A sequence number at which to prepare checkpoint in this shard. + * @param subSequenceNumber A subsequence number at which to prepare checkpoint within this shard. + * @param applicationState arbitrary application state that will be passed to the next record processor that + * processes the shard. + * + * @return an PreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @throws ShutdownException The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this ShardRecordProcessor instance. + * @throws InvalidStateException Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + * @throws IllegalArgumentException The sequence number is invalid for one of the following reasons: + * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the + * greatest sequence number seen by the associated record processor. + * 2.) It is not a valid sequence number for a record in this shard. + */ PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException;