From af0cd5463df82e85a6de9031ddfd4be82a27717b Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Mon, 30 Mar 2020 02:35:13 -0400 Subject: [PATCH 01/13] Adding arbitrary application state checkpointing for two phase commit. --- .../StreamingShardRecordProcessorTest.java | 20 ++++++++++ .../amazon/kinesis/checkpoint/Checkpoint.java | 7 +++- .../ShardRecordProcessorCheckpointer.java | 40 ++++++++++++++----- .../dynamodb/DynamoDBCheckpointer.java | 12 ++++-- .../software/amazon/kinesis/leases/Lease.java | 15 +++++++ .../kinesis/processor/Checkpointer.java | 5 ++- .../RecordProcessorCheckpointer.java | 16 +++++++- .../checkpoint/InMemoryCheckpointer.java | 9 ++++- .../kinesis/coordinator/SchedulerTest.java | 4 +- 9 files changed, 110 insertions(+), 18 deletions(-) diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorTest.java index da7e9fb2..e3368e07 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorTest.java @@ -112,6 +112,11 @@ public class StreamingShardRecordProcessorTest { throw new UnsupportedOperationException(); } + @Override + public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + throw new UnsupportedOperationException(); + } + @Override public PreparedCheckpointer prepareCheckpoint(Record record) throws KinesisClientLibDependencyException, @@ -119,6 +124,11 @@ public class StreamingShardRecordProcessorTest { throw new UnsupportedOperationException(); } + @Override + public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + throw new UnsupportedOperationException(); + } + @Override public PreparedCheckpointer prepareCheckpoint(String sequenceNumber) throws KinesisClientLibDependencyException, @@ -126,6 +136,11 @@ public class StreamingShardRecordProcessorTest { throw new UnsupportedOperationException(); } + @Override + public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { + return null; + } + @Override public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber) throws KinesisClientLibDependencyException, @@ -133,6 +148,11 @@ public class StreamingShardRecordProcessorTest { throw new UnsupportedOperationException(); } + @Override + public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { + throw new UnsupportedOperationException(); + } + @Override public Checkpointer checkpointer() { throw new UnsupportedOperationException(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java index 2bab0cd6..7b7bea9e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java @@ -18,6 +18,8 @@ import lombok.Data; import lombok.experimental.Accessors; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.util.Arrays; + /** * A class encapsulating the 2 pieces of state stored in a checkpoint. */ @@ -26,18 +28,21 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; public class Checkpoint { private final ExtendedSequenceNumber checkpoint; private final ExtendedSequenceNumber pendingCheckpoint; + private final byte[] pendingCheckpointState; /** * Constructor. * * @param checkpoint the checkpoint sequence number - cannot be null or empty. * @param pendingCheckpoint the pending checkpoint sequence number - can be null. + * @param pendingCheckpointState the pending checkpoint state - can be null. */ - public Checkpoint(final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint) { + public Checkpoint(final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState) { if (checkpoint == null || checkpoint.sequenceNumber().isEmpty()) { throw new IllegalArgumentException("Checkpoint cannot be null or empty"); } this.checkpoint = checkpoint; this.pendingCheckpoint = pendingCheckpoint; + this.pendingCheckpointState = pendingCheckpointState; } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java index 7d504bbb..0f823915 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java @@ -140,12 +140,13 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi this.largestPermittedCheckpointValue.subSequenceNumber()); } - /** - * {@inheritDoc} - */ @Override - public synchronized PreparedCheckpointer prepareCheckpoint(Record record) - throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + return prepareCheckpoint(largestPermittedCheckpointValue.sequenceNumber(), applicationState); + } + + @Override + public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { // // TODO: UserRecord Deprecation // @@ -154,10 +155,19 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi } /*else if (record instanceof UserRecord) { return prepareCheckpoint(record.sequenceNumber(), ((UserRecord) record).subSequenceNumber()); } */ else { - return prepareCheckpoint(record.sequenceNumber(), 0); + return prepareCheckpoint(record.sequenceNumber(), 0, applicationState); } } + /** + * {@inheritDoc} + */ + @Override + public synchronized PreparedCheckpointer prepareCheckpoint(Record record) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + return prepareCheckpoint(record, null); + } + /** * {@inheritDoc} */ @@ -167,13 +177,24 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi return prepareCheckpoint(sequenceNumber, 0); } + @Override + public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { + return prepareCheckpoint(sequenceNumber, 0, null); + } + /** * {@inheritDoc} */ @Override public synchronized PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + return prepareCheckpoint(sequenceNumber, subSequenceNumber, null); + } + @Override + public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { if (subSequenceNumber < 0) { throw new IllegalArgumentException("Could not checkpoint at invalid, negative subsequence number " + subSequenceNumber); @@ -191,7 +212,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi log.debug("Preparing checkpoint {}, token {} at specific extended sequence number {}", ShardInfo.getLeaseKey(shardInfo), shardInfo.concurrencyToken(), pendingCheckpoint); } - return doPrepareCheckpoint(pendingCheckpoint); + return doPrepareCheckpoint(pendingCheckpoint, applicationState); } else { throw new IllegalArgumentException(String.format( "Could not prepare checkpoint at extended sequence number %s as it did not fall into acceptable " @@ -290,7 +311,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi * @throws ThrottlingException * @throws ShutdownException */ - private PreparedCheckpointer doPrepareCheckpoint(ExtendedSequenceNumber extendedSequenceNumber) + private PreparedCheckpointer doPrepareCheckpoint(ExtendedSequenceNumber extendedSequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { ExtendedSequenceNumber newPrepareCheckpoint = extendedSequenceNumber; @@ -308,7 +329,8 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi } try { - checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, shardInfo.concurrencyToken()); + checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, applicationState, + shardInfo.concurrencyToken()); } catch (ThrottlingException | ShutdownException | InvalidStateException | KinesisClientLibDependencyException e) { throw e; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java index fb7a6fc7..71969f65 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java @@ -88,7 +88,7 @@ public class DynamoDBCheckpointer implements Checkpointer { try { Lease lease = leaseRefresher.getLease(leaseKey); log.debug("[{}] Retrieved lease => {}", leaseKey, lease); - return new Checkpoint(lease.checkpoint(), lease.pendingCheckpoint()); + return new Checkpoint(lease.checkpoint(), lease.pendingCheckpoint(), lease.pendingCheckpointState()); } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) { String message = "Unable to fetch checkpoint for shardId " + leaseKey; log.error(message, e); @@ -99,9 +99,14 @@ public class DynamoDBCheckpointer implements Checkpointer { @Override public void prepareCheckpoint(final String leaseKey, final ExtendedSequenceNumber pendingCheckpoint, final String concurrencyToken) throws KinesisClientLibException { + prepareCheckpoint(leaseKey, pendingCheckpoint, null, concurrencyToken); + } + + @Override + public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, String concurrencyToken) throws KinesisClientLibException { try { boolean wasSuccessful = - prepareCheckpoint(leaseKey, pendingCheckpoint, UUID.fromString(concurrencyToken)); + prepareCheckpoint(leaseKey, pendingCheckpoint, pendingCheckpointState, UUID.fromString(concurrencyToken)); if (!wasSuccessful) { throw new ShutdownException( "Can't prepare checkpoint - instance doesn't hold the lease for this shard"); @@ -134,7 +139,7 @@ public class DynamoDBCheckpointer implements Checkpointer { return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey); } - boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, UUID concurrencyToken) + boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, UUID concurrencyToken) throws DependencyException, InvalidStateException, ProvisionedThroughputException { Lease lease = leaseCoordinator.getCurrentlyHeldLease(leaseKey); if (lease == null) { @@ -144,6 +149,7 @@ public class DynamoDBCheckpointer implements Checkpointer { } lease.pendingCheckpoint(Objects.requireNonNull(pendingCheckpoint, "pendingCheckpoint should not be null")); + lease.pendingCheckpointState(pendingCheckpointState); return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index 682a6f9e..efac63b0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -82,6 +82,12 @@ public class Lease { * @return pending checkpoint, possibly null. */ private ExtendedSequenceNumber pendingCheckpoint; + + /** + * @return pending checkpoint state, possibly null. + */ + private byte[] pendingCheckpointState; + /** * @return count of distinct lease holders between checkpoints. */ @@ -212,6 +218,15 @@ public class Lease { this.pendingCheckpoint = pendingCheckpoint; } + /** + * Sets pending checkpoint state. + * + * @param pendingCheckpointState can be null + */ + public void pendingCheckpointState(byte[] pendingCheckpointState) { + this.pendingCheckpointState = pendingCheckpointState; + } + /** * Sets ownerSwitchesSinceCheckpoint. * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java index 70cdd608..ad44df3e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java @@ -40,7 +40,7 @@ public interface Checkpointer { /** * Get the current checkpoint stored for the specified shard. Useful for checking that the parent shard * has been completely processed before we start processing the child shard. - * + * * @param leaseKey Current checkpoint for this shard is fetched * @return Current checkpoint for this shard, null if there is no record for this shard. * @throws KinesisClientLibException Thrown if we are unable to fetch the checkpoint @@ -73,6 +73,9 @@ public interface Checkpointer { void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) throws KinesisClientLibException; + void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, String concurrencyToken) + throws KinesisClientLibException; + void operation(String operation); String operation(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java index 2eb3f5c1..d42cbb9e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java @@ -93,7 +93,6 @@ public interface RecordProcessorCheckpointer { throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException; - /** * This method will checkpoint the progress at the provided sequenceNumber and subSequenceNumber, the latter for * aggregated records produced with the Producer Library. This method is analogous to {@link #checkpoint()} @@ -145,6 +144,9 @@ public interface RecordProcessorCheckpointer { PreparedCheckpointer prepareCheckpoint() throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; + PreparedCheckpointer prepareCheckpoint(byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; + /** * This method will record a pending checkpoint at the at the provided record. This method is analogous to * {@link #prepareCheckpoint()} but provides the ability to specify the record at which to prepare the checkpoint. @@ -174,6 +176,10 @@ public interface RecordProcessorCheckpointer { PreparedCheckpointer prepareCheckpoint(Record record) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; + + PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; + /** * This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to * {@link #prepareCheckpoint()} but provides the ability to specify the sequence number at which to checkpoint. @@ -200,6 +206,10 @@ public interface RecordProcessorCheckpointer { throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException; + PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, + IllegalArgumentException; + /** * This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for * aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()} @@ -228,5 +238,9 @@ public interface RecordProcessorCheckpointer { throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException; + PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, + IllegalArgumentException; + Checkpointer checkpointer(); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java index ebe933b9..83327931 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java @@ -32,6 +32,7 @@ public class InMemoryCheckpointer implements Checkpointer { private Map checkpoints = new HashMap<>(); private Map flushpoints = new HashMap<>(); private Map pendingCheckpoints = new HashMap<>(); + private Map pendingCheckpointStates = new HashMap<>(); private String operation; @@ -64,6 +65,11 @@ public class InMemoryCheckpointer implements Checkpointer { @Override public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) throws KinesisClientLibException { + prepareCheckpoint(leaseKey, pendingCheckpoint, null, concurrencyToken); + } + + @Override + public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, String concurrencyToken) throws KinesisClientLibException { pendingCheckpoints.put(leaseKey, pendingCheckpoint); } @@ -71,8 +77,9 @@ public class InMemoryCheckpointer implements Checkpointer { public Checkpoint getCheckpointObject(String leaseKey) throws KinesisClientLibException { ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey); ExtendedSequenceNumber pendingCheckpoint = pendingCheckpoints.get(leaseKey); + byte[] pendingCheckpointState = pendingCheckpointStates.get(leaseKey); - Checkpoint checkpointObj = new Checkpoint(checkpoint, pendingCheckpoint); + Checkpoint checkpointObj = new Checkpoint(checkpoint, pendingCheckpoint, pendingCheckpointState); log.debug("getCheckpointObject shardId: {}, {}", leaseKey, checkpointObj); return checkpointObj; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index fd6b531b..1de7b101 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -242,7 +242,7 @@ public class SchedulerTest { final List secondShardInfo = Collections.singletonList( new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber)); - final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null); + final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null, null); when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo); when(checkpoint.getCheckpointObject(eq(shardId))).thenReturn(firstCheckpoint); @@ -368,7 +368,7 @@ public class SchedulerTest { .map(sc -> new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber, sc.streamIdentifier().serialize())).collect(Collectors.toList()); - final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null); + final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null, null); when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo); when(checkpoint.getCheckpointObject(anyString())).thenReturn(firstCheckpoint); From 26c737cc2a3a36e27913274e3139dc8243cd747b Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Tue, 7 Apr 2020 03:22:56 -0400 Subject: [PATCH 02/13] Adding in memory implememtation for pending checkpoint state --- .../software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java index 83327931..a6190dfc 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java @@ -71,6 +71,7 @@ public class InMemoryCheckpointer implements Checkpointer { @Override public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, String concurrencyToken) throws KinesisClientLibException { pendingCheckpoints.put(leaseKey, pendingCheckpoint); + pendingCheckpointStates.put(leaseKey, pendingCheckpointState); } @Override From 5355b4b7c5d7910c6c9de0bff5b5111cad1f44a9 Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Tue, 7 Apr 2020 03:28:58 -0400 Subject: [PATCH 03/13] Moving new parameters to end --- .../checkpoint/ShardRecordProcessorCheckpointer.java | 4 ++-- .../kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java | 8 ++++---- .../software/amazon/kinesis/processor/Checkpointer.java | 2 +- .../amazon/kinesis/checkpoint/InMemoryCheckpointer.java | 5 ++--- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java index 0f823915..ec9ff11d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java @@ -329,8 +329,8 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi } try { - checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, applicationState, - shardInfo.concurrencyToken()); + checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, shardInfo.concurrencyToken(), applicationState + ); } catch (ThrottlingException | ShutdownException | InvalidStateException | KinesisClientLibDependencyException e) { throw e; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java index 71969f65..b93c3779 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java @@ -99,14 +99,14 @@ public class DynamoDBCheckpointer implements Checkpointer { @Override public void prepareCheckpoint(final String leaseKey, final ExtendedSequenceNumber pendingCheckpoint, final String concurrencyToken) throws KinesisClientLibException { - prepareCheckpoint(leaseKey, pendingCheckpoint, null, concurrencyToken); + prepareCheckpoint(leaseKey, pendingCheckpoint, concurrencyToken, null); } @Override - public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, String concurrencyToken) throws KinesisClientLibException { + public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException { try { boolean wasSuccessful = - prepareCheckpoint(leaseKey, pendingCheckpoint, pendingCheckpointState, UUID.fromString(concurrencyToken)); + prepareCheckpoint(leaseKey, pendingCheckpoint, UUID.fromString(concurrencyToken), pendingCheckpointState); if (!wasSuccessful) { throw new ShutdownException( "Can't prepare checkpoint - instance doesn't hold the lease for this shard"); @@ -139,7 +139,7 @@ public class DynamoDBCheckpointer implements Checkpointer { return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey); } - boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, UUID concurrencyToken) + boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, UUID concurrencyToken, byte[] pendingCheckpointState) throws DependencyException, InvalidStateException, ProvisionedThroughputException { Lease lease = leaseCoordinator.getCurrentlyHeldLease(leaseKey); if (lease == null) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java index ad44df3e..7f3c947e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java @@ -73,7 +73,7 @@ public interface Checkpointer { void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) throws KinesisClientLibException; - void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, String concurrencyToken) + void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException; void operation(String operation); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java index a6190dfc..661cf2ff 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java @@ -18,7 +18,6 @@ import java.util.HashMap; import java.util.Map; import software.amazon.kinesis.exceptions.KinesisClientLibException; -import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @@ -65,11 +64,11 @@ public class InMemoryCheckpointer implements Checkpointer { @Override public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) throws KinesisClientLibException { - prepareCheckpoint(leaseKey, pendingCheckpoint, null, concurrencyToken); + prepareCheckpoint(leaseKey, pendingCheckpoint, concurrencyToken, null); } @Override - public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, String concurrencyToken) throws KinesisClientLibException { + public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException { pendingCheckpoints.put(leaseKey, pendingCheckpoint); pendingCheckpointStates.put(leaseKey, pendingCheckpointState); } From 1794874e3365d38ea2cc2c83bed1c3ff844dc9e9 Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Tue, 7 Apr 2020 03:45:03 -0400 Subject: [PATCH 04/13] Clean up checkpoint state after successful checkpoint --- .../amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java | 1 + .../software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java | 1 + 2 files changed, 2 insertions(+) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java index b93c3779..d9646351 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java @@ -134,6 +134,7 @@ public class DynamoDBCheckpointer implements Checkpointer { lease.checkpoint(checkpoint); lease.pendingCheckpoint(null); + lease.pendingCheckpointState(null); lease.ownerSwitchesSinceCheckpoint(0L); return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java index 661cf2ff..8f6e165d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java @@ -44,6 +44,7 @@ public class InMemoryCheckpointer implements Checkpointer { checkpoints.put(leaseKey, checkpointValue); flushpoints.put(leaseKey, checkpointValue); pendingCheckpoints.remove(leaseKey); + pendingCheckpointStates.remove(leaseKey); if (log.isDebugEnabled()) { log.debug("shardId: {} checkpoint: {}", leaseKey, checkpointValue); From ae005ce0f898992d36855b5d3fe5737a5449d2e3 Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Tue, 7 Apr 2020 04:03:38 -0400 Subject: [PATCH 05/13] Adding unit tests --- .../kinesis/checkpoint/CheckpointerTest.java | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/CheckpointerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/CheckpointerTest.java index 1cf77a3d..b823c8e3 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/CheckpointerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/CheckpointerTest.java @@ -90,6 +90,26 @@ public class CheckpointerTest { Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); } + + @Test + public final void testInitialPrepareCheckpointWithApplicationState() throws Exception { + String sequenceNumber = "1"; + String pendingCheckpointValue = "99999"; + String shardId = "myShardId"; + byte[] applicationState = "applicationState".getBytes(); + ExtendedSequenceNumber extendedCheckpointNumber = new ExtendedSequenceNumber(sequenceNumber); + checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(sequenceNumber), testConcurrencyToken); + + ExtendedSequenceNumber extendedPendingCheckpointNumber = new ExtendedSequenceNumber(pendingCheckpointValue); + checkpoint.prepareCheckpoint(shardId, new ExtendedSequenceNumber(pendingCheckpointValue), testConcurrencyToken, + applicationState); + + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); + Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); + Assert.assertEquals(applicationState, checkpoint.getCheckpointObject(shardId).pendingCheckpointState()); + } + @Test public final void testAdvancingPrepareCheckpoint() throws Exception { String shardId = "myShardId"; @@ -107,6 +127,26 @@ public class CheckpointerTest { } } + @Test + public final void testAdvancingPrepareCheckpointWithApplicationState() throws Exception { + String shardId = "myShardId"; + String checkpointValue = "12345"; + byte[] applicationState = "applicationState".getBytes(); + ExtendedSequenceNumber extendedCheckpointNumber = new ExtendedSequenceNumber(checkpointValue); + checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(checkpointValue), testConcurrencyToken); + + for (Integer i = 0; i < 10; i++) { + String sequenceNumber = i.toString(); + ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(sequenceNumber); + checkpoint.prepareCheckpoint(shardId, new ExtendedSequenceNumber(sequenceNumber), testConcurrencyToken, + applicationState); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); + Assert.assertEquals(applicationState, checkpoint.getCheckpointObject(shardId).pendingCheckpointState()); + } + } + @Test public final void testPrepareAndSetCheckpoint() throws Exception { String checkpointValue = "12345"; @@ -134,4 +174,35 @@ public class CheckpointerTest { Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); } + + @Test + public final void testPrepareAndSetCheckpointWithApplicationState() throws Exception { + String checkpointValue = "12345"; + String shardId = "testShardId-1"; + String concurrencyToken = "token-1"; + String pendingCheckpointValue = "99999"; + byte[] applicationState = "applicationState".getBytes(); + + // set initial checkpoint + ExtendedSequenceNumber extendedCheckpointNumber = new ExtendedSequenceNumber(checkpointValue); + checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(checkpointValue), concurrencyToken); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); + + // prepare checkpoint + ExtendedSequenceNumber extendedPendingCheckpointNumber = new ExtendedSequenceNumber(pendingCheckpointValue); + checkpoint.prepareCheckpoint(shardId, new ExtendedSequenceNumber(pendingCheckpointValue), concurrencyToken, applicationState); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); + Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); + Assert.assertEquals(applicationState, checkpoint.getCheckpointObject(shardId).pendingCheckpointState()); + + // do checkpoint + checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(pendingCheckpointValue), concurrencyToken); + Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpointState()); + } } From b335246a306db03b55d9287dbb2522d3e0c11169 Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Tue, 7 Apr 2020 06:15:18 -0400 Subject: [PATCH 06/13] Adding serializer for application state to lease info --- .../amazon/kinesis/leases/DynamoUtils.java | 18 ++++++++++++++++++ .../dynamodb/DynamoDBLeaseSerializer.java | 15 +++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoUtils.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoUtils.java index 9d5f9ae2..29d6029b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoUtils.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoUtils.java @@ -14,6 +14,7 @@ */ package software.amazon.kinesis.leases; +import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.kinesis.annotations.KinesisClientInternalApi; @@ -36,6 +37,14 @@ public class DynamoUtils { return AttributeValue.builder().ss(collectionValue).build(); } + public static AttributeValue createAttributeValue(byte[] byteBufferValue) { + if (byteBufferValue == null) { + throw new IllegalArgumentException("Byte buffer attributeValues cannot be null or empty."); + } + + return AttributeValue.builder().b(SdkBytes.fromByteArray(byteBufferValue)).build(); + } + public static AttributeValue createAttributeValue(String stringValue) { if (stringValue == null || stringValue.isEmpty()) { throw new IllegalArgumentException("String attributeValues cannot be null or empty."); @@ -52,6 +61,15 @@ public class DynamoUtils { return AttributeValue.builder().n(longValue.toString()).build(); } + public static byte[] safeGetByteArray(Map dynamoRecord, String key) { + AttributeValue av = dynamoRecord.get(key); + if (av == null) { + return null; + } else { + return av.b().asByteArray(); + } + } + public static Long safeGetLong(Map dynamoRecord, String key) { AttributeValue av = dynamoRecord.get(key); if (av == null) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index a02e2a6e..f42bafcf 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -50,6 +50,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { private static final String CHECKPOINT_SUBSEQUENCE_NUMBER_KEY = "checkpointSubSequenceNumber"; private static final String PENDING_CHECKPOINT_SEQUENCE_KEY = "pendingCheckpoint"; private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber"; + private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState"; private static final String PARENT_SHARD_ID_KEY = "parentShardId"; @Override @@ -75,6 +76,10 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.pendingCheckpoint().subSequenceNumber())); } + if (lease.pendingCheckpointState() != null) { + result.put(PENDING_CHECKPOINT_STATE_KEY, DynamoUtils.createAttributeValue(lease.checkpoint().subSequenceNumber())); + } + return result; } @@ -105,6 +110,9 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { DynamoUtils.safeGetLong(dynamoRecord, PENDING_CHECKPOINT_SUBSEQUENCE_KEY)) ); } + + leaseToUpdate.pendingCheckpointState(DynamoUtils.safeGetByteArray(dynamoRecord, PENDING_CHECKPOINT_STATE_KEY)); + return leaseToUpdate; } @@ -220,6 +228,13 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build()); result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build()); } + + if (lease.pendingCheckpointState() != null) { + result.put(PENDING_CHECKPOINT_STATE_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.pendingCheckpointState()))); + } else { + result.put(PENDING_CHECKPOINT_STATE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build()); + } + return result; } From ddaf714a09c7cafc31c3dddea003f973054f3efc Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Tue, 7 Apr 2020 06:15:47 -0400 Subject: [PATCH 07/13] Adding application state to lease copy --- .../src/main/java/software/amazon/kinesis/leases/Lease.java | 6 ++++-- .../amazon/kinesis/leases/HierarchicalShardSyncerTest.java | 2 +- .../java/software/amazon/kinesis/leases/LeaseBuilder.java | 3 ++- .../kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java | 2 +- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index efac63b0..a7ed666c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -102,13 +102,13 @@ public class Lease { protected Lease(Lease lease) { this(lease.leaseKey(), lease.leaseOwner(), lease.leaseCounter(), lease.concurrencyToken(), lease.lastCounterIncrementNanos(), lease.checkpoint(), lease.pendingCheckpoint(), - lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds()); + lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.pendingCheckpointState()); } public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter, final UUID concurrencyToken, final Long lastCounterIncrementNanos, final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, - final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds) { + final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds, final byte[] pendingCheckpointState) { this.leaseKey = leaseKey; this.leaseOwner = leaseOwner; this.leaseCounter = leaseCounter; @@ -120,6 +120,7 @@ public class Lease { if (parentShardIds != null) { this.parentShardIds.addAll(parentShardIds); } + this.pendingCheckpointState = pendingCheckpointState; } /** @@ -139,6 +140,7 @@ public class Lease { ownerSwitchesSinceCheckpoint(lease.ownerSwitchesSinceCheckpoint()); checkpoint(lease.checkpoint); pendingCheckpoint(lease.pendingCheckpoint); + pendingCheckpointState(lease.pendingCheckpointState); parentShardIds(lease.parentShardIds); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index e1dfc52a..374da4cd 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -969,7 +969,7 @@ public class HierarchicalShardSyncerTest { parentShardIds.add(shard.adjacentParentShardId()); } return new Lease(shard.shardId(), leaseOwner, 0L, UUID.randomUUID(), 0L, checkpoint, null, 0L, - parentShardIds); + parentShardIds, null); }).collect(Collectors.toList()); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java index ee38116f..591e7db0 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java @@ -34,9 +34,10 @@ public class LeaseBuilder { private ExtendedSequenceNumber pendingCheckpoint; private Long ownerSwitchesSinceCheckpoint = 0L; private Set parentShardIds = new HashSet<>(); + private byte[] pendingCheckpointState; public Lease build() { return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, - checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds); + checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, pendingCheckpointState); } } \ No newline at end of file diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java index 61cba722..f22e6e4d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java @@ -55,7 +55,7 @@ public class DynamoDBLeaseRenewerTest { private LeaseRefresher leaseRefresher; private static Lease newLease(String leaseKey) { - return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, new HashSet<>()); + return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, new HashSet<>(), null); } @Before From a61890ab40358d78d4994c0d14c9a0c509af2a67 Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Tue, 7 Apr 2020 06:16:04 -0400 Subject: [PATCH 08/13] Adding integration test for serializing application info --- ...namoDBLeaseCoordinatorIntegrationTest.java | 31 ++++++++++++++++--- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java index 3af33c69..d89c010e 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java @@ -127,16 +127,37 @@ public class DynamoDBLeaseCoordinatorIntegrationTest { } assertNotNull(lease); - ExtendedSequenceNumber newCheckpoint = new ExtendedSequenceNumber("newCheckpoint"); + final ExtendedSequenceNumber initialCheckpoint = new ExtendedSequenceNumber("initialCheckpoint"); + final ExtendedSequenceNumber pendingCheckpoint = new ExtendedSequenceNumber("pendingCheckpoint"); + final ExtendedSequenceNumber newCheckpoint = new ExtendedSequenceNumber("newCheckpoint"); + final byte[] checkpointState = "checkpointState".getBytes(); + // lease's leaseCounter is wrong at this point, but it shouldn't matter. + assertTrue(dynamoDBCheckpointer.setCheckpoint(lease.leaseKey(), initialCheckpoint, lease.concurrencyToken())); + + final Lease leaseFromDDBAtInitialCheckpoint = leaseRefresher.getLease(lease.leaseKey()); + lease.leaseCounter(lease.leaseCounter() + 1); + lease.checkpoint(initialCheckpoint); + lease.leaseOwner(coordinator.workerIdentifier()); + assertEquals(lease, leaseFromDDBAtInitialCheckpoint); + + dynamoDBCheckpointer.prepareCheckpoint(lease.leaseKey(), pendingCheckpoint, lease.concurrencyToken().toString(), checkpointState); + + final Lease leaseFromDDBAtPendingCheckpoint = leaseRefresher.getLease(lease.leaseKey()); + lease.leaseCounter(lease.leaseCounter() + 1); + lease.checkpoint(initialCheckpoint); + lease.pendingCheckpoint(pendingCheckpoint); + lease.pendingCheckpointState(checkpointState); + assertEquals(lease, leaseFromDDBAtPendingCheckpoint); + assertTrue(dynamoDBCheckpointer.setCheckpoint(lease.leaseKey(), newCheckpoint, lease.concurrencyToken())); - Lease fromDynamo = leaseRefresher.getLease(lease.leaseKey()); - + final Lease leaseFromDDBAtNewCheckpoint = leaseRefresher.getLease(lease.leaseKey()); lease.leaseCounter(lease.leaseCounter() + 1); lease.checkpoint(newCheckpoint); - lease.leaseOwner(coordinator.workerIdentifier()); - assertEquals(lease, fromDynamo); + lease.pendingCheckpoint(null); + lease.pendingCheckpointState(null); + assertEquals(lease, leaseFromDDBAtNewCheckpoint); } /** From 80df8ce4390853a2ea24f43928e854ecb3403995 Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Tue, 7 Apr 2020 06:22:22 -0400 Subject: [PATCH 09/13] Cleaning up unused imports --- .../java/software/amazon/kinesis/checkpoint/Checkpoint.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java index 7b7bea9e..91f01d0b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java @@ -18,8 +18,6 @@ import lombok.Data; import lombok.experimental.Accessors; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import java.util.Arrays; - /** * A class encapsulating the 2 pieces of state stored in a checkpoint. */ From 64469f419988740c6a30a54a1d5e56fddf25094a Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Fri, 10 Apr 2020 05:55:45 -0400 Subject: [PATCH 10/13] PR feedback --- .../amazon/kinesis/checkpoint/Checkpoint.java | 5 + .../ShardRecordProcessorCheckpointer.java | 9 ++ .../software/amazon/kinesis/leases/Lease.java | 13 ++- .../kinesis/lifecycle/InitializeTask.java | 1 + .../lifecycle/events/InitializationInput.java | 8 ++ .../kinesis/processor/Checkpointer.java | 13 +++ .../RecordProcessorCheckpointer.java | 105 +++++++++++++++++- 7 files changed, 152 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java index 91f01d0b..f5af81e3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java @@ -28,6 +28,11 @@ public class Checkpoint { private final ExtendedSequenceNumber pendingCheckpoint; private final byte[] pendingCheckpointState; + @Deprecated + public Checkpoint(final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint) { + this(checkpoint, pendingCheckpoint, null); + } + /** * Constructor. * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java index ec9ff11d..4e831c4b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java @@ -140,6 +140,9 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi this.largestPermittedCheckpointValue.subSequenceNumber()); } + /** + * {@inheritDoc} + */ @Override public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { return prepareCheckpoint(largestPermittedCheckpointValue.sequenceNumber(), applicationState); @@ -177,6 +180,9 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi return prepareCheckpoint(sequenceNumber, 0); } + /** + * {@inheritDoc} + */ @Override public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { @@ -192,6 +198,9 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi return prepareCheckpoint(sequenceNumber, subSequenceNumber, null); } + /** + * {@inheritDoc} + */ @Override public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index a7ed666c..15f79739 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit; @NoArgsConstructor @Getter @Accessors(fluent = true) -@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos"}) +@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "pendingCheckpointState"}) @ToString public class Lease { /* @@ -84,6 +84,8 @@ public class Lease { private ExtendedSequenceNumber pendingCheckpoint; /** + * Last pending application state. Deliberately excluded from hashCode and equals. + * * @return pending checkpoint state, possibly null. */ private byte[] pendingCheckpointState; @@ -105,6 +107,15 @@ public class Lease { lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.pendingCheckpointState()); } + @Deprecated + public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter, + final UUID concurrencyToken, final Long lastCounterIncrementNanos, + final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, + final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds) { + this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint, + ownerSwitchesSinceCheckpoint, parentShardIds, null); + } + public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter, final UUID concurrencyToken, final Long lastCounterIncrementNanos, final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java index e11eebfa..4108dd9b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java @@ -92,6 +92,7 @@ public class InitializeTask implements ConsumerTask { .shardId(shardInfo.shardId()) .extendedSequenceNumber(initialCheckpoint) .pendingCheckpointSequenceNumber(initialCheckpointObject.pendingCheckpoint()) + .pendingCheckpointState(initialCheckpointObject.pendingCheckpointState()) .build(); final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/InitializationInput.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/InitializationInput.java index d6c586aa..3717a805 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/InitializationInput.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/InitializationInput.java @@ -47,4 +47,12 @@ public class InitializationInput { * completing the checkpoint. */ private final ExtendedSequenceNumber pendingCheckpointSequenceNumber; + + /** + * The last pending application state of the previous record processor. May be null. + * + * This will only be set if the previous record processor had prepared a checkpoint, but lost its lease before + * completing the checkpoint. + */ + private final byte[] pendingCheckpointState; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java index 7f3c947e..2ffadc06 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java @@ -73,6 +73,19 @@ public interface Checkpointer { void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) throws KinesisClientLibException; + /** + * Record intent to checkpoint for a shard. Upon failover, the pendingCheckpoint and pendingCheckpointState will be + * passed to the new ShardRecordProcessor's initialize() method. + * + * @param leaseKey Checkpoint is specified for this shard. + * @param pendingCheckpoint Value of the pending checkpoint (e.g. Kinesis sequence number and subsequence number) + * @param concurrencyToken Used with conditional writes to prevent stale updates + * (e.g. if there was a fail over to a different record processor, we don't want to + * overwrite it's checkpoint) + * @param pendingCheckpointState Serialized application state at the pending checkpoint. + * + * @throws KinesisClientLibException Thrown if we were unable to save the checkpoint + */ void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java index d42cbb9e..34b2930c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java @@ -144,6 +144,29 @@ public interface RecordProcessorCheckpointer { PreparedCheckpointer prepareCheckpoint() throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; + /** + * This method will record a pending checkpoint at the last data record that was delivered to the record processor. + * If the application fails over between calling prepareCheckpoint() and checkpoint(), the init() method of the next + * IRecordProcessor for this shard will be informed of the prepared sequence number and application state. + * + * Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having + * side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete. + * Use the sequence number and application state passed in to init() to behave idempotently. + * + * @param applicationState arbitrary application state that will be passed to the next record processor that + * processes the shard. + * @return an PreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @throws ShutdownException The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this ShardRecordProcessor instance. + * @throws InvalidStateException Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + */ PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; @@ -176,7 +199,35 @@ public interface RecordProcessorCheckpointer { PreparedCheckpointer prepareCheckpoint(Record record) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; - + /** + * This method will record a pending checkpoint at the at the provided record. This method is analogous to + * {@link #prepareCheckpoint()} but provides the ability to specify the record and application state at which to + * prepare the checkpoint. + * + * @param record A record at which to prepare checkpoint in this shard. + * @param applicationState arbitrary application state that will be passed to the next record processor that + * processes the shard. + * + * Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having + * side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete. + * Use the sequence number and application state passed in to init() to behave idempotently. + * + * @return an PreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @throws ShutdownException The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this ShardRecordProcessor instance. + * @throws InvalidStateException Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + * @throws IllegalArgumentException The sequence number is invalid for one of the following reasons: + * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the + * greatest sequence number seen by the associated record processor. + * 2.) It is not a valid sequence number for a record in this shard. + */ PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; @@ -206,6 +257,31 @@ public interface RecordProcessorCheckpointer { throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException; + /** + * This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to + * {@link #prepareCheckpoint()} but provides the ability to specify the sequence number and application state + * at which to checkpoint. + * + * @param sequenceNumber A sequence number at which to prepare checkpoint in this shard. + * @param applicationState arbitrary application state that will be passed to the next record processor that + * processes the shard. + * + * @return an PreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @throws ShutdownException The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this ShardRecordProcessor instance. + * @throws InvalidStateException Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + * @throws IllegalArgumentException The sequence number is invalid for one of the following reasons: + * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the + * greatest sequence number seen by the associated record processor. + * 2.) It is not a valid sequence number for a record in this shard. + */ PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException; @@ -238,6 +314,33 @@ public interface RecordProcessorCheckpointer { throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException; + /** + * This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for + * aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()} + * but provides the ability to specify the sequence number, subsequence number, and application state at which to + * checkpoint. + * + * @param sequenceNumber A sequence number at which to prepare checkpoint in this shard. + * @param subSequenceNumber A subsequence number at which to prepare checkpoint within this shard. + * @param applicationState arbitrary application state that will be passed to the next record processor that + * processes the shard. + * + * @return an PreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @throws ShutdownException The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this ShardRecordProcessor instance. + * @throws InvalidStateException Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + * @throws IllegalArgumentException The sequence number is invalid for one of the following reasons: + * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the + * greatest sequence number seen by the associated record processor. + * 2.) It is not a valid sequence number for a record in this shard. + */ PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException; From d31758727c3d3a1c56271b1790ca62ca6093f62c Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Tue, 14 Apr 2020 04:37:11 -0400 Subject: [PATCH 11/13] Fixing bug --- .../kinesis/checkpoint/ShardRecordProcessorCheckpointer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java index 4e831c4b..8ecf58ec 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java @@ -186,7 +186,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi @Override public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { - return prepareCheckpoint(sequenceNumber, 0, null); + return prepareCheckpoint(sequenceNumber, 0, applicationState); } /** From 4782e43c3a24c45d78b1a69b165c095f8b90dcc9 Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Wed, 15 Apr 2020 19:26:53 -0400 Subject: [PATCH 12/13] PR comments --- .../kinesis/checkpoint/ShardRecordProcessorCheckpointer.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java index 8ecf58ec..31506304 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java @@ -338,8 +338,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi } try { - checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, shardInfo.concurrencyToken(), applicationState - ); + checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, shardInfo.concurrencyToken(), applicationState); } catch (ThrottlingException | ShutdownException | InvalidStateException | KinesisClientLibDependencyException e) { throw e; From 22fa12abfaa4b38ec5662083fb298e0a2d36f06f Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Wed, 15 Apr 2020 19:29:17 -0400 Subject: [PATCH 13/13] Java docs --- .../kinesis/checkpoint/ShardRecordProcessorCheckpointer.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java index 31506304..fd375264 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java @@ -148,6 +148,9 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi return prepareCheckpoint(largestPermittedCheckpointValue.sequenceNumber(), applicationState); } + /** + * {@inheritDoc} + */ @Override public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { //