Moving new parameters to end

This commit is contained in:
Joshua Kim 2020-04-07 03:28:58 -04:00
parent 26c737cc2a
commit 5355b4b7c5
4 changed files with 9 additions and 10 deletions

View file

@ -329,8 +329,8 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
} }
try { try {
checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, applicationState, checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, shardInfo.concurrencyToken(), applicationState
shardInfo.concurrencyToken()); );
} catch (ThrottlingException | ShutdownException | InvalidStateException } catch (ThrottlingException | ShutdownException | InvalidStateException
| KinesisClientLibDependencyException e) { | KinesisClientLibDependencyException e) {
throw e; throw e;

View file

@ -99,14 +99,14 @@ public class DynamoDBCheckpointer implements Checkpointer {
@Override @Override
public void prepareCheckpoint(final String leaseKey, final ExtendedSequenceNumber pendingCheckpoint, public void prepareCheckpoint(final String leaseKey, final ExtendedSequenceNumber pendingCheckpoint,
final String concurrencyToken) throws KinesisClientLibException { final String concurrencyToken) throws KinesisClientLibException {
prepareCheckpoint(leaseKey, pendingCheckpoint, null, concurrencyToken); prepareCheckpoint(leaseKey, pendingCheckpoint, concurrencyToken, null);
} }
@Override @Override
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, String concurrencyToken) throws KinesisClientLibException { public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException {
try { try {
boolean wasSuccessful = boolean wasSuccessful =
prepareCheckpoint(leaseKey, pendingCheckpoint, pendingCheckpointState, UUID.fromString(concurrencyToken)); prepareCheckpoint(leaseKey, pendingCheckpoint, UUID.fromString(concurrencyToken), pendingCheckpointState);
if (!wasSuccessful) { if (!wasSuccessful) {
throw new ShutdownException( throw new ShutdownException(
"Can't prepare checkpoint - instance doesn't hold the lease for this shard"); "Can't prepare checkpoint - instance doesn't hold the lease for this shard");
@ -139,7 +139,7 @@ public class DynamoDBCheckpointer implements Checkpointer {
return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey); return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey);
} }
boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, UUID concurrencyToken) boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, UUID concurrencyToken, byte[] pendingCheckpointState)
throws DependencyException, InvalidStateException, ProvisionedThroughputException { throws DependencyException, InvalidStateException, ProvisionedThroughputException {
Lease lease = leaseCoordinator.getCurrentlyHeldLease(leaseKey); Lease lease = leaseCoordinator.getCurrentlyHeldLease(leaseKey);
if (lease == null) { if (lease == null) {

View file

@ -73,7 +73,7 @@ public interface Checkpointer {
void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken)
throws KinesisClientLibException; throws KinesisClientLibException;
void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, String concurrencyToken) void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState)
throws KinesisClientLibException; throws KinesisClientLibException;
void operation(String operation); void operation(String operation);

View file

@ -18,7 +18,6 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import software.amazon.kinesis.exceptions.KinesisClientLibException; import software.amazon.kinesis.exceptions.KinesisClientLibException;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@ -65,11 +64,11 @@ public class InMemoryCheckpointer implements Checkpointer {
@Override @Override
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken)
throws KinesisClientLibException { throws KinesisClientLibException {
prepareCheckpoint(leaseKey, pendingCheckpoint, null, concurrencyToken); prepareCheckpoint(leaseKey, pendingCheckpoint, concurrencyToken, null);
} }
@Override @Override
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState, String concurrencyToken) throws KinesisClientLibException { public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException {
pendingCheckpoints.put(leaseKey, pendingCheckpoint); pendingCheckpoints.put(leaseKey, pendingCheckpoint);
pendingCheckpointStates.put(leaseKey, pendingCheckpointState); pendingCheckpointStates.put(leaseKey, pendingCheckpointState);
} }