PR feedback

This commit is contained in:
Joshua Kim 2020-04-10 05:55:45 -04:00
parent 80df8ce439
commit 64469f4199
7 changed files with 152 additions and 2 deletions

View file

@ -28,6 +28,11 @@ public class Checkpoint {
private final ExtendedSequenceNumber pendingCheckpoint; private final ExtendedSequenceNumber pendingCheckpoint;
private final byte[] pendingCheckpointState; private final byte[] pendingCheckpointState;
@Deprecated
public Checkpoint(final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint) {
this(checkpoint, pendingCheckpoint, null);
}
/** /**
* Constructor. * Constructor.
* *

View file

@ -140,6 +140,9 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
this.largestPermittedCheckpointValue.subSequenceNumber()); this.largestPermittedCheckpointValue.subSequenceNumber());
} }
/**
* {@inheritDoc}
*/
@Override @Override
public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
return prepareCheckpoint(largestPermittedCheckpointValue.sequenceNumber(), applicationState); return prepareCheckpoint(largestPermittedCheckpointValue.sequenceNumber(), applicationState);
@ -177,6 +180,9 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
return prepareCheckpoint(sequenceNumber, 0); return prepareCheckpoint(sequenceNumber, 0);
} }
/**
* {@inheritDoc}
*/
@Override @Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
@ -192,6 +198,9 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
return prepareCheckpoint(sequenceNumber, subSequenceNumber, null); return prepareCheckpoint(sequenceNumber, subSequenceNumber, null);
} }
/**
* {@inheritDoc}
*/
@Override @Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {

View file

@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit;
@NoArgsConstructor @NoArgsConstructor
@Getter @Getter
@Accessors(fluent = true) @Accessors(fluent = true)
@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos"}) @EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "pendingCheckpointState"})
@ToString @ToString
public class Lease { public class Lease {
/* /*
@ -84,6 +84,8 @@ public class Lease {
private ExtendedSequenceNumber pendingCheckpoint; private ExtendedSequenceNumber pendingCheckpoint;
/** /**
* Last pending application state. Deliberately excluded from hashCode and equals.
*
* @return pending checkpoint state, possibly null. * @return pending checkpoint state, possibly null.
*/ */
private byte[] pendingCheckpointState; private byte[] pendingCheckpointState;
@ -105,6 +107,15 @@ public class Lease {
lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.pendingCheckpointState()); lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.pendingCheckpointState());
} }
@Deprecated
public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter,
final UUID concurrencyToken, final Long lastCounterIncrementNanos,
final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint,
final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds) {
this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint,
ownerSwitchesSinceCheckpoint, parentShardIds, null);
}
public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter, public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter,
final UUID concurrencyToken, final Long lastCounterIncrementNanos, final UUID concurrencyToken, final Long lastCounterIncrementNanos,
final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint,

View file

@ -92,6 +92,7 @@ public class InitializeTask implements ConsumerTask {
.shardId(shardInfo.shardId()) .shardId(shardInfo.shardId())
.extendedSequenceNumber(initialCheckpoint) .extendedSequenceNumber(initialCheckpoint)
.pendingCheckpointSequenceNumber(initialCheckpointObject.pendingCheckpoint()) .pendingCheckpointSequenceNumber(initialCheckpointObject.pendingCheckpoint())
.pendingCheckpointState(initialCheckpointObject.pendingCheckpointState())
.build(); .build();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory,

View file

@ -47,4 +47,12 @@ public class InitializationInput {
* completing the checkpoint. * completing the checkpoint.
*/ */
private final ExtendedSequenceNumber pendingCheckpointSequenceNumber; private final ExtendedSequenceNumber pendingCheckpointSequenceNumber;
/**
* The last pending application state of the previous record processor. May be null.
*
* This will only be set if the previous record processor had prepared a checkpoint, but lost its lease before
* completing the checkpoint.
*/
private final byte[] pendingCheckpointState;
} }

View file

@ -73,6 +73,19 @@ public interface Checkpointer {
void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken)
throws KinesisClientLibException; throws KinesisClientLibException;
/**
* Record intent to checkpoint for a shard. Upon failover, the pendingCheckpoint and pendingCheckpointState will be
* passed to the new ShardRecordProcessor's initialize() method.
*
* @param leaseKey Checkpoint is specified for this shard.
* @param pendingCheckpoint Value of the pending checkpoint (e.g. Kinesis sequence number and subsequence number)
* @param concurrencyToken Used with conditional writes to prevent stale updates
* (e.g. if there was a fail over to a different record processor, we don't want to
* overwrite it's checkpoint)
* @param pendingCheckpointState Serialized application state at the pending checkpoint.
*
* @throws KinesisClientLibException Thrown if we were unable to save the checkpoint
*/
void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState)
throws KinesisClientLibException; throws KinesisClientLibException;

View file

@ -144,6 +144,29 @@ public interface RecordProcessorCheckpointer {
PreparedCheckpointer prepareCheckpoint() PreparedCheckpointer prepareCheckpoint()
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
/**
* This method will record a pending checkpoint at the last data record that was delivered to the record processor.
* If the application fails over between calling prepareCheckpoint() and checkpoint(), the init() method of the next
* IRecordProcessor for this shard will be informed of the prepared sequence number and application state.
*
* Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having
* side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete.
* Use the sequence number and application state passed in to init() to behave idempotently.
*
* @param applicationState arbitrary application state that will be passed to the next record processor that
* processes the shard.
* @return an PreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @throws ShutdownException The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this ShardRecordProcessor instance.
* @throws InvalidStateException Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
*/
PreparedCheckpointer prepareCheckpoint(byte[] applicationState) PreparedCheckpointer prepareCheckpoint(byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
@ -176,7 +199,35 @@ public interface RecordProcessorCheckpointer {
PreparedCheckpointer prepareCheckpoint(Record record) PreparedCheckpointer prepareCheckpoint(Record record)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
/**
* This method will record a pending checkpoint at the at the provided record. This method is analogous to
* {@link #prepareCheckpoint()} but provides the ability to specify the record and application state at which to
* prepare the checkpoint.
*
* @param record A record at which to prepare checkpoint in this shard.
* @param applicationState arbitrary application state that will be passed to the next record processor that
* processes the shard.
*
* Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having
* side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete.
* Use the sequence number and application state passed in to init() to behave idempotently.
*
* @return an PreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @throws ShutdownException The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this ShardRecordProcessor instance.
* @throws InvalidStateException Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
* @throws IllegalArgumentException The sequence number is invalid for one of the following reasons:
* 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
@ -206,6 +257,31 @@ public interface RecordProcessorCheckpointer {
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException; IllegalArgumentException;
/**
* This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to
* {@link #prepareCheckpoint()} but provides the ability to specify the sequence number and application state
* at which to checkpoint.
*
* @param sequenceNumber A sequence number at which to prepare checkpoint in this shard.
* @param applicationState arbitrary application state that will be passed to the next record processor that
* processes the shard.
*
* @return an PreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @throws ShutdownException The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this ShardRecordProcessor instance.
* @throws InvalidStateException Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
* @throws IllegalArgumentException The sequence number is invalid for one of the following reasons:
* 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException; IllegalArgumentException;
@ -238,6 +314,33 @@ public interface RecordProcessorCheckpointer {
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException; IllegalArgumentException;
/**
* This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for
* aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()}
* but provides the ability to specify the sequence number, subsequence number, and application state at which to
* checkpoint.
*
* @param sequenceNumber A sequence number at which to prepare checkpoint in this shard.
* @param subSequenceNumber A subsequence number at which to prepare checkpoint within this shard.
* @param applicationState arbitrary application state that will be passed to the next record processor that
* processes the shard.
*
* @return an PreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @throws ShutdownException The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this ShardRecordProcessor instance.
* @throws InvalidStateException Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
* @throws IllegalArgumentException The sequence number is invalid for one of the following reasons:
* 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException; IllegalArgumentException;