Merge pull request #11 from ashwing/TwoPhaseCommit

Introducing two phase commit with arbitrary application state
This commit is contained in:
Joshua Kim 2020-04-20 16:08:44 -04:00 committed by GitHub
commit 78a877dc0b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 399 additions and 27 deletions

View file

@ -112,6 +112,11 @@ public class StreamingShardRecordProcessorTest {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
throw new UnsupportedOperationException();
}
@Override @Override
public PreparedCheckpointer prepareCheckpoint(Record record) public PreparedCheckpointer prepareCheckpoint(Record record)
throws KinesisClientLibDependencyException, throws KinesisClientLibDependencyException,
@ -119,6 +124,11 @@ public class StreamingShardRecordProcessorTest {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
throw new UnsupportedOperationException();
}
@Override @Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber) public PreparedCheckpointer prepareCheckpoint(String sequenceNumber)
throws KinesisClientLibDependencyException, throws KinesisClientLibDependencyException,
@ -126,6 +136,11 @@ public class StreamingShardRecordProcessorTest {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
return null;
}
@Override @Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber) public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber)
throws KinesisClientLibDependencyException, throws KinesisClientLibDependencyException,
@ -133,6 +148,11 @@ public class StreamingShardRecordProcessorTest {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
throw new UnsupportedOperationException();
}
@Override @Override
public Checkpointer checkpointer() { public Checkpointer checkpointer() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();

View file

@ -26,18 +26,26 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
public class Checkpoint { public class Checkpoint {
private final ExtendedSequenceNumber checkpoint; private final ExtendedSequenceNumber checkpoint;
private final ExtendedSequenceNumber pendingCheckpoint; private final ExtendedSequenceNumber pendingCheckpoint;
private final byte[] pendingCheckpointState;
@Deprecated
public Checkpoint(final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint) {
this(checkpoint, pendingCheckpoint, null);
}
/** /**
* Constructor. * Constructor.
* *
* @param checkpoint the checkpoint sequence number - cannot be null or empty. * @param checkpoint the checkpoint sequence number - cannot be null or empty.
* @param pendingCheckpoint the pending checkpoint sequence number - can be null. * @param pendingCheckpoint the pending checkpoint sequence number - can be null.
* @param pendingCheckpointState the pending checkpoint state - can be null.
*/ */
public Checkpoint(final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint) { public Checkpoint(final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState) {
if (checkpoint == null || checkpoint.sequenceNumber().isEmpty()) { if (checkpoint == null || checkpoint.sequenceNumber().isEmpty()) {
throw new IllegalArgumentException("Checkpoint cannot be null or empty"); throw new IllegalArgumentException("Checkpoint cannot be null or empty");
} }
this.checkpoint = checkpoint; this.checkpoint = checkpoint;
this.pendingCheckpoint = pendingCheckpoint; this.pendingCheckpoint = pendingCheckpoint;
this.pendingCheckpointState = pendingCheckpointState;
} }
} }

View file

@ -144,8 +144,15 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public synchronized PreparedCheckpointer prepareCheckpoint(Record record) public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { return prepareCheckpoint(largestPermittedCheckpointValue.sequenceNumber(), applicationState);
}
/**
* {@inheritDoc}
*/
@Override
public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
// //
// TODO: UserRecord Deprecation // TODO: UserRecord Deprecation
// //
@ -154,10 +161,19 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
} /*else if (record instanceof UserRecord) { } /*else if (record instanceof UserRecord) {
return prepareCheckpoint(record.sequenceNumber(), ((UserRecord) record).subSequenceNumber()); return prepareCheckpoint(record.sequenceNumber(), ((UserRecord) record).subSequenceNumber());
} */ else { } */ else {
return prepareCheckpoint(record.sequenceNumber(), 0); return prepareCheckpoint(record.sequenceNumber(), 0, applicationState);
} }
} }
/**
* {@inheritDoc}
*/
@Override
public synchronized PreparedCheckpointer prepareCheckpoint(Record record)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
return prepareCheckpoint(record, null);
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
@ -167,13 +183,30 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
return prepareCheckpoint(sequenceNumber, 0); return prepareCheckpoint(sequenceNumber, 0);
} }
/**
* {@inheritDoc}
*/
@Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
return prepareCheckpoint(sequenceNumber, 0, applicationState);
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public synchronized PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber) public synchronized PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
return prepareCheckpoint(sequenceNumber, subSequenceNumber, null);
}
/**
* {@inheritDoc}
*/
@Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
if (subSequenceNumber < 0) { if (subSequenceNumber < 0) {
throw new IllegalArgumentException("Could not checkpoint at invalid, negative subsequence number " throw new IllegalArgumentException("Could not checkpoint at invalid, negative subsequence number "
+ subSequenceNumber); + subSequenceNumber);
@ -191,7 +224,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
log.debug("Preparing checkpoint {}, token {} at specific extended sequence number {}", log.debug("Preparing checkpoint {}, token {} at specific extended sequence number {}",
ShardInfo.getLeaseKey(shardInfo), shardInfo.concurrencyToken(), pendingCheckpoint); ShardInfo.getLeaseKey(shardInfo), shardInfo.concurrencyToken(), pendingCheckpoint);
} }
return doPrepareCheckpoint(pendingCheckpoint); return doPrepareCheckpoint(pendingCheckpoint, applicationState);
} else { } else {
throw new IllegalArgumentException(String.format( throw new IllegalArgumentException(String.format(
"Could not prepare checkpoint at extended sequence number %s as it did not fall into acceptable " "Could not prepare checkpoint at extended sequence number %s as it did not fall into acceptable "
@ -290,7 +323,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
* @throws ThrottlingException * @throws ThrottlingException
* @throws ShutdownException * @throws ShutdownException
*/ */
private PreparedCheckpointer doPrepareCheckpoint(ExtendedSequenceNumber extendedSequenceNumber) private PreparedCheckpointer doPrepareCheckpoint(ExtendedSequenceNumber extendedSequenceNumber, byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
ExtendedSequenceNumber newPrepareCheckpoint = extendedSequenceNumber; ExtendedSequenceNumber newPrepareCheckpoint = extendedSequenceNumber;
@ -308,7 +341,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
} }
try { try {
checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, shardInfo.concurrencyToken()); checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, shardInfo.concurrencyToken(), applicationState);
} catch (ThrottlingException | ShutdownException | InvalidStateException } catch (ThrottlingException | ShutdownException | InvalidStateException
| KinesisClientLibDependencyException e) { | KinesisClientLibDependencyException e) {
throw e; throw e;

View file

@ -88,7 +88,7 @@ public class DynamoDBCheckpointer implements Checkpointer {
try { try {
Lease lease = leaseRefresher.getLease(leaseKey); Lease lease = leaseRefresher.getLease(leaseKey);
log.debug("[{}] Retrieved lease => {}", leaseKey, lease); log.debug("[{}] Retrieved lease => {}", leaseKey, lease);
return new Checkpoint(lease.checkpoint(), lease.pendingCheckpoint()); return new Checkpoint(lease.checkpoint(), lease.pendingCheckpoint(), lease.pendingCheckpointState());
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) { } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
String message = "Unable to fetch checkpoint for shardId " + leaseKey; String message = "Unable to fetch checkpoint for shardId " + leaseKey;
log.error(message, e); log.error(message, e);
@ -99,9 +99,14 @@ public class DynamoDBCheckpointer implements Checkpointer {
@Override @Override
public void prepareCheckpoint(final String leaseKey, final ExtendedSequenceNumber pendingCheckpoint, public void prepareCheckpoint(final String leaseKey, final ExtendedSequenceNumber pendingCheckpoint,
final String concurrencyToken) throws KinesisClientLibException { final String concurrencyToken) throws KinesisClientLibException {
prepareCheckpoint(leaseKey, pendingCheckpoint, concurrencyToken, null);
}
@Override
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException {
try { try {
boolean wasSuccessful = boolean wasSuccessful =
prepareCheckpoint(leaseKey, pendingCheckpoint, UUID.fromString(concurrencyToken)); prepareCheckpoint(leaseKey, pendingCheckpoint, UUID.fromString(concurrencyToken), pendingCheckpointState);
if (!wasSuccessful) { if (!wasSuccessful) {
throw new ShutdownException( throw new ShutdownException(
"Can't prepare checkpoint - instance doesn't hold the lease for this shard"); "Can't prepare checkpoint - instance doesn't hold the lease for this shard");
@ -129,12 +134,13 @@ public class DynamoDBCheckpointer implements Checkpointer {
lease.checkpoint(checkpoint); lease.checkpoint(checkpoint);
lease.pendingCheckpoint(null); lease.pendingCheckpoint(null);
lease.pendingCheckpointState(null);
lease.ownerSwitchesSinceCheckpoint(0L); lease.ownerSwitchesSinceCheckpoint(0L);
return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey); return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey);
} }
boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, UUID concurrencyToken) boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, UUID concurrencyToken, byte[] pendingCheckpointState)
throws DependencyException, InvalidStateException, ProvisionedThroughputException { throws DependencyException, InvalidStateException, ProvisionedThroughputException {
Lease lease = leaseCoordinator.getCurrentlyHeldLease(leaseKey); Lease lease = leaseCoordinator.getCurrentlyHeldLease(leaseKey);
if (lease == null) { if (lease == null) {
@ -144,6 +150,7 @@ public class DynamoDBCheckpointer implements Checkpointer {
} }
lease.pendingCheckpoint(Objects.requireNonNull(pendingCheckpoint, "pendingCheckpoint should not be null")); lease.pendingCheckpoint(Objects.requireNonNull(pendingCheckpoint, "pendingCheckpoint should not be null"));
lease.pendingCheckpointState(pendingCheckpointState);
return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey); return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey);
} }

View file

@ -14,6 +14,7 @@
*/ */
package software.amazon.kinesis.leases; package software.amazon.kinesis.leases;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@ -36,6 +37,14 @@ public class DynamoUtils {
return AttributeValue.builder().ss(collectionValue).build(); return AttributeValue.builder().ss(collectionValue).build();
} }
public static AttributeValue createAttributeValue(byte[] byteBufferValue) {
if (byteBufferValue == null) {
throw new IllegalArgumentException("Byte buffer attributeValues cannot be null or empty.");
}
return AttributeValue.builder().b(SdkBytes.fromByteArray(byteBufferValue)).build();
}
public static AttributeValue createAttributeValue(String stringValue) { public static AttributeValue createAttributeValue(String stringValue) {
if (stringValue == null || stringValue.isEmpty()) { if (stringValue == null || stringValue.isEmpty()) {
throw new IllegalArgumentException("String attributeValues cannot be null or empty."); throw new IllegalArgumentException("String attributeValues cannot be null or empty.");
@ -52,6 +61,15 @@ public class DynamoUtils {
return AttributeValue.builder().n(longValue.toString()).build(); return AttributeValue.builder().n(longValue.toString()).build();
} }
public static byte[] safeGetByteArray(Map<String, AttributeValue> dynamoRecord, String key) {
AttributeValue av = dynamoRecord.get(key);
if (av == null) {
return null;
} else {
return av.b().asByteArray();
}
}
public static Long safeGetLong(Map<String, AttributeValue> dynamoRecord, String key) { public static Long safeGetLong(Map<String, AttributeValue> dynamoRecord, String key) {
AttributeValue av = dynamoRecord.get(key); AttributeValue av = dynamoRecord.get(key);
if (av == null) { if (av == null) {

View file

@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit;
@NoArgsConstructor @NoArgsConstructor
@Getter @Getter
@Accessors(fluent = true) @Accessors(fluent = true)
@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos"}) @EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "pendingCheckpointState"})
@ToString @ToString
public class Lease { public class Lease {
/* /*
@ -82,6 +82,14 @@ public class Lease {
* @return pending checkpoint, possibly null. * @return pending checkpoint, possibly null.
*/ */
private ExtendedSequenceNumber pendingCheckpoint; private ExtendedSequenceNumber pendingCheckpoint;
/**
* Last pending application state. Deliberately excluded from hashCode and equals.
*
* @return pending checkpoint state, possibly null.
*/
private byte[] pendingCheckpointState;
/** /**
* @return count of distinct lease holders between checkpoints. * @return count of distinct lease holders between checkpoints.
*/ */
@ -96,13 +104,22 @@ public class Lease {
protected Lease(Lease lease) { protected Lease(Lease lease) {
this(lease.leaseKey(), lease.leaseOwner(), lease.leaseCounter(), lease.concurrencyToken(), this(lease.leaseKey(), lease.leaseOwner(), lease.leaseCounter(), lease.concurrencyToken(),
lease.lastCounterIncrementNanos(), lease.checkpoint(), lease.pendingCheckpoint(), lease.lastCounterIncrementNanos(), lease.checkpoint(), lease.pendingCheckpoint(),
lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds()); lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.pendingCheckpointState());
}
@Deprecated
public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter,
final UUID concurrencyToken, final Long lastCounterIncrementNanos,
final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint,
final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds) {
this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint,
ownerSwitchesSinceCheckpoint, parentShardIds, null);
} }
public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter, public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter,
final UUID concurrencyToken, final Long lastCounterIncrementNanos, final UUID concurrencyToken, final Long lastCounterIncrementNanos,
final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint,
final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds) { final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds, final byte[] pendingCheckpointState) {
this.leaseKey = leaseKey; this.leaseKey = leaseKey;
this.leaseOwner = leaseOwner; this.leaseOwner = leaseOwner;
this.leaseCounter = leaseCounter; this.leaseCounter = leaseCounter;
@ -114,6 +131,7 @@ public class Lease {
if (parentShardIds != null) { if (parentShardIds != null) {
this.parentShardIds.addAll(parentShardIds); this.parentShardIds.addAll(parentShardIds);
} }
this.pendingCheckpointState = pendingCheckpointState;
} }
/** /**
@ -133,6 +151,7 @@ public class Lease {
ownerSwitchesSinceCheckpoint(lease.ownerSwitchesSinceCheckpoint()); ownerSwitchesSinceCheckpoint(lease.ownerSwitchesSinceCheckpoint());
checkpoint(lease.checkpoint); checkpoint(lease.checkpoint);
pendingCheckpoint(lease.pendingCheckpoint); pendingCheckpoint(lease.pendingCheckpoint);
pendingCheckpointState(lease.pendingCheckpointState);
parentShardIds(lease.parentShardIds); parentShardIds(lease.parentShardIds);
} }
@ -212,6 +231,15 @@ public class Lease {
this.pendingCheckpoint = pendingCheckpoint; this.pendingCheckpoint = pendingCheckpoint;
} }
/**
* Sets pending checkpoint state.
*
* @param pendingCheckpointState can be null
*/
public void pendingCheckpointState(byte[] pendingCheckpointState) {
this.pendingCheckpointState = pendingCheckpointState;
}
/** /**
* Sets ownerSwitchesSinceCheckpoint. * Sets ownerSwitchesSinceCheckpoint.
* *

View file

@ -50,6 +50,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
private static final String CHECKPOINT_SUBSEQUENCE_NUMBER_KEY = "checkpointSubSequenceNumber"; private static final String CHECKPOINT_SUBSEQUENCE_NUMBER_KEY = "checkpointSubSequenceNumber";
private static final String PENDING_CHECKPOINT_SEQUENCE_KEY = "pendingCheckpoint"; private static final String PENDING_CHECKPOINT_SEQUENCE_KEY = "pendingCheckpoint";
private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber"; private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber";
private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState";
private static final String PARENT_SHARD_ID_KEY = "parentShardId"; private static final String PARENT_SHARD_ID_KEY = "parentShardId";
@Override @Override
@ -75,6 +76,10 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.pendingCheckpoint().subSequenceNumber())); result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.pendingCheckpoint().subSequenceNumber()));
} }
if (lease.pendingCheckpointState() != null) {
result.put(PENDING_CHECKPOINT_STATE_KEY, DynamoUtils.createAttributeValue(lease.checkpoint().subSequenceNumber()));
}
return result; return result;
} }
@ -105,6 +110,9 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
DynamoUtils.safeGetLong(dynamoRecord, PENDING_CHECKPOINT_SUBSEQUENCE_KEY)) DynamoUtils.safeGetLong(dynamoRecord, PENDING_CHECKPOINT_SUBSEQUENCE_KEY))
); );
} }
leaseToUpdate.pendingCheckpointState(DynamoUtils.safeGetByteArray(dynamoRecord, PENDING_CHECKPOINT_STATE_KEY));
return leaseToUpdate; return leaseToUpdate;
} }
@ -220,6 +228,13 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build()); result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build());
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build()); result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build());
} }
if (lease.pendingCheckpointState() != null) {
result.put(PENDING_CHECKPOINT_STATE_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.pendingCheckpointState())));
} else {
result.put(PENDING_CHECKPOINT_STATE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build());
}
return result; return result;
} }

View file

@ -92,6 +92,7 @@ public class InitializeTask implements ConsumerTask {
.shardId(shardInfo.shardId()) .shardId(shardInfo.shardId())
.extendedSequenceNumber(initialCheckpoint) .extendedSequenceNumber(initialCheckpoint)
.pendingCheckpointSequenceNumber(initialCheckpointObject.pendingCheckpoint()) .pendingCheckpointSequenceNumber(initialCheckpointObject.pendingCheckpoint())
.pendingCheckpointState(initialCheckpointObject.pendingCheckpointState())
.build(); .build();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory,

View file

@ -47,4 +47,12 @@ public class InitializationInput {
* completing the checkpoint. * completing the checkpoint.
*/ */
private final ExtendedSequenceNumber pendingCheckpointSequenceNumber; private final ExtendedSequenceNumber pendingCheckpointSequenceNumber;
/**
* The last pending application state of the previous record processor. May be null.
*
* This will only be set if the previous record processor had prepared a checkpoint, but lost its lease before
* completing the checkpoint.
*/
private final byte[] pendingCheckpointState;
} }

View file

@ -73,6 +73,22 @@ public interface Checkpointer {
void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken)
throws KinesisClientLibException; throws KinesisClientLibException;
/**
* Record intent to checkpoint for a shard. Upon failover, the pendingCheckpoint and pendingCheckpointState will be
* passed to the new ShardRecordProcessor's initialize() method.
*
* @param leaseKey Checkpoint is specified for this shard.
* @param pendingCheckpoint Value of the pending checkpoint (e.g. Kinesis sequence number and subsequence number)
* @param concurrencyToken Used with conditional writes to prevent stale updates
* (e.g. if there was a fail over to a different record processor, we don't want to
* overwrite it's checkpoint)
* @param pendingCheckpointState Serialized application state at the pending checkpoint.
*
* @throws KinesisClientLibException Thrown if we were unable to save the checkpoint
*/
void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState)
throws KinesisClientLibException;
void operation(String operation); void operation(String operation);
String operation(); String operation();

View file

@ -93,7 +93,6 @@ public interface RecordProcessorCheckpointer {
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException; IllegalArgumentException;
/** /**
* This method will checkpoint the progress at the provided sequenceNumber and subSequenceNumber, the latter for * This method will checkpoint the progress at the provided sequenceNumber and subSequenceNumber, the latter for
* aggregated records produced with the Producer Library. This method is analogous to {@link #checkpoint()} * aggregated records produced with the Producer Library. This method is analogous to {@link #checkpoint()}
@ -145,6 +144,32 @@ public interface RecordProcessorCheckpointer {
PreparedCheckpointer prepareCheckpoint() PreparedCheckpointer prepareCheckpoint()
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
/**
* This method will record a pending checkpoint at the last data record that was delivered to the record processor.
* If the application fails over between calling prepareCheckpoint() and checkpoint(), the init() method of the next
* IRecordProcessor for this shard will be informed of the prepared sequence number and application state.
*
* Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having
* side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete.
* Use the sequence number and application state passed in to init() to behave idempotently.
*
* @param applicationState arbitrary application state that will be passed to the next record processor that
* processes the shard.
* @return an PreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @throws ShutdownException The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this ShardRecordProcessor instance.
* @throws InvalidStateException Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
*/
PreparedCheckpointer prepareCheckpoint(byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
/** /**
* This method will record a pending checkpoint at the at the provided record. This method is analogous to * This method will record a pending checkpoint at the at the provided record. This method is analogous to
* {@link #prepareCheckpoint()} but provides the ability to specify the record at which to prepare the checkpoint. * {@link #prepareCheckpoint()} but provides the ability to specify the record at which to prepare the checkpoint.
@ -174,6 +199,38 @@ public interface RecordProcessorCheckpointer {
PreparedCheckpointer prepareCheckpoint(Record record) PreparedCheckpointer prepareCheckpoint(Record record)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
/**
* This method will record a pending checkpoint at the at the provided record. This method is analogous to
* {@link #prepareCheckpoint()} but provides the ability to specify the record and application state at which to
* prepare the checkpoint.
*
* @param record A record at which to prepare checkpoint in this shard.
* @param applicationState arbitrary application state that will be passed to the next record processor that
* processes the shard.
*
* Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having
* side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete.
* Use the sequence number and application state passed in to init() to behave idempotently.
*
* @return an PreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @throws ShutdownException The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this ShardRecordProcessor instance.
* @throws InvalidStateException Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
* @throws IllegalArgumentException The sequence number is invalid for one of the following reasons:
* 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
/** /**
* This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to * This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to
* {@link #prepareCheckpoint()} but provides the ability to specify the sequence number at which to checkpoint. * {@link #prepareCheckpoint()} but provides the ability to specify the sequence number at which to checkpoint.
@ -200,6 +257,35 @@ public interface RecordProcessorCheckpointer {
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException; IllegalArgumentException;
/**
* This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to
* {@link #prepareCheckpoint()} but provides the ability to specify the sequence number and application state
* at which to checkpoint.
*
* @param sequenceNumber A sequence number at which to prepare checkpoint in this shard.
* @param applicationState arbitrary application state that will be passed to the next record processor that
* processes the shard.
*
* @return an PreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @throws ShutdownException The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this ShardRecordProcessor instance.
* @throws InvalidStateException Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
* @throws IllegalArgumentException The sequence number is invalid for one of the following reasons:
* 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException;
/** /**
* This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for * This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for
* aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()} * aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()}
@ -228,5 +314,36 @@ public interface RecordProcessorCheckpointer {
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException; IllegalArgumentException;
/**
* This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for
* aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()}
* but provides the ability to specify the sequence number, subsequence number, and application state at which to
* checkpoint.
*
* @param sequenceNumber A sequence number at which to prepare checkpoint in this shard.
* @param subSequenceNumber A subsequence number at which to prepare checkpoint within this shard.
* @param applicationState arbitrary application state that will be passed to the next record processor that
* processes the shard.
*
* @return an PreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @throws ShutdownException The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this ShardRecordProcessor instance.
* @throws InvalidStateException Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
* @throws IllegalArgumentException The sequence number is invalid for one of the following reasons:
* 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException;
Checkpointer checkpointer(); Checkpointer checkpointer();
} }

View file

@ -90,6 +90,26 @@ public class CheckpointerTest {
Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint());
} }
@Test
public final void testInitialPrepareCheckpointWithApplicationState() throws Exception {
String sequenceNumber = "1";
String pendingCheckpointValue = "99999";
String shardId = "myShardId";
byte[] applicationState = "applicationState".getBytes();
ExtendedSequenceNumber extendedCheckpointNumber = new ExtendedSequenceNumber(sequenceNumber);
checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(sequenceNumber), testConcurrencyToken);
ExtendedSequenceNumber extendedPendingCheckpointNumber = new ExtendedSequenceNumber(pendingCheckpointValue);
checkpoint.prepareCheckpoint(shardId, new ExtendedSequenceNumber(pendingCheckpointValue), testConcurrencyToken,
applicationState);
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint());
Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint());
Assert.assertEquals(applicationState, checkpoint.getCheckpointObject(shardId).pendingCheckpointState());
}
@Test @Test
public final void testAdvancingPrepareCheckpoint() throws Exception { public final void testAdvancingPrepareCheckpoint() throws Exception {
String shardId = "myShardId"; String shardId = "myShardId";
@ -107,6 +127,26 @@ public class CheckpointerTest {
} }
} }
@Test
public final void testAdvancingPrepareCheckpointWithApplicationState() throws Exception {
String shardId = "myShardId";
String checkpointValue = "12345";
byte[] applicationState = "applicationState".getBytes();
ExtendedSequenceNumber extendedCheckpointNumber = new ExtendedSequenceNumber(checkpointValue);
checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(checkpointValue), testConcurrencyToken);
for (Integer i = 0; i < 10; i++) {
String sequenceNumber = i.toString();
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(sequenceNumber);
checkpoint.prepareCheckpoint(shardId, new ExtendedSequenceNumber(sequenceNumber), testConcurrencyToken,
applicationState);
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint());
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint());
Assert.assertEquals(applicationState, checkpoint.getCheckpointObject(shardId).pendingCheckpointState());
}
}
@Test @Test
public final void testPrepareAndSetCheckpoint() throws Exception { public final void testPrepareAndSetCheckpoint() throws Exception {
String checkpointValue = "12345"; String checkpointValue = "12345";
@ -134,4 +174,35 @@ public class CheckpointerTest {
Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpoint());
} }
@Test
public final void testPrepareAndSetCheckpointWithApplicationState() throws Exception {
String checkpointValue = "12345";
String shardId = "testShardId-1";
String concurrencyToken = "token-1";
String pendingCheckpointValue = "99999";
byte[] applicationState = "applicationState".getBytes();
// set initial checkpoint
ExtendedSequenceNumber extendedCheckpointNumber = new ExtendedSequenceNumber(checkpointValue);
checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(checkpointValue), concurrencyToken);
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpoint());
// prepare checkpoint
ExtendedSequenceNumber extendedPendingCheckpointNumber = new ExtendedSequenceNumber(pendingCheckpointValue);
checkpoint.prepareCheckpoint(shardId, new ExtendedSequenceNumber(pendingCheckpointValue), concurrencyToken, applicationState);
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint());
Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint());
Assert.assertEquals(applicationState, checkpoint.getCheckpointObject(shardId).pendingCheckpointState());
// do checkpoint
checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(pendingCheckpointValue), concurrencyToken);
Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpointState());
}
} }

View file

@ -18,7 +18,6 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import software.amazon.kinesis.exceptions.KinesisClientLibException; import software.amazon.kinesis.exceptions.KinesisClientLibException;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@ -32,6 +31,7 @@ public class InMemoryCheckpointer implements Checkpointer {
private Map<String, ExtendedSequenceNumber> checkpoints = new HashMap<>(); private Map<String, ExtendedSequenceNumber> checkpoints = new HashMap<>();
private Map<String, ExtendedSequenceNumber> flushpoints = new HashMap<>(); private Map<String, ExtendedSequenceNumber> flushpoints = new HashMap<>();
private Map<String, ExtendedSequenceNumber> pendingCheckpoints = new HashMap<>(); private Map<String, ExtendedSequenceNumber> pendingCheckpoints = new HashMap<>();
private Map<String, byte[]> pendingCheckpointStates = new HashMap<>();
private String operation; private String operation;
@ -44,6 +44,7 @@ public class InMemoryCheckpointer implements Checkpointer {
checkpoints.put(leaseKey, checkpointValue); checkpoints.put(leaseKey, checkpointValue);
flushpoints.put(leaseKey, checkpointValue); flushpoints.put(leaseKey, checkpointValue);
pendingCheckpoints.remove(leaseKey); pendingCheckpoints.remove(leaseKey);
pendingCheckpointStates.remove(leaseKey);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("shardId: {} checkpoint: {}", leaseKey, checkpointValue); log.debug("shardId: {} checkpoint: {}", leaseKey, checkpointValue);
@ -64,15 +65,22 @@ public class InMemoryCheckpointer implements Checkpointer {
@Override @Override
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken)
throws KinesisClientLibException { throws KinesisClientLibException {
prepareCheckpoint(leaseKey, pendingCheckpoint, concurrencyToken, null);
}
@Override
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException {
pendingCheckpoints.put(leaseKey, pendingCheckpoint); pendingCheckpoints.put(leaseKey, pendingCheckpoint);
pendingCheckpointStates.put(leaseKey, pendingCheckpointState);
} }
@Override @Override
public Checkpoint getCheckpointObject(String leaseKey) throws KinesisClientLibException { public Checkpoint getCheckpointObject(String leaseKey) throws KinesisClientLibException {
ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey); ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey);
ExtendedSequenceNumber pendingCheckpoint = pendingCheckpoints.get(leaseKey); ExtendedSequenceNumber pendingCheckpoint = pendingCheckpoints.get(leaseKey);
byte[] pendingCheckpointState = pendingCheckpointStates.get(leaseKey);
Checkpoint checkpointObj = new Checkpoint(checkpoint, pendingCheckpoint); Checkpoint checkpointObj = new Checkpoint(checkpoint, pendingCheckpoint, pendingCheckpointState);
log.debug("getCheckpointObject shardId: {}, {}", leaseKey, checkpointObj); log.debug("getCheckpointObject shardId: {}, {}", leaseKey, checkpointObj);
return checkpointObj; return checkpointObj;
} }

View file

@ -242,7 +242,7 @@ public class SchedulerTest {
final List<ShardInfo> secondShardInfo = Collections.singletonList( final List<ShardInfo> secondShardInfo = Collections.singletonList(
new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber)); new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber));
final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null); final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null, null);
when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo); when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo);
when(checkpoint.getCheckpointObject(eq(shardId))).thenReturn(firstCheckpoint); when(checkpoint.getCheckpointObject(eq(shardId))).thenReturn(firstCheckpoint);
@ -368,7 +368,7 @@ public class SchedulerTest {
.map(sc -> new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber, .map(sc -> new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber,
sc.streamIdentifier().serialize())).collect(Collectors.toList()); sc.streamIdentifier().serialize())).collect(Collectors.toList());
final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null); final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null, null);
when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo); when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo);
when(checkpoint.getCheckpointObject(anyString())).thenReturn(firstCheckpoint); when(checkpoint.getCheckpointObject(anyString())).thenReturn(firstCheckpoint);

View file

@ -969,7 +969,7 @@ public class HierarchicalShardSyncerTest {
parentShardIds.add(shard.adjacentParentShardId()); parentShardIds.add(shard.adjacentParentShardId());
} }
return new Lease(shard.shardId(), leaseOwner, 0L, UUID.randomUUID(), 0L, checkpoint, null, 0L, return new Lease(shard.shardId(), leaseOwner, 0L, UUID.randomUUID(), 0L, checkpoint, null, 0L,
parentShardIds); parentShardIds, null);
}).collect(Collectors.toList()); }).collect(Collectors.toList());
} }

View file

@ -34,9 +34,10 @@ public class LeaseBuilder {
private ExtendedSequenceNumber pendingCheckpoint; private ExtendedSequenceNumber pendingCheckpoint;
private Long ownerSwitchesSinceCheckpoint = 0L; private Long ownerSwitchesSinceCheckpoint = 0L;
private Set<String> parentShardIds = new HashSet<>(); private Set<String> parentShardIds = new HashSet<>();
private byte[] pendingCheckpointState;
public Lease build() { public Lease build() {
return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos,
checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds); checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, pendingCheckpointState);
} }
} }

View file

@ -127,16 +127,37 @@ public class DynamoDBLeaseCoordinatorIntegrationTest {
} }
assertNotNull(lease); assertNotNull(lease);
ExtendedSequenceNumber newCheckpoint = new ExtendedSequenceNumber("newCheckpoint"); final ExtendedSequenceNumber initialCheckpoint = new ExtendedSequenceNumber("initialCheckpoint");
final ExtendedSequenceNumber pendingCheckpoint = new ExtendedSequenceNumber("pendingCheckpoint");
final ExtendedSequenceNumber newCheckpoint = new ExtendedSequenceNumber("newCheckpoint");
final byte[] checkpointState = "checkpointState".getBytes();
// lease's leaseCounter is wrong at this point, but it shouldn't matter. // lease's leaseCounter is wrong at this point, but it shouldn't matter.
assertTrue(dynamoDBCheckpointer.setCheckpoint(lease.leaseKey(), initialCheckpoint, lease.concurrencyToken()));
final Lease leaseFromDDBAtInitialCheckpoint = leaseRefresher.getLease(lease.leaseKey());
lease.leaseCounter(lease.leaseCounter() + 1);
lease.checkpoint(initialCheckpoint);
lease.leaseOwner(coordinator.workerIdentifier());
assertEquals(lease, leaseFromDDBAtInitialCheckpoint);
dynamoDBCheckpointer.prepareCheckpoint(lease.leaseKey(), pendingCheckpoint, lease.concurrencyToken().toString(), checkpointState);
final Lease leaseFromDDBAtPendingCheckpoint = leaseRefresher.getLease(lease.leaseKey());
lease.leaseCounter(lease.leaseCounter() + 1);
lease.checkpoint(initialCheckpoint);
lease.pendingCheckpoint(pendingCheckpoint);
lease.pendingCheckpointState(checkpointState);
assertEquals(lease, leaseFromDDBAtPendingCheckpoint);
assertTrue(dynamoDBCheckpointer.setCheckpoint(lease.leaseKey(), newCheckpoint, lease.concurrencyToken())); assertTrue(dynamoDBCheckpointer.setCheckpoint(lease.leaseKey(), newCheckpoint, lease.concurrencyToken()));
Lease fromDynamo = leaseRefresher.getLease(lease.leaseKey()); final Lease leaseFromDDBAtNewCheckpoint = leaseRefresher.getLease(lease.leaseKey());
lease.leaseCounter(lease.leaseCounter() + 1); lease.leaseCounter(lease.leaseCounter() + 1);
lease.checkpoint(newCheckpoint); lease.checkpoint(newCheckpoint);
lease.leaseOwner(coordinator.workerIdentifier()); lease.pendingCheckpoint(null);
assertEquals(lease, fromDynamo); lease.pendingCheckpointState(null);
assertEquals(lease, leaseFromDDBAtNewCheckpoint);
} }
/** /**

View file

@ -55,7 +55,7 @@ public class DynamoDBLeaseRenewerTest {
private LeaseRefresher leaseRefresher; private LeaseRefresher leaseRefresher;
private static Lease newLease(String leaseKey) { private static Lease newLease(String leaseKey) {
return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, new HashSet<>()); return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, new HashSet<>(), null);
} }
@Before @Before