From af0cd5463df82e85a6de9031ddfd4be82a27717b Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Mon, 30 Mar 2020 02:35:13 -0400 Subject: [PATCH] Adding arbitrary application state checkpointing for two phase commit. --- .../StreamingShardRecordProcessorTest.java | 20 ++++++++++ .../amazon/kinesis/checkpoint/Checkpoint.java | 7 +++- .../ShardRecordProcessorCheckpointer.java | 40 ++++++++++++++----- .../dynamodb/DynamoDBCheckpointer.java | 12 ++++-- .../software/amazon/kinesis/leases/Lease.java | 15 +++++++ .../kinesis/processor/Checkpointer.java | 5 ++- .../RecordProcessorCheckpointer.java | 16 +++++++- .../checkpoint/InMemoryCheckpointer.java | 9 ++++- .../kinesis/coordinator/SchedulerTest.java | 4 +- 9 files changed, 110 insertions(+), 18 deletions(-) diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorTest.java index da7e9fb2..e3368e07 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorTest.java @@ -112,6 +112,11 @@ public class StreamingShardRecordProcessorTest { throw new UnsupportedOperationException(); } + @Override + public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + throw new UnsupportedOperationException(); + } + @Override public PreparedCheckpointer prepareCheckpoint(Record record) throws KinesisClientLibDependencyException, @@ -119,6 +124,11 @@ public class StreamingShardRecordProcessorTest { throw new UnsupportedOperationException(); } + @Override + public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws 
KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + throw new UnsupportedOperationException(); + } + @Override public PreparedCheckpointer prepareCheckpoint(String sequenceNumber) throws KinesisClientLibDependencyException, @@ -126,6 +136,11 @@ public class StreamingShardRecordProcessorTest { throw new UnsupportedOperationException(); } + @Override + public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { + throw new UnsupportedOperationException(); + } + @Override public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber) throws KinesisClientLibDependencyException, @@ -133,6 +148,11 @@ public class StreamingShardRecordProcessorTest { throw new UnsupportedOperationException(); } + @Override + public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { + throw new UnsupportedOperationException(); + } + @Override public Checkpointer checkpointer() { throw new UnsupportedOperationException(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java index 2bab0cd6..7b7bea9e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java @@ -18,6 +18,8 @@ import lombok.Data; import lombok.experimental.Accessors; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.util.Arrays; + /** * A class encapsulating the 2 pieces of state stored in a checkpoint. 
*/ @@ -26,18 +28,21 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; public class Checkpoint { private final ExtendedSequenceNumber checkpoint; private final ExtendedSequenceNumber pendingCheckpoint; + private final byte[] pendingCheckpointState; /** * Constructor. * * @param checkpoint the checkpoint sequence number - cannot be null or empty. * @param pendingCheckpoint the pending checkpoint sequence number - can be null. + * @param pendingCheckpointState the pending checkpoint state - can be null. */ - public Checkpoint(final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint) { + public Checkpoint(final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState) { if (checkpoint == null || checkpoint.sequenceNumber().isEmpty()) { throw new IllegalArgumentException("Checkpoint cannot be null or empty"); } this.checkpoint = checkpoint; this.pendingCheckpoint = pendingCheckpoint; + this.pendingCheckpointState = pendingCheckpointState; } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java index 7d504bbb..0f823915 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java @@ -140,12 +140,13 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi this.largestPermittedCheckpointValue.subSequenceNumber()); } - /** - * {@inheritDoc} - */ @Override - public synchronized PreparedCheckpointer prepareCheckpoint(Record record) - throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + public PreparedCheckpointer prepareCheckpoint(byte[] 
applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + return prepareCheckpoint(largestPermittedCheckpointValue.sequenceNumber(), applicationState); + } + + @Override + public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { // // TODO: UserRecord Deprecation // @@ -154,10 +155,19 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi } /*else if (record instanceof UserRecord) { return prepareCheckpoint(record.sequenceNumber(), ((UserRecord) record).subSequenceNumber()); } */ else { - return prepareCheckpoint(record.sequenceNumber(), 0); + return prepareCheckpoint(record.sequenceNumber(), 0, applicationState); } } + /** + * {@inheritDoc} + */ + @Override + public synchronized PreparedCheckpointer prepareCheckpoint(Record record) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + return prepareCheckpoint(record, null); + } + /** * {@inheritDoc} */ @@ -167,13 +177,24 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi return prepareCheckpoint(sequenceNumber, 0); } + @Override + public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { + return prepareCheckpoint(sequenceNumber, 0, applicationState); + } + /** * {@inheritDoc} */ @Override public synchronized PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + return prepareCheckpoint(sequenceNumber, subSequenceNumber, null); + } + @Override + public PreparedCheckpointer prepareCheckpoint(String 
sequenceNumber, long subSequenceNumber, byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { if (subSequenceNumber < 0) { throw new IllegalArgumentException("Could not checkpoint at invalid, negative subsequence number " + subSequenceNumber); @@ -191,7 +212,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi log.debug("Preparing checkpoint {}, token {} at specific extended sequence number {}", ShardInfo.getLeaseKey(shardInfo), shardInfo.concurrencyToken(), pendingCheckpoint); } - return doPrepareCheckpoint(pendingCheckpoint); + return doPrepareCheckpoint(pendingCheckpoint, applicationState); } else { throw new IllegalArgumentException(String.format( "Could not prepare checkpoint at extended sequence number %s as it did not fall into acceptable " @@ -290,7 +311,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi * @throws ThrottlingException * @throws ShutdownException */ - private PreparedCheckpointer doPrepareCheckpoint(ExtendedSequenceNumber extendedSequenceNumber) + private PreparedCheckpointer doPrepareCheckpoint(ExtendedSequenceNumber extendedSequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { ExtendedSequenceNumber newPrepareCheckpoint = extendedSequenceNumber; @@ -308,7 +329,8 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi } try { - checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, shardInfo.concurrencyToken()); + checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, applicationState, + shardInfo.concurrencyToken()); } catch (ThrottlingException | ShutdownException | InvalidStateException | KinesisClientLibDependencyException e) { throw e; diff --git 
a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java index fb7a6fc7..71969f65 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java @@ -88,7 +88,7 @@ public class DynamoDBCheckpointer implements Checkpointer { try { Lease lease = leaseRefresher.getLease(leaseKey); log.debug("[{}] Retrieved lease => {}", leaseKey, lease); - return new Checkpoint(lease.checkpoint(), lease.pendingCheckpoint()); + return new Checkpoint(lease.checkpoint(), lease.pendingCheckpoint(), lease.pendingCheckpointState()); } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) { String message = "Unable to fetch checkpoint for shardId " + leaseKey; log.error(message, e); @@ -99,9 +99,14 @@ public class DynamoDBCheckpointer implements Checkpointer { @Override public void prepareCheckpoint(final String leaseKey, final ExtendedSequenceNumber pendingCheckpoint, final String concurrencyToken) throws KinesisClientLibException { + prepareCheckpoint(leaseKey, pendingCheckpoint, null, concurrencyToken); + } + + @Override + public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, String concurrencyToken) throws KinesisClientLibException { try { boolean wasSuccessful = - prepareCheckpoint(leaseKey, pendingCheckpoint, UUID.fromString(concurrencyToken)); + prepareCheckpoint(leaseKey, pendingCheckpoint, pendingCheckpointState, UUID.fromString(concurrencyToken)); if (!wasSuccessful) { throw new ShutdownException( "Can't prepare checkpoint - instance doesn't hold the lease for this shard"); @@ -134,7 +139,7 @@ public class DynamoDBCheckpointer implements Checkpointer { return 
leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey); } - boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, UUID concurrencyToken) + boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, UUID concurrencyToken) throws DependencyException, InvalidStateException, ProvisionedThroughputException { Lease lease = leaseCoordinator.getCurrentlyHeldLease(leaseKey); if (lease == null) { @@ -144,6 +149,7 @@ public class DynamoDBCheckpointer implements Checkpointer { } lease.pendingCheckpoint(Objects.requireNonNull(pendingCheckpoint, "pendingCheckpoint should not be null")); + lease.pendingCheckpointState(pendingCheckpointState); return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index 682a6f9e..efac63b0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -82,6 +82,12 @@ public class Lease { * @return pending checkpoint, possibly null. */ private ExtendedSequenceNumber pendingCheckpoint; + + /** + * @return pending checkpoint state, possibly null. + */ + private byte[] pendingCheckpointState; + /** * @return count of distinct lease holders between checkpoints. */ @@ -212,6 +218,15 @@ public class Lease { this.pendingCheckpoint = pendingCheckpoint; } + /** + * Sets pending checkpoint state. + * + * @param pendingCheckpointState can be null + */ + public void pendingCheckpointState(byte[] pendingCheckpointState) { + this.pendingCheckpointState = pendingCheckpointState; + } + /** * Sets ownerSwitchesSinceCheckpoint. 
* diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java index 70cdd608..ad44df3e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java @@ -40,7 +40,7 @@ public interface Checkpointer { /** * Get the current checkpoint stored for the specified shard. Useful for checking that the parent shard * has been completely processed before we start processing the child shard. - * + * * @param leaseKey Current checkpoint for this shard is fetched * @return Current checkpoint for this shard, null if there is no record for this shard. * @throws KinesisClientLibException Thrown if we are unable to fetch the checkpoint @@ -73,6 +73,9 @@ public interface Checkpointer { void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) throws KinesisClientLibException; + void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, String concurrencyToken) + throws KinesisClientLibException; + void operation(String operation); String operation(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java index 2eb3f5c1..d42cbb9e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java @@ -93,7 +93,6 @@ public interface RecordProcessorCheckpointer { throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException; - /** * This 
method will checkpoint the progress at the provided sequenceNumber and subSequenceNumber, the latter for * aggregated records produced with the Producer Library. This method is analogous to {@link #checkpoint()} @@ -145,6 +144,9 @@ public interface RecordProcessorCheckpointer { PreparedCheckpointer prepareCheckpoint() throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; + PreparedCheckpointer prepareCheckpoint(byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; + /** * This method will record a pending checkpoint at the at the provided record. This method is analogous to * {@link #prepareCheckpoint()} but provides the ability to specify the record at which to prepare the checkpoint. @@ -174,6 +176,10 @@ public interface RecordProcessorCheckpointer { PreparedCheckpointer prepareCheckpoint(Record record) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; + + PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; + /** * This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to * {@link #prepareCheckpoint()} but provides the ability to specify the sequence number at which to checkpoint. 
@@ -200,6 +206,10 @@ public interface RecordProcessorCheckpointer { throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException; + PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, + IllegalArgumentException; + /** * This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for * aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()} @@ -228,5 +238,9 @@ public interface RecordProcessorCheckpointer { throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException; + PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, + IllegalArgumentException; + Checkpointer checkpointer(); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java index ebe933b9..83327931 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java @@ -32,6 +32,7 @@ public class InMemoryCheckpointer implements Checkpointer { private Map checkpoints = new HashMap<>(); private Map flushpoints = new HashMap<>(); private Map pendingCheckpoints = new HashMap<>(); + private Map<String, byte[]> pendingCheckpointStates = new HashMap<>(); private String operation; @@ -64,6 +65,12 @@ public class InMemoryCheckpointer implements Checkpointer { @Override public void 
prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) throws KinesisClientLibException { + prepareCheckpoint(leaseKey, pendingCheckpoint, null, concurrencyToken); + } + + @Override + public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, String concurrencyToken) throws KinesisClientLibException { pendingCheckpoints.put(leaseKey, pendingCheckpoint); + pendingCheckpointStates.put(leaseKey, pendingCheckpointState); } @@ -71,8 +78,9 @@ public class InMemoryCheckpointer implements Checkpointer { public Checkpoint getCheckpointObject(String leaseKey) throws KinesisClientLibException { ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey); ExtendedSequenceNumber pendingCheckpoint = pendingCheckpoints.get(leaseKey); + byte[] pendingCheckpointState = pendingCheckpointStates.get(leaseKey); - Checkpoint checkpointObj = new Checkpoint(checkpoint, pendingCheckpoint); + Checkpoint checkpointObj = new Checkpoint(checkpoint, pendingCheckpoint, pendingCheckpointState); log.debug("getCheckpointObject shardId: {}, {}", leaseKey, checkpointObj); return checkpointObj; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index fd6b531b..1de7b101 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -242,7 +242,7 @@ public class SchedulerTest { final List secondShardInfo = Collections.singletonList( new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber)); - final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null); + final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null, null); when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo); 
when(checkpoint.getCheckpointObject(eq(shardId))).thenReturn(firstCheckpoint); @@ -368,7 +368,7 @@ public class SchedulerTest { .map(sc -> new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber, sc.streamIdentifier().serialize())).collect(Collectors.toList()); - final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null); + final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null, null); when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo); when(checkpoint.getCheckpointObject(anyString())).thenReturn(firstCheckpoint);