diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java index 0f823915..ec9ff11d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java @@ -329,8 +329,8 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi } try { - checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, applicationState, - shardInfo.concurrencyToken()); + checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, shardInfo.concurrencyToken(), applicationState + ); } catch (ThrottlingException | ShutdownException | InvalidStateException | KinesisClientLibDependencyException e) { throw e; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java index 71969f65..b93c3779 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java @@ -99,14 +99,14 @@ public class DynamoDBCheckpointer implements Checkpointer { @Override public void prepareCheckpoint(final String leaseKey, final ExtendedSequenceNumber pendingCheckpoint, final String concurrencyToken) throws KinesisClientLibException { - prepareCheckpoint(leaseKey, pendingCheckpoint, null, concurrencyToken); + prepareCheckpoint(leaseKey, pendingCheckpoint, concurrencyToken, null); } @Override - public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, String concurrencyToken) throws KinesisClientLibException { + public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException { try { boolean wasSuccessful = - prepareCheckpoint(leaseKey, pendingCheckpoint, pendingCheckpointState, UUID.fromString(concurrencyToken)); + prepareCheckpoint(leaseKey, pendingCheckpoint, UUID.fromString(concurrencyToken), pendingCheckpointState); if (!wasSuccessful) { throw new ShutdownException( "Can't prepare checkpoint - instance doesn't hold the lease for this shard"); @@ -139,7 +139,7 @@ public class DynamoDBCheckpointer implements Checkpointer { return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey); } - boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, UUID concurrencyToken) + boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, UUID concurrencyToken, byte[] pendingCheckpointState) throws DependencyException, InvalidStateException, ProvisionedThroughputException { Lease lease = leaseCoordinator.getCurrentlyHeldLease(leaseKey); if (lease == null) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java index ad44df3e..7f3c947e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java @@ -73,7 +73,7 @@ public interface Checkpointer { void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) throws KinesisClientLibException; - void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, String concurrencyToken) + void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException; void operation(String operation); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java index a6190dfc..661cf2ff 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java @@ -18,7 +18,6 @@ import java.util.HashMap; import java.util.Map; import software.amazon.kinesis.exceptions.KinesisClientLibException; -import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @@ -65,11 +64,11 @@ public class InMemoryCheckpointer implements Checkpointer { @Override public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) throws KinesisClientLibException { - prepareCheckpoint(leaseKey, pendingCheckpoint, null, concurrencyToken); + prepareCheckpoint(leaseKey, pendingCheckpoint, concurrencyToken, null); } @Override - public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, String concurrencyToken) throws KinesisClientLibException { + public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException { pendingCheckpoints.put(leaseKey, pendingCheckpoint); pendingCheckpointStates.put(leaseKey, pendingCheckpointState); }