Adding arbitrary application state checkpointing for two phase commit.
This commit is contained in:
parent
b1a3d215d0
commit
af0cd5463d
9 changed files with 110 additions and 18 deletions
|
|
@ -112,6 +112,11 @@ public class StreamingShardRecordProcessorTest {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PreparedCheckpointer prepareCheckpoint(Record record)
|
public PreparedCheckpointer prepareCheckpoint(Record record)
|
||||||
throws KinesisClientLibDependencyException,
|
throws KinesisClientLibDependencyException,
|
||||||
|
|
@ -119,6 +124,11 @@ public class StreamingShardRecordProcessorTest {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber)
|
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber)
|
||||||
throws KinesisClientLibDependencyException,
|
throws KinesisClientLibDependencyException,
|
||||||
|
|
@ -126,6 +136,11 @@ public class StreamingShardRecordProcessorTest {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber)
|
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber)
|
||||||
throws KinesisClientLibDependencyException,
|
throws KinesisClientLibDependencyException,
|
||||||
|
|
@ -133,6 +148,11 @@ public class StreamingShardRecordProcessorTest {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Checkpointer checkpointer() {
|
public Checkpointer checkpointer() {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,8 @@ import lombok.Data;
|
||||||
import lombok.experimental.Accessors;
|
import lombok.experimental.Accessors;
|
||||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A class encapsulating the 2 pieces of state stored in a checkpoint.
|
* A class encapsulating the 2 pieces of state stored in a checkpoint.
|
||||||
*/
|
*/
|
||||||
|
|
@ -26,18 +28,21 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||||
public class Checkpoint {
|
public class Checkpoint {
|
||||||
private final ExtendedSequenceNumber checkpoint;
|
private final ExtendedSequenceNumber checkpoint;
|
||||||
private final ExtendedSequenceNumber pendingCheckpoint;
|
private final ExtendedSequenceNumber pendingCheckpoint;
|
||||||
|
private final byte[] pendingCheckpointState;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Constructor.
|
||||||
*
|
*
|
||||||
* @param checkpoint the checkpoint sequence number - cannot be null or empty.
|
* @param checkpoint the checkpoint sequence number - cannot be null or empty.
|
||||||
* @param pendingCheckpoint the pending checkpoint sequence number - can be null.
|
* @param pendingCheckpoint the pending checkpoint sequence number - can be null.
|
||||||
|
* @param pendingCheckpointState the pending checkpoint state - can be null.
|
||||||
*/
|
*/
|
||||||
public Checkpoint(final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint) {
|
public Checkpoint(final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState) {
|
||||||
if (checkpoint == null || checkpoint.sequenceNumber().isEmpty()) {
|
if (checkpoint == null || checkpoint.sequenceNumber().isEmpty()) {
|
||||||
throw new IllegalArgumentException("Checkpoint cannot be null or empty");
|
throw new IllegalArgumentException("Checkpoint cannot be null or empty");
|
||||||
}
|
}
|
||||||
this.checkpoint = checkpoint;
|
this.checkpoint = checkpoint;
|
||||||
this.pendingCheckpoint = pendingCheckpoint;
|
this.pendingCheckpoint = pendingCheckpoint;
|
||||||
|
this.pendingCheckpointState = pendingCheckpointState;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -140,12 +140,13 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
|
||||||
this.largestPermittedCheckpointValue.subSequenceNumber());
|
this.largestPermittedCheckpointValue.subSequenceNumber());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritDoc}
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized PreparedCheckpointer prepareCheckpoint(Record record)
|
public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
|
||||||
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
|
return prepareCheckpoint(largestPermittedCheckpointValue.sequenceNumber(), applicationState);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
|
||||||
//
|
//
|
||||||
// TODO: UserRecord Deprecation
|
// TODO: UserRecord Deprecation
|
||||||
//
|
//
|
||||||
|
|
@ -154,10 +155,19 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
|
||||||
} /*else if (record instanceof UserRecord) {
|
} /*else if (record instanceof UserRecord) {
|
||||||
return prepareCheckpoint(record.sequenceNumber(), ((UserRecord) record).subSequenceNumber());
|
return prepareCheckpoint(record.sequenceNumber(), ((UserRecord) record).subSequenceNumber());
|
||||||
} */ else {
|
} */ else {
|
||||||
return prepareCheckpoint(record.sequenceNumber(), 0);
|
return prepareCheckpoint(record.sequenceNumber(), 0, applicationState);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized PreparedCheckpointer prepareCheckpoint(Record record)
|
||||||
|
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
|
||||||
|
return prepareCheckpoint(record, null);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
*/
|
*/
|
||||||
|
|
@ -167,13 +177,24 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
|
||||||
return prepareCheckpoint(sequenceNumber, 0);
|
return prepareCheckpoint(sequenceNumber, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState)
|
||||||
|
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
|
||||||
|
return prepareCheckpoint(sequenceNumber, 0, null);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public synchronized PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber)
|
public synchronized PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber)
|
||||||
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
|
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
|
||||||
|
return prepareCheckpoint(sequenceNumber, subSequenceNumber, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState)
|
||||||
|
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
|
||||||
if (subSequenceNumber < 0) {
|
if (subSequenceNumber < 0) {
|
||||||
throw new IllegalArgumentException("Could not checkpoint at invalid, negative subsequence number "
|
throw new IllegalArgumentException("Could not checkpoint at invalid, negative subsequence number "
|
||||||
+ subSequenceNumber);
|
+ subSequenceNumber);
|
||||||
|
|
@ -191,7 +212,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
|
||||||
log.debug("Preparing checkpoint {}, token {} at specific extended sequence number {}",
|
log.debug("Preparing checkpoint {}, token {} at specific extended sequence number {}",
|
||||||
ShardInfo.getLeaseKey(shardInfo), shardInfo.concurrencyToken(), pendingCheckpoint);
|
ShardInfo.getLeaseKey(shardInfo), shardInfo.concurrencyToken(), pendingCheckpoint);
|
||||||
}
|
}
|
||||||
return doPrepareCheckpoint(pendingCheckpoint);
|
return doPrepareCheckpoint(pendingCheckpoint, applicationState);
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalArgumentException(String.format(
|
throw new IllegalArgumentException(String.format(
|
||||||
"Could not prepare checkpoint at extended sequence number %s as it did not fall into acceptable "
|
"Could not prepare checkpoint at extended sequence number %s as it did not fall into acceptable "
|
||||||
|
|
@ -290,7 +311,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
|
||||||
* @throws ThrottlingException
|
* @throws ThrottlingException
|
||||||
* @throws ShutdownException
|
* @throws ShutdownException
|
||||||
*/
|
*/
|
||||||
private PreparedCheckpointer doPrepareCheckpoint(ExtendedSequenceNumber extendedSequenceNumber)
|
private PreparedCheckpointer doPrepareCheckpoint(ExtendedSequenceNumber extendedSequenceNumber, byte[] applicationState)
|
||||||
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
|
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
|
||||||
|
|
||||||
ExtendedSequenceNumber newPrepareCheckpoint = extendedSequenceNumber;
|
ExtendedSequenceNumber newPrepareCheckpoint = extendedSequenceNumber;
|
||||||
|
|
@ -308,7 +329,8 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, shardInfo.concurrencyToken());
|
checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, applicationState,
|
||||||
|
shardInfo.concurrencyToken());
|
||||||
} catch (ThrottlingException | ShutdownException | InvalidStateException
|
} catch (ThrottlingException | ShutdownException | InvalidStateException
|
||||||
| KinesisClientLibDependencyException e) {
|
| KinesisClientLibDependencyException e) {
|
||||||
throw e;
|
throw e;
|
||||||
|
|
|
||||||
|
|
@ -88,7 +88,7 @@ public class DynamoDBCheckpointer implements Checkpointer {
|
||||||
try {
|
try {
|
||||||
Lease lease = leaseRefresher.getLease(leaseKey);
|
Lease lease = leaseRefresher.getLease(leaseKey);
|
||||||
log.debug("[{}] Retrieved lease => {}", leaseKey, lease);
|
log.debug("[{}] Retrieved lease => {}", leaseKey, lease);
|
||||||
return new Checkpoint(lease.checkpoint(), lease.pendingCheckpoint());
|
return new Checkpoint(lease.checkpoint(), lease.pendingCheckpoint(), lease.pendingCheckpointState());
|
||||||
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
|
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
|
||||||
String message = "Unable to fetch checkpoint for shardId " + leaseKey;
|
String message = "Unable to fetch checkpoint for shardId " + leaseKey;
|
||||||
log.error(message, e);
|
log.error(message, e);
|
||||||
|
|
@ -99,9 +99,14 @@ public class DynamoDBCheckpointer implements Checkpointer {
|
||||||
@Override
|
@Override
|
||||||
public void prepareCheckpoint(final String leaseKey, final ExtendedSequenceNumber pendingCheckpoint,
|
public void prepareCheckpoint(final String leaseKey, final ExtendedSequenceNumber pendingCheckpoint,
|
||||||
final String concurrencyToken) throws KinesisClientLibException {
|
final String concurrencyToken) throws KinesisClientLibException {
|
||||||
|
prepareCheckpoint(leaseKey, pendingCheckpoint, null, concurrencyToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, String concurrencyToken) throws KinesisClientLibException {
|
||||||
try {
|
try {
|
||||||
boolean wasSuccessful =
|
boolean wasSuccessful =
|
||||||
prepareCheckpoint(leaseKey, pendingCheckpoint, UUID.fromString(concurrencyToken));
|
prepareCheckpoint(leaseKey, pendingCheckpoint, pendingCheckpointState, UUID.fromString(concurrencyToken));
|
||||||
if (!wasSuccessful) {
|
if (!wasSuccessful) {
|
||||||
throw new ShutdownException(
|
throw new ShutdownException(
|
||||||
"Can't prepare checkpoint - instance doesn't hold the lease for this shard");
|
"Can't prepare checkpoint - instance doesn't hold the lease for this shard");
|
||||||
|
|
@ -134,7 +139,7 @@ public class DynamoDBCheckpointer implements Checkpointer {
|
||||||
return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey);
|
return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, UUID concurrencyToken)
|
boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, UUID concurrencyToken)
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
Lease lease = leaseCoordinator.getCurrentlyHeldLease(leaseKey);
|
Lease lease = leaseCoordinator.getCurrentlyHeldLease(leaseKey);
|
||||||
if (lease == null) {
|
if (lease == null) {
|
||||||
|
|
@ -144,6 +149,7 @@ public class DynamoDBCheckpointer implements Checkpointer {
|
||||||
}
|
}
|
||||||
|
|
||||||
lease.pendingCheckpoint(Objects.requireNonNull(pendingCheckpoint, "pendingCheckpoint should not be null"));
|
lease.pendingCheckpoint(Objects.requireNonNull(pendingCheckpoint, "pendingCheckpoint should not be null"));
|
||||||
|
lease.pendingCheckpointState(pendingCheckpointState);
|
||||||
return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey);
|
return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -82,6 +82,12 @@ public class Lease {
|
||||||
* @return pending checkpoint, possibly null.
|
* @return pending checkpoint, possibly null.
|
||||||
*/
|
*/
|
||||||
private ExtendedSequenceNumber pendingCheckpoint;
|
private ExtendedSequenceNumber pendingCheckpoint;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return pending checkpoint state, possibly null.
|
||||||
|
*/
|
||||||
|
private byte[] pendingCheckpointState;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return count of distinct lease holders between checkpoints.
|
* @return count of distinct lease holders between checkpoints.
|
||||||
*/
|
*/
|
||||||
|
|
@ -212,6 +218,15 @@ public class Lease {
|
||||||
this.pendingCheckpoint = pendingCheckpoint;
|
this.pendingCheckpoint = pendingCheckpoint;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets pending checkpoint state.
|
||||||
|
*
|
||||||
|
* @param pendingCheckpointState can be null
|
||||||
|
*/
|
||||||
|
public void pendingCheckpointState(byte[] pendingCheckpointState) {
|
||||||
|
this.pendingCheckpointState = pendingCheckpointState;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets ownerSwitchesSinceCheckpoint.
|
* Sets ownerSwitchesSinceCheckpoint.
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,7 @@ public interface Checkpointer {
|
||||||
/**
|
/**
|
||||||
* Get the current checkpoint stored for the specified shard. Useful for checking that the parent shard
|
* Get the current checkpoint stored for the specified shard. Useful for checking that the parent shard
|
||||||
* has been completely processed before we start processing the child shard.
|
* has been completely processed before we start processing the child shard.
|
||||||
*
|
*
|
||||||
* @param leaseKey Current checkpoint for this shard is fetched
|
* @param leaseKey Current checkpoint for this shard is fetched
|
||||||
* @return Current checkpoint for this shard, null if there is no record for this shard.
|
* @return Current checkpoint for this shard, null if there is no record for this shard.
|
||||||
* @throws KinesisClientLibException Thrown if we are unable to fetch the checkpoint
|
* @throws KinesisClientLibException Thrown if we are unable to fetch the checkpoint
|
||||||
|
|
@ -73,6 +73,9 @@ public interface Checkpointer {
|
||||||
void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken)
|
void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken)
|
||||||
throws KinesisClientLibException;
|
throws KinesisClientLibException;
|
||||||
|
|
||||||
|
void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, String concurrencyToken)
|
||||||
|
throws KinesisClientLibException;
|
||||||
|
|
||||||
void operation(String operation);
|
void operation(String operation);
|
||||||
|
|
||||||
String operation();
|
String operation();
|
||||||
|
|
|
||||||
|
|
@ -93,7 +93,6 @@ public interface RecordProcessorCheckpointer {
|
||||||
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
|
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
|
||||||
IllegalArgumentException;
|
IllegalArgumentException;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method will checkpoint the progress at the provided sequenceNumber and subSequenceNumber, the latter for
|
* This method will checkpoint the progress at the provided sequenceNumber and subSequenceNumber, the latter for
|
||||||
* aggregated records produced with the Producer Library. This method is analogous to {@link #checkpoint()}
|
* aggregated records produced with the Producer Library. This method is analogous to {@link #checkpoint()}
|
||||||
|
|
@ -145,6 +144,9 @@ public interface RecordProcessorCheckpointer {
|
||||||
PreparedCheckpointer prepareCheckpoint()
|
PreparedCheckpointer prepareCheckpoint()
|
||||||
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
|
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
|
||||||
|
|
||||||
|
PreparedCheckpointer prepareCheckpoint(byte[] applicationState)
|
||||||
|
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method will record a pending checkpoint at the at the provided record. This method is analogous to
|
* This method will record a pending checkpoint at the at the provided record. This method is analogous to
|
||||||
* {@link #prepareCheckpoint()} but provides the ability to specify the record at which to prepare the checkpoint.
|
* {@link #prepareCheckpoint()} but provides the ability to specify the record at which to prepare the checkpoint.
|
||||||
|
|
@ -174,6 +176,10 @@ public interface RecordProcessorCheckpointer {
|
||||||
PreparedCheckpointer prepareCheckpoint(Record record)
|
PreparedCheckpointer prepareCheckpoint(Record record)
|
||||||
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
|
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
|
||||||
|
|
||||||
|
|
||||||
|
PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState)
|
||||||
|
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to
|
* This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to
|
||||||
* {@link #prepareCheckpoint()} but provides the ability to specify the sequence number at which to checkpoint.
|
* {@link #prepareCheckpoint()} but provides the ability to specify the sequence number at which to checkpoint.
|
||||||
|
|
@ -200,6 +206,10 @@ public interface RecordProcessorCheckpointer {
|
||||||
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
|
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
|
||||||
IllegalArgumentException;
|
IllegalArgumentException;
|
||||||
|
|
||||||
|
PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState)
|
||||||
|
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
|
||||||
|
IllegalArgumentException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for
|
* This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for
|
||||||
* aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()}
|
* aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()}
|
||||||
|
|
@ -228,5 +238,9 @@ public interface RecordProcessorCheckpointer {
|
||||||
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
|
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
|
||||||
IllegalArgumentException;
|
IllegalArgumentException;
|
||||||
|
|
||||||
|
PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState)
|
||||||
|
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
|
||||||
|
IllegalArgumentException;
|
||||||
|
|
||||||
Checkpointer checkpointer();
|
Checkpointer checkpointer();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -32,6 +32,7 @@ public class InMemoryCheckpointer implements Checkpointer {
|
||||||
private Map<String, ExtendedSequenceNumber> checkpoints = new HashMap<>();
|
private Map<String, ExtendedSequenceNumber> checkpoints = new HashMap<>();
|
||||||
private Map<String, ExtendedSequenceNumber> flushpoints = new HashMap<>();
|
private Map<String, ExtendedSequenceNumber> flushpoints = new HashMap<>();
|
||||||
private Map<String, ExtendedSequenceNumber> pendingCheckpoints = new HashMap<>();
|
private Map<String, ExtendedSequenceNumber> pendingCheckpoints = new HashMap<>();
|
||||||
|
private Map<String, byte[]> pendingCheckpointStates = new HashMap<>();
|
||||||
|
|
||||||
private String operation;
|
private String operation;
|
||||||
|
|
||||||
|
|
@ -64,6 +65,11 @@ public class InMemoryCheckpointer implements Checkpointer {
|
||||||
@Override
|
@Override
|
||||||
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken)
|
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken)
|
||||||
throws KinesisClientLibException {
|
throws KinesisClientLibException {
|
||||||
|
prepareCheckpoint(leaseKey, pendingCheckpoint, null, concurrencyToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, String concurrencyToken) throws KinesisClientLibException {
|
||||||
pendingCheckpoints.put(leaseKey, pendingCheckpoint);
|
pendingCheckpoints.put(leaseKey, pendingCheckpoint);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -71,8 +77,9 @@ public class InMemoryCheckpointer implements Checkpointer {
|
||||||
public Checkpoint getCheckpointObject(String leaseKey) throws KinesisClientLibException {
|
public Checkpoint getCheckpointObject(String leaseKey) throws KinesisClientLibException {
|
||||||
ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey);
|
ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey);
|
||||||
ExtendedSequenceNumber pendingCheckpoint = pendingCheckpoints.get(leaseKey);
|
ExtendedSequenceNumber pendingCheckpoint = pendingCheckpoints.get(leaseKey);
|
||||||
|
byte[] pendingCheckpointState = pendingCheckpointStates.get(leaseKey);
|
||||||
|
|
||||||
Checkpoint checkpointObj = new Checkpoint(checkpoint, pendingCheckpoint);
|
Checkpoint checkpointObj = new Checkpoint(checkpoint, pendingCheckpoint, pendingCheckpointState);
|
||||||
log.debug("getCheckpointObject shardId: {}, {}", leaseKey, checkpointObj);
|
log.debug("getCheckpointObject shardId: {}, {}", leaseKey, checkpointObj);
|
||||||
return checkpointObj;
|
return checkpointObj;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -242,7 +242,7 @@ public class SchedulerTest {
|
||||||
final List<ShardInfo> secondShardInfo = Collections.singletonList(
|
final List<ShardInfo> secondShardInfo = Collections.singletonList(
|
||||||
new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber));
|
new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber));
|
||||||
|
|
||||||
final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null);
|
final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null, null);
|
||||||
|
|
||||||
when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo);
|
when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo);
|
||||||
when(checkpoint.getCheckpointObject(eq(shardId))).thenReturn(firstCheckpoint);
|
when(checkpoint.getCheckpointObject(eq(shardId))).thenReturn(firstCheckpoint);
|
||||||
|
|
@ -368,7 +368,7 @@ public class SchedulerTest {
|
||||||
.map(sc -> new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber,
|
.map(sc -> new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber,
|
||||||
sc.streamIdentifier().serialize())).collect(Collectors.toList());
|
sc.streamIdentifier().serialize())).collect(Collectors.toList());
|
||||||
|
|
||||||
final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null);
|
final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null, null);
|
||||||
|
|
||||||
when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo);
|
when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo);
|
||||||
when(checkpoint.getCheckpointObject(anyString())).thenReturn(firstCheckpoint);
|
when(checkpoint.getCheckpointObject(anyString())).thenReturn(firstCheckpoint);
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue