diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/ICheckpoint.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/ICheckpoint.java index d559bfc0..83c29b44 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/ICheckpoint.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/ICheckpoint.java @@ -15,6 +15,7 @@ package com.amazonaws.services.kinesis.clientlibrary.interfaces; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException; +import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.Checkpoint; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; /** @@ -46,4 +47,30 @@ public interface ICheckpoint { */ ExtendedSequenceNumber getCheckpoint(String shardId) throws KinesisClientLibException; + /** + * Get the current checkpoint stored for the specified shard, which holds the sequence numbers for the checkpoint + * and pending checkpoint. Useful for checking that the parent shard has been completely processed before we start + * processing the child shard. + * + * @param shardId Current checkpoint for this shard is fetched + * @return Current checkpoint object for this shard, null if there is no record for this shard. + * @throws KinesisClientLibException Thrown if we are unable to fetch the checkpoint + */ + Checkpoint getCheckpointObject(String shardId) throws KinesisClientLibException; + + + /** + * Record intent to checkpoint for a shard. Upon failover, the pendingCheckpointValue will be passed to the new + * RecordProcessor's initialize() method. + * + * @param shardId Checkpoint is specified for this shard. + * @param pendingCheckpoint Value of the pending checkpoint (e.g. Kinesis sequence number and subsequence number) + * @param concurrencyToken Used with conditional writes to prevent stale updates + * (e.g. if there was a fail over to a different record processor, we don't want to + * overwrite it's checkpoint) + * @throws KinesisClientLibException Thrown if we were unable to save the checkpoint + */ + void prepareCheckpoint(String shardId, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) + throws KinesisClientLibException; + } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IPreparedCheckpointer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IPreparedCheckpointer.java new file mode 100644 index 00000000..04827a63 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IPreparedCheckpointer.java @@ -0,0 +1,41 @@ +package com.amazonaws.services.kinesis.clientlibrary.interfaces; + +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; + +/** + * Objects of this class are prepared to checkpoint at a specific sequence number. They use an + * IRecordProcessorCheckpointer to do the actual checkpointing, so their checkpoint is subject to the same 'didn't go + * backwards' validation as a normal checkpoint. + */ +public interface IPreparedCheckpointer { + + /** + * @return sequence number of pending checkpoint + */ + ExtendedSequenceNumber getPendingCheckpoint(); + + /** + * This method will record a pending checkpoint. + * + * @throws ThrottlingException Can't store checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @throws ShutdownException The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this RecordProcessor instance. + * @throws InvalidStateException Can't store checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @throws KinesisClientLibDependencyException Encountered an issue when storing the checkpoint. The application can + * backoff and retry. + * @throws IllegalArgumentException The sequence number being checkpointed is invalid because it is out of range, + * i.e. it is smaller than the last check point value (prepared or committed), or larger than the greatest + * sequence number seen by the associated record processor. + */ + void checkpoint() + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, + IllegalArgumentException; + +} \ No newline at end of file diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessorCheckpointer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessorCheckpointer.java index f64d3c43..df4acc36 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessorCheckpointer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessorCheckpointer.java @@ -120,4 +120,111 @@ public interface IRecordProcessorCheckpointer { void checkpoint(String sequenceNumber, long subSequenceNumber) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException; + + /** + * This method will record a pending checkpoint at the last data record that was delivered to the record processor. + * If the application fails over between calling prepareCheckpoint() and checkpoint(), the init() method of the next + * IRecordProcessor for this shard will be informed of the prepared sequence number + * + * Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having + * side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete. + * Use the sequence number passed in to init() to behave idempotently. + * + * @return an IPreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @throws ShutdownException The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this RecordProcessor instance. + * @throws InvalidStateException Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + */ + IPreparedCheckpointer prepareCheckpoint() + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; + + /** + * This method will record a pending checkpoint at the at the provided record. This method is analogous to + * {@link #prepareCheckpoint()} but provides the ability to specify the record at which to prepare the checkpoint. + * + * @param record A record at which to prepare checkpoint in this shard. + * + * Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having + * side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete. + * Use the sequence number and application state passed in to init() to behave idempotently. + * + * @return an IPreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @throws ShutdownException The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this RecordProcessor instance. + * @throws InvalidStateException Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + * @throws IllegalArgumentException The sequence number is invalid for one of the following reasons: + * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the + * greatest sequence number seen by the associated record processor. + * 2.) It is not a valid sequence number for a record in this shard. + */ + IPreparedCheckpointer prepareCheckpoint(Record record) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; + + /** + * This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to + * {@link #prepareCheckpoint()} but provides the ability to specify the sequence number at which to checkpoint. + * + * @param sequenceNumber A sequence number at which to prepare checkpoint in this shard. + + * @return an IPreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @throws ShutdownException The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this RecordProcessor instance. + * @throws InvalidStateException Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + * @throws IllegalArgumentException The sequence number is invalid for one of the following reasons: + * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the + * greatest sequence number seen by the associated record processor. + * 2.) It is not a valid sequence number for a record in this shard. + */ + IPreparedCheckpointer prepareCheckpoint(String sequenceNumber) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, + IllegalArgumentException; + + /** + * This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for + * aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()} + * but provides the ability to specify the sequence number at which to checkpoint + * + * @param sequenceNumber A sequence number at which to prepare checkpoint in this shard. + * @param subSequenceNumber A subsequence number at which to prepare checkpoint within this shard. + * + * @return an IPreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @throws ShutdownException The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this RecordProcessor instance. + * @throws InvalidStateException Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + * @throws IllegalArgumentException The sequence number is invalid for one of the following reasons: + * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the + * greatest sequence number seen by the associated record processor. + * 2.) It is not a valid sequence number for a record in this shard. + */ + IPreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, + IllegalArgumentException; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/checkpoint/Checkpoint.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/checkpoint/Checkpoint.java new file mode 100644 index 00000000..d81c632f --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/checkpoint/Checkpoint.java @@ -0,0 +1,27 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint; + +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import lombok.Data; + +/** + * A class encapsulating the 2 pieces of state stored in a checkpoint. + */ +@Data public class Checkpoint { + + private final ExtendedSequenceNumber checkpoint; + private final ExtendedSequenceNumber pendingCheckpoint; + + /** + * Constructor. + * + * @param checkpoint the checkpoint sequence number - cannot be null or empty. + * @param pendingCheckpoint the pending checkpoint sequence number - can be null. + */ + public Checkpoint(ExtendedSequenceNumber checkpoint, ExtendedSequenceNumber pendingCheckpoint) { + if (checkpoint == null || checkpoint.getSequenceNumber().isEmpty()) { + throw new IllegalArgumentException("Checkpoint cannot be null or empty"); + } + this.checkpoint = checkpoint; + this.pendingCheckpoint = pendingCheckpoint; + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DoesNothingPreparedCheckpointer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DoesNothingPreparedCheckpointer.java new file mode 100644 index 00000000..ed72f317 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DoesNothingPreparedCheckpointer.java @@ -0,0 +1,52 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; + +/** + * A special IPreparedCheckpointer that does nothing, which can be used when preparing a checkpoint at the current + * checkpoint sequence number where it is never necessary to do another checkpoint. + * This simplifies programming by preventing application developers from having to reason about whether + * their application has processed records before calling prepareCheckpoint + * + * Here's why it's safe to do nothing: + * The only way to checkpoint at current checkpoint value is to have a record processor that gets + * initialized, processes 0 records, then calls prepareCheckpoint(). The value in the table is the same, so there's + * no reason to overwrite it with another copy of itself. + */ +public class DoesNothingPreparedCheckpointer implements IPreparedCheckpointer { + + private final ExtendedSequenceNumber sequenceNumber; + + /** + * Constructor. + * @param sequenceNumber the sequence number value + */ + public DoesNothingPreparedCheckpointer(ExtendedSequenceNumber sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + + /** + * {@inheritDoc} + */ + @Override + public ExtendedSequenceNumber getPendingCheckpoint() { + return sequenceNumber; + } + + /** + * {@inheritDoc} + */ + @Override + public void checkpoint() + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, + IllegalArgumentException { + // This method does nothing + } + +} + diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java index 262b98c7..e3d9f607 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java @@ -19,6 +19,7 @@ import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.Checkpoint; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; @@ -75,7 +76,8 @@ class InitializeTask implements ITask { try { LOG.debug("Initializing ShardId " + shardInfo.getShardId()); - ExtendedSequenceNumber initialCheckpoint = checkpoint.getCheckpoint(shardInfo.getShardId()); + Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.getShardId()); + ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.getCheckpoint(); dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream()); recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint); @@ -84,7 +86,8 @@ class InitializeTask implements ITask { LOG.debug("Calling the record processor initialize()."); final InitializationInput initializationInput = new InitializationInput() .withShardId(shardInfo.getShardId()) - .withExtendedSequenceNumber(initialCheckpoint); + .withExtendedSequenceNumber(initialCheckpoint) + .withPendingCheckpointSequenceNumber(initialCheckpointObject.getPendingCheckpoint()); final long recordProcessorStartTimeMillis = System.currentTimeMillis(); try { recordProcessor.initialize(initializationInput); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java index 42fa7d0c..448a2953 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.UUID; @@ -30,6 +31,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; +import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.Checkpoint; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; @@ -157,6 +159,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator snB. + * + * @param extendedSequenceNumber the sequence number for the prepared checkpoint + * @return a prepared checkpointer that is ready to checkpoint at the given sequence number. + * @throws KinesisClientLibDependencyException + * @throws InvalidStateException + * @throws ThrottlingException + * @throws ShutdownException + */ + private IPreparedCheckpointer doPrepareCheckpoint(ExtendedSequenceNumber extendedSequenceNumber) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + + ExtendedSequenceNumber newPrepareCheckpoint = extendedSequenceNumber; + if (sequenceNumberAtShardEnd != null && sequenceNumberAtShardEnd.equals(extendedSequenceNumber)) { + // If we are about to checkpoint the very last sequence number for this shard, we might as well + // just checkpoint at SHARD_END + newPrepareCheckpoint = ExtendedSequenceNumber.SHARD_END; + } + + // Don't actually prepare a checkpoint if they're trying to checkpoint at the current checkpointed value. + // The only way this can happen is if they call prepareCheckpoint() in a record processor that was initialized + // AND that has not processed any records since initialization. + if (newPrepareCheckpoint.equals(lastCheckpointValue)) { + return new DoesNothingPreparedCheckpointer(newPrepareCheckpoint); + } + + try { + checkpoint.prepareCheckpoint(shardInfo.getShardId(), newPrepareCheckpoint, shardInfo.getConcurrencyToken()); + } catch (ThrottlingException | ShutdownException | InvalidStateException + | KinesisClientLibDependencyException e) { + throw e; + } catch (KinesisClientLibException e) { + LOG.warn("Caught exception setting prepareCheckpoint.", e); + throw new KinesisClientLibDependencyException("Caught exception while prepareCheckpointing", e); + } + + PreparedCheckpointer result = new PreparedCheckpointer(newPrepareCheckpoint, this); + return result; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/InitializationInput.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/InitializationInput.java index 8f044383..a44aa844 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/InitializationInput.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/InitializationInput.java @@ -23,6 +23,7 @@ public class InitializationInput { private String shardId; private ExtendedSequenceNumber extendedSequenceNumber; + private ExtendedSequenceNumber pendingCheckpointSequenceNumber; /** * Default constructor. @@ -71,4 +72,26 @@ public class InitializationInput { this.extendedSequenceNumber = extendedSequenceNumber; return this; } + + /** + * Get pending checkpoint {@link ExtendedSequenceNumber}. + * + * @return The {@link ExtendedSequenceNumber} in the shard for which a checkpoint is pending + */ + public ExtendedSequenceNumber getPendingCheckpointSequenceNumber() { + return pendingCheckpointSequenceNumber; + } + + /** + * Set pending checkpoint {@link ExtendedSequenceNumber}. + * + * @param pendingCheckpointSequenceNumber The {@link ExtendedSequenceNumber} in the shard for which a checkpoint + * is pending + * @return A reference to this updated object so that method calls can be chained together. + */ + public InitializationInput withPendingCheckpointSequenceNumber( + ExtendedSequenceNumber pendingCheckpointSequenceNumber) { + this.pendingCheckpointSequenceNumber = pendingCheckpointSequenceNumber; + return this; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java index b3a0ce6c..f3b2e828 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java @@ -27,6 +27,7 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber public class KinesisClientLease extends Lease { private ExtendedSequenceNumber checkpoint; + private ExtendedSequenceNumber pendingCheckpoint; private Long ownerSwitchesSinceCheckpoint = 0L; private Set parentShardIds = new HashSet(); @@ -37,16 +38,18 @@ public class KinesisClientLease extends Lease { public KinesisClientLease(KinesisClientLease other) { super(other); this.checkpoint = other.getCheckpoint(); + this.pendingCheckpoint = other.getPendingCheckpoint(); this.ownerSwitchesSinceCheckpoint = other.getOwnerSwitchesSinceCheckpoint(); this.parentShardIds.addAll(other.getParentShardIds()); } KinesisClientLease(String leaseKey, String leaseOwner, Long leaseCounter, UUID concurrencyToken, - Long lastCounterIncrementNanos, ExtendedSequenceNumber checkpoint, Long ownerSwitchesSinceCheckpoint, - Set parentShardIds) { + Long lastCounterIncrementNanos, ExtendedSequenceNumber checkpoint, ExtendedSequenceNumber pendingCheckpoint, + Long ownerSwitchesSinceCheckpoint, Set parentShardIds) { super(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos); this.checkpoint = checkpoint; + this.pendingCheckpoint = pendingCheckpoint; this.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint; this.parentShardIds.addAll(parentShardIds); } @@ -64,6 +67,7 @@ public class KinesisClientLease extends Lease { setOwnerSwitchesSinceCheckpoint(casted.ownerSwitchesSinceCheckpoint); setCheckpoint(casted.checkpoint); + setPendingCheckpoint(casted.pendingCheckpoint); setParentShardIds(casted.parentShardIds); } @@ -75,6 +79,13 @@ public class KinesisClientLease extends Lease { return checkpoint; } + /** + * @return pending checkpoint, possibly null. + */ + public ExtendedSequenceNumber getPendingCheckpoint() { + return pendingCheckpoint; + } + /** * @return count of distinct lease holders between checkpoints. */ @@ -100,6 +111,15 @@ public class KinesisClientLease extends Lease { this.checkpoint = checkpoint; } + /** + * Sets pending checkpoint. + * + * @param pendingCheckpoint can be null + */ + public void setPendingCheckpoint(ExtendedSequenceNumber pendingCheckpoint) { + this.pendingCheckpoint = pendingCheckpoint; + } + /** * Sets ownerSwitchesSinceCheckpoint. * @@ -134,6 +154,7 @@ public class KinesisClientLease extends Lease { final int prime = 31; int result = super.hashCode(); result = prime * result + ((checkpoint == null) ? 0 : checkpoint.hashCode()); + result = pendingCheckpoint == null ? result : prime * result + pendingCheckpoint.hashCode(); result = prime * result + ((ownerSwitchesSinceCheckpoint == null) ? 0 : ownerSwitchesSinceCheckpoint.hashCode()); result = prime * result + ((parentShardIds == null) ? 0 : parentShardIds.hashCode()); @@ -154,6 +175,11 @@ public class KinesisClientLease extends Lease { return false; } else if (!checkpoint.equals(other.checkpoint)) return false; + if (pendingCheckpoint == null) { + if (other.pendingCheckpoint != null) + return false; + } else if (!pendingCheckpoint.equals(other.pendingCheckpoint)) + return false; if (ownerSwitchesSinceCheckpoint == null) { if (other.ownerSwitchesSinceCheckpoint != null) return false; diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java index 0fad61ea..2383ff10 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java @@ -26,6 +26,7 @@ import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseSerializer; import com.amazonaws.services.kinesis.leases.util.DynamoUtils; +import com.google.common.base.Strings; /** * An implementation of ILeaseSerializer for KinesisClientLease objects. @@ -35,6 +36,8 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer checkpoints = new HashMap<>(); private Map flushpoints = new HashMap<>(); + private Map pendingCheckpoints = new HashMap<>(); private final String startingSequenceNumber; /** @@ -95,6 +96,7 @@ public class InMemoryCheckpointImpl implements ICheckpoint { throws KinesisClientLibException { checkpoints.put(shardId, checkpointValue); flushpoints.put(shardId, checkpointValue); + pendingCheckpoints.remove(shardId); if (LOG.isDebugEnabled()) { LOG.debug("shardId: " + shardId + " checkpoint: " + checkpointValue); @@ -112,6 +114,22 @@ public class InMemoryCheckpointImpl implements ICheckpoint { return checkpoint; } + @Override + public void prepareCheckpoint(String shardId, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) + throws KinesisClientLibException { + pendingCheckpoints.put(shardId, pendingCheckpoint); + } + + @Override + public Checkpoint getCheckpointObject(String shardId) throws KinesisClientLibException { + ExtendedSequenceNumber checkpoint = flushpoints.get(shardId); + ExtendedSequenceNumber pendingCheckpoint = pendingCheckpoints.get(shardId); + + Checkpoint checkpointObj = new Checkpoint(checkpoint, pendingCheckpoint); + LOG.debug("getCheckpointObject shardId: " + shardId + ", " + checkpointObj); + return checkpointObj; + } + /** Check that string is neither null nor empty. */ static void verifyNotEmpty(String string, String message) { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PreparedCheckpointerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PreparedCheckpointerTest.java new file mode 100644 index 00000000..bfcd7723 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PreparedCheckpointerTest.java @@ -0,0 +1,49 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +public class PreparedCheckpointerTest { + + /** + * This test verifies the relationship between the constructor and getPendingCheckpoint. + */ + @Test + public void testGetSequenceNumber() { + ExtendedSequenceNumber sn = new ExtendedSequenceNumber("sn"); + IPreparedCheckpointer checkpointer = new PreparedCheckpointer(sn, null); + Assert.assertEquals(sn, checkpointer.getPendingCheckpoint()); + } + + /** + * This test makes sure the PreparedCheckpointer calls the IRecordProcessorCheckpointer properly. + * + * @throws Exception + */ + @Test + public void testCheckpoint() throws Exception { + ExtendedSequenceNumber sn = new ExtendedSequenceNumber("sn"); + IRecordProcessorCheckpointer mockRecordProcessorCheckpointer = Mockito.mock(IRecordProcessorCheckpointer.class); + IPreparedCheckpointer checkpointer = new PreparedCheckpointer(sn, mockRecordProcessorCheckpointer); + checkpointer.checkpoint(); + Mockito.verify(mockRecordProcessorCheckpointer).checkpoint(sn.getSequenceNumber(), sn.getSubSequenceNumber()); + } + + /** + * This test makes sure the PreparedCheckpointer calls the IRecordProcessorCheckpointer properly. + * + * @throws Exception + */ + @Test + public void testDoesNothingPreparedCheckpoint() throws Exception { + ExtendedSequenceNumber sn = new ExtendedSequenceNumber("sn"); + IPreparedCheckpointer checkpointer = new DoesNothingPreparedCheckpointer(sn); + Assert.assertEquals(sn, checkpointer.getPendingCheckpoint()); + // nothing happens here + checkpointer.checkpoint(); + } +} \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java index d5f6b53f..31a1e184 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java @@ -25,12 +25,8 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; -import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException; -import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException; -import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; -import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheckpointImpl; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; @@ -49,6 +45,8 @@ public class RecordProcessorCheckpointerTest { private ExtendedSequenceNumber startingExtendedSequenceNumber = new ExtendedSequenceNumber(startingSequenceNumber); private String testConcurrencyToken = "testToken"; private ICheckpoint checkpoint; + private ShardInfo shardInfo; + private SequenceNumberValidator sequenceNumberValidator; private String shardId = "shardId-123"; /** @@ -60,6 +58,9 @@ public class RecordProcessorCheckpointerTest { // A real checkpoint will return a checkpoint value after it is initialized. checkpoint.setCheckpoint(shardId, startingExtendedSequenceNumber, testConcurrencyToken); Assert.assertEquals(this.startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId)); + + shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); + sequenceNumberValidator = new SequenceNumberValidator(null, shardId, false); } /** @@ -75,8 +76,6 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testCheckpoint() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - // First call to checkpoint RecordProcessorCheckpointer processingCheckpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, null); @@ -98,9 +97,6 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testCheckpointRecord() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - SequenceNumberValidator sequenceNumberValidator = - new SequenceNumberValidator(null, shardId, false); RecordProcessorCheckpointer processingCheckpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); @@ -113,13 +109,10 @@ public class RecordProcessorCheckpointerTest { /** * Test method for - * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(UserRecord record)}. + * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(Record record)}. */ @Test public final void testCheckpointSubRecord() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - SequenceNumberValidator sequenceNumberValidator = - new SequenceNumberValidator(null, shardId, false); RecordProcessorCheckpointer processingCheckpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); @@ -137,9 +130,6 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testCheckpointSequenceNumber() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - SequenceNumberValidator sequenceNumberValidator = - new SequenceNumberValidator(null, shardId, false); RecordProcessorCheckpointer processingCheckpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); @@ -155,9 +145,6 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testCheckpointExtendedSequenceNumber() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - SequenceNumberValidator sequenceNumberValidator = - new SequenceNumberValidator(null, shardId, false); RecordProcessorCheckpointer processingCheckpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); @@ -167,14 +154,210 @@ public class RecordProcessorCheckpointerTest { Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); } + /** + * Test method for + * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#prepareCheckpoint()}. + */ + @Test + public final void testPrepareCheckpoint() throws Exception { + // First call to checkpoint + RecordProcessorCheckpointer processingCheckpointer = + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); + processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); + + ExtendedSequenceNumber sequenceNumber1 = new ExtendedSequenceNumber("5001"); + processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber1); + IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint(); + Assert.assertEquals(sequenceNumber1, preparedCheckpoint.getPendingCheckpoint()); + Assert.assertEquals(sequenceNumber1, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + + // Advance checkpoint + ExtendedSequenceNumber sequenceNumber2 = new ExtendedSequenceNumber("5019"); + + processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber2); + preparedCheckpoint = processingCheckpointer.prepareCheckpoint(); + Assert.assertEquals(sequenceNumber2, preparedCheckpoint.getPendingCheckpoint()); + Assert.assertEquals(sequenceNumber2, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + + // Checkpoint using preparedCheckpoint + preparedCheckpoint.checkpoint(); + Assert.assertEquals(sequenceNumber2, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(sequenceNumber2, checkpoint.getCheckpointObject(shardId).getCheckpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + } + + /** + * Test method for + * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#prepareCheckpoint(Record record)}. + */ + @Test + public final void testPrepareCheckpointRecord() throws Exception { + RecordProcessorCheckpointer processingCheckpointer = + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); + processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); + ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5025"); + Record record = new Record().withSequenceNumber("5025"); + processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); + IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint(record); + Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint()); + Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint()); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + + // Checkpoint using preparedCheckpoint + preparedCheckpoint.checkpoint(); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + } + + /** + * Test method for + * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#prepareCheckpoint(Record record)}. + */ + @Test + public final void testPrepareCheckpointSubRecord() throws Exception { + RecordProcessorCheckpointer processingCheckpointer = + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); + processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); + ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5030"); + Record record = new Record().withSequenceNumber("5030"); + UserRecord subRecord = new UserRecord(record); + processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); + IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint(subRecord); + Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint()); + Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint()); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + + // Checkpoint using preparedCheckpoint + preparedCheckpoint.checkpoint(); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + } + + /** + * Test method for + * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(String sequenceNumber)}. + */ + @Test + public final void testPrepareCheckpointSequenceNumber() throws Exception { + RecordProcessorCheckpointer processingCheckpointer = + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); + processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); + ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5035"); + processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); + IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint("5035"); + Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint()); + Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint()); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + + // Checkpoint using preparedCheckpoint + preparedCheckpoint.checkpoint(); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + } + + /** + * Test method for + * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(String sequenceNumber, long subSequenceNumber)}. + */ + @Test + public final void testPrepareCheckpointExtendedSequenceNumber() throws Exception { + RecordProcessorCheckpointer processingCheckpointer = + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); + processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); + ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5040"); + processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); + IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint("5040", 0); + Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint()); + Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint()); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + + // Checkpoint using preparedCheckpoint + preparedCheckpoint.checkpoint(); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + } + + /** + * Test that having multiple outstanding prepared checkpointers works if they are redeemed in the right order. + */ + @Test + public final void testMultipleOutstandingCheckpointersHappyCase() throws Exception { + RecordProcessorCheckpointer processingCheckpointer = + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); + processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); + processingCheckpointer.setLargestPermittedCheckpointValue(new ExtendedSequenceNumber("6040")); + + ExtendedSequenceNumber sn1 = new ExtendedSequenceNumber("6010"); + IPreparedCheckpointer firstPreparedCheckpoint = processingCheckpointer.prepareCheckpoint("6010", 0); + Assert.assertEquals(sn1, firstPreparedCheckpoint.getPendingCheckpoint()); + Assert.assertEquals(sn1, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + + ExtendedSequenceNumber sn2 = new ExtendedSequenceNumber("6020"); + IPreparedCheckpointer secondPreparedCheckpoint = processingCheckpointer.prepareCheckpoint("6020", 0); + Assert.assertEquals(sn2, secondPreparedCheckpoint.getPendingCheckpoint()); + Assert.assertEquals(sn2, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + + // checkpoint in order + firstPreparedCheckpoint.checkpoint(); + Assert.assertEquals(sn1, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(sn1, checkpoint.getCheckpointObject(shardId).getCheckpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + + secondPreparedCheckpoint.checkpoint(); + Assert.assertEquals(sn2, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(sn2, checkpoint.getCheckpointObject(shardId).getCheckpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + } + + /** + * Test that having multiple outstanding prepared checkpointers works if they are redeemed in the right order. + */ + @Test + public final void testMultipleOutstandingCheckpointersOutOfOrder() throws Exception { + RecordProcessorCheckpointer processingCheckpointer = + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); + processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); + processingCheckpointer.setLargestPermittedCheckpointValue(new ExtendedSequenceNumber("7040")); + + ExtendedSequenceNumber sn1 = new ExtendedSequenceNumber("7010"); + IPreparedCheckpointer firstPreparedCheckpoint = processingCheckpointer.prepareCheckpoint("7010", 0); + Assert.assertEquals(sn1, firstPreparedCheckpoint.getPendingCheckpoint()); + Assert.assertEquals(sn1, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + + ExtendedSequenceNumber sn2 = new ExtendedSequenceNumber("7020"); + IPreparedCheckpointer secondPreparedCheckpoint = processingCheckpointer.prepareCheckpoint("7020", 0); + Assert.assertEquals(sn2, secondPreparedCheckpoint.getPendingCheckpoint()); + Assert.assertEquals(sn2, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + + // checkpoint out of order + secondPreparedCheckpoint.checkpoint(); + Assert.assertEquals(sn2, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(sn2, checkpoint.getCheckpointObject(shardId).getCheckpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + + try { + firstPreparedCheckpoint.checkpoint(); + Assert.fail("checkpoint() should have failed because the sequence number was too low"); + } catch (IllegalArgumentException e) { + } catch (Exception e) { + Assert.fail("checkpoint() should have thrown an IllegalArgumentException but instead threw " + e); + } + } + /** * Test method for update() * */ @Test public final void testUpdate() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - RecordProcessorCheckpointer checkpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, null); ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("10"); @@ -193,8 +376,6 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testClientSpecifiedCheckpoint() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - SequenceNumberValidator validator = mock(SequenceNumberValidator.class); Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); RecordProcessorCheckpointer processingCheckpointer = @@ -275,10 +456,129 @@ public class RecordProcessorCheckpointerTest { processingCheckpointer.getLastCheckpointValue()); } + /* + * This test is a mixed test of checking some basic functionality of two phase checkpointing at a sequence number + * and making sure certain bounds checks and validations are being performed inside the checkpointer to prevent + * clients from checkpointing out of order/too big/non-numeric values that aren't valid strings for them to be + * checkpointing + */ + @Test + public final void testClientSpecifiedTwoPhaseCheckpoint() throws Exception { + SequenceNumberValidator validator = mock(SequenceNumberValidator.class); + Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); + RecordProcessorCheckpointer processingCheckpointer = + new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); + + // Several checkpoints we're gonna hit + ExtendedSequenceNumber tooSmall = new ExtendedSequenceNumber("2"); + ExtendedSequenceNumber firstSequenceNumber = checkpoint.getCheckpoint(shardId); // 13 + ExtendedSequenceNumber secondSequenceNumber = new ExtendedSequenceNumber("127"); + ExtendedSequenceNumber thirdSequenceNumber = new ExtendedSequenceNumber("5019"); + ExtendedSequenceNumber lastSequenceNumberOfShard = new ExtendedSequenceNumber("6789"); + ExtendedSequenceNumber tooBigSequenceNumber = new ExtendedSequenceNumber("9000"); + + processingCheckpointer.setInitialCheckpointValue(firstSequenceNumber); + processingCheckpointer.setLargestPermittedCheckpointValue(thirdSequenceNumber); + + // confirm that we cannot move backward + try { + processingCheckpointer.prepareCheckpoint(tooSmall.getSequenceNumber(), tooSmall.getSubSequenceNumber()); + Assert.fail("You shouldn't be able to prepare a checkpoint earlier than the initial checkpoint."); + } catch (IllegalArgumentException e) { + // yay! + } + + try { + processingCheckpointer.checkpoint(tooSmall.getSequenceNumber(), tooSmall.getSubSequenceNumber()); + Assert.fail("You shouldn't be able to checkpoint earlier than the initial checkpoint."); + } catch (IllegalArgumentException e) { + // yay! + } + + // advance to first + processingCheckpointer.checkpoint(firstSequenceNumber.getSequenceNumber(), firstSequenceNumber.getSubSequenceNumber()); + Assert.assertEquals(firstSequenceNumber, checkpoint.getCheckpoint(shardId)); + + // prepare checkpoint at initial checkpoint value + IPreparedCheckpointer doesNothingPreparedCheckpoint = + processingCheckpointer.prepareCheckpoint(firstSequenceNumber.getSequenceNumber(), firstSequenceNumber.getSubSequenceNumber()); + Assert.assertTrue(doesNothingPreparedCheckpoint instanceof DoesNothingPreparedCheckpointer); + Assert.assertEquals(firstSequenceNumber, doesNothingPreparedCheckpoint.getPendingCheckpoint()); + Assert.assertEquals(firstSequenceNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(firstSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + + // nothing happens after checkpointing a doesNothingPreparedCheckpoint + doesNothingPreparedCheckpoint.checkpoint(); + Assert.assertEquals(firstSequenceNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(firstSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + + // advance to second + processingCheckpointer.prepareCheckpoint(secondSequenceNumber.getSequenceNumber(), secondSequenceNumber.getSubSequenceNumber()); + Assert.assertEquals(secondSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + processingCheckpointer.checkpoint(secondSequenceNumber.getSequenceNumber(), secondSequenceNumber.getSubSequenceNumber()); + Assert.assertEquals(secondSequenceNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + + ExtendedSequenceNumber[] valuesWeShouldNotBeAbleToCheckpointAt = + { tooSmall, // Shouldn't be able to move before the first value we ever checkpointed + firstSequenceNumber, // Shouldn't even be able to move back to a once used sequence number + tooBigSequenceNumber, // Can't exceed the max sequence number in the checkpointer + lastSequenceNumberOfShard, // Just another big value that we will use later + null, // Not a valid sequence number + new ExtendedSequenceNumber("bogus-checkpoint-value"), // Can't checkpoint at non-numeric string + ExtendedSequenceNumber.SHARD_END, // Can't go to the end unless it is set as the max + ExtendedSequenceNumber.TRIM_HORIZON, // Can't go back to an initial sentinel value + ExtendedSequenceNumber.LATEST // Can't go back to an initial sentinel value + }; + for (ExtendedSequenceNumber badCheckpointValue : valuesWeShouldNotBeAbleToCheckpointAt) { + try { + processingCheckpointer.prepareCheckpoint(badCheckpointValue.getSequenceNumber(), badCheckpointValue.getSubSequenceNumber()); + fail("checkpointing at bad or out of order sequence didn't throw exception"); + } catch (IllegalArgumentException e) { + + } catch (NullPointerException e) { + + } + Assert.assertEquals("Checkpoint value should not have changed", + secondSequenceNumber, + checkpoint.getCheckpoint(shardId)); + Assert.assertEquals("Last checkpoint value should not have changed", + secondSequenceNumber, + processingCheckpointer.getLastCheckpointValue()); + Assert.assertEquals("Largest sequence number should not have changed", + thirdSequenceNumber, + processingCheckpointer.getLargestPermittedCheckpointValue()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + + } + + // advance to third number + processingCheckpointer.prepareCheckpoint(thirdSequenceNumber.getSequenceNumber(), thirdSequenceNumber.getSubSequenceNumber()); + Assert.assertEquals(thirdSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + processingCheckpointer.checkpoint(thirdSequenceNumber.getSequenceNumber(), thirdSequenceNumber.getSubSequenceNumber()); + Assert.assertEquals(thirdSequenceNumber, checkpoint.getCheckpoint(shardId)); + + // Testing a feature that prevents checkpointing at SHARD_END twice + processingCheckpointer.setLargestPermittedCheckpointValue(lastSequenceNumberOfShard); + processingCheckpointer.setSequenceNumberAtShardEnd(processingCheckpointer.getLargestPermittedCheckpointValue()); + processingCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); + processingCheckpointer.prepareCheckpoint(lastSequenceNumberOfShard.getSequenceNumber(), lastSequenceNumberOfShard.getSubSequenceNumber()); + Assert.assertEquals("Preparing a checkpoing at the sequence number at the end of a shard should be the same as " + + "preparing a checkpoint at SHARD_END", + ExtendedSequenceNumber.SHARD_END, + checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + } + private enum CheckpointAction { NONE, NO_SEQUENCE_NUMBER, WITH_SEQUENCE_NUMBER; } + private enum CheckpointerType { + CHECKPOINTER, PREPARED_CHECKPOINTER, PREPARE_THEN_CHECKPOINTER; + } + /** * Tests a bunch of mixed calls between checkpoint() and checkpoint(sequenceNumber) using a helper function. * @@ -290,16 +590,59 @@ public class RecordProcessorCheckpointerTest { @SuppressWarnings("serial") @Test public final void testMixedCheckpointCalls() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - SequenceNumberValidator validator = mock(SequenceNumberValidator.class); Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); - RecordProcessorCheckpointer processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); + for (LinkedHashMap testPlan : getMixedCallsTestPlan()) { + RecordProcessorCheckpointer processingCheckpointer = + new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); + testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.CHECKPOINTER); + } + } - List> testPlans = - new ArrayList>(); + /** + * similar to + * {@link RecordProcessorCheckpointerTest#testMixedCheckpointCalls()} , + * but executes in two phase commit mode, where we prepare a checkpoint and then commit the prepared checkpoint + * + * @throws Exception + */ + @SuppressWarnings("serial") + @Test + public final void testMixedTwoPhaseCheckpointCalls() throws Exception { + SequenceNumberValidator validator = mock(SequenceNumberValidator.class); + Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); + + for (LinkedHashMap testPlan : getMixedCallsTestPlan()) { + RecordProcessorCheckpointer processingCheckpointer = + new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); + testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.PREPARED_CHECKPOINTER); + } + } + + /** + * similar to + * {@link RecordProcessorCheckpointerTest#testMixedCheckpointCalls()} , + * but executes in two phase commit mode, where we prepare a checkpoint, but we checkpoint using the + * RecordProcessorCheckpointer instead of the returned IPreparedCheckpointer + * + * @throws Exception + */ + @SuppressWarnings("serial") + @Test + public final void testMixedTwoPhaseCheckpointCalls2() throws Exception { + SequenceNumberValidator validator = mock(SequenceNumberValidator.class); + Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); + + for (LinkedHashMap testPlan : getMixedCallsTestPlan()) { + RecordProcessorCheckpointer processingCheckpointer = + new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); + testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.PREPARE_THEN_CHECKPOINTER); + } + } + + private List> getMixedCallsTestPlan() { + List> testPlans = new ArrayList>(); /* * Simulate a scenario where the checkpointer is created at "latest". @@ -356,11 +699,7 @@ public class RecordProcessorCheckpointerTest { } }); - for (LinkedHashMap testPlan : testPlans) { - processingCheckpointer = - new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); - testMixedCheckpointCalls(processingCheckpointer, testPlan); - } + return testPlans; } /** @@ -376,9 +715,11 @@ public class RecordProcessorCheckpointerTest { * @throws Exception */ private void testMixedCheckpointCalls(RecordProcessorCheckpointer processingCheckpointer, - LinkedHashMap checkpointValueAndAction) throws Exception { + LinkedHashMap checkpointValueAndAction, + CheckpointerType checkpointerType) throws Exception { for (Entry entry : checkpointValueAndAction.entrySet()) { + IPreparedCheckpointer preparedCheckpoint = null; ExtendedSequenceNumber lastCheckpointValue = processingCheckpointer.getLastCheckpointValue(); if (SentinelCheckpoint.SHARD_END.toString().equals(entry.getKey())) { @@ -400,10 +741,34 @@ public class RecordProcessorCheckpointerTest { processingCheckpointer.getLastCheckpointValue()); continue; case NO_SEQUENCE_NUMBER: - processingCheckpointer.checkpoint(); + switch (checkpointerType) { + case CHECKPOINTER: + processingCheckpointer.checkpoint(); + break; + case PREPARED_CHECKPOINTER: + preparedCheckpoint = processingCheckpointer.prepareCheckpoint(); + preparedCheckpoint.checkpoint(); + case PREPARE_THEN_CHECKPOINTER: + preparedCheckpoint = processingCheckpointer.prepareCheckpoint(); + processingCheckpointer.checkpoint( + preparedCheckpoint.getPendingCheckpoint().getSequenceNumber(), + preparedCheckpoint.getPendingCheckpoint().getSubSequenceNumber()); + } break; case WITH_SEQUENCE_NUMBER: - processingCheckpointer.checkpoint(entry.getKey()); + switch (checkpointerType) { + case CHECKPOINTER: + processingCheckpointer.checkpoint(entry.getKey()); + break; + case PREPARED_CHECKPOINTER: + preparedCheckpoint = processingCheckpointer.prepareCheckpoint(entry.getKey()); + preparedCheckpoint.checkpoint(); + case PREPARE_THEN_CHECKPOINTER: + preparedCheckpoint = processingCheckpointer.prepareCheckpoint(entry.getKey()); + processingCheckpointer.checkpoint( + preparedCheckpoint.getPendingCheckpoint().getSequenceNumber(), + preparedCheckpoint.getPendingCheckpoint().getSubSequenceNumber()); + } break; } // We must have checkpointed to get here, so let's make sure our last checkpoint value is up to date @@ -413,6 +778,11 @@ public class RecordProcessorCheckpointerTest { Assert.assertEquals("Expected the largest checkpoint value to remain the same since the last set", new ExtendedSequenceNumber(entry.getKey()), processingCheckpointer.getLargestPermittedCheckpointValue()); + + Assert.assertEquals(new ExtendedSequenceNumber(entry.getKey()), checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(new ExtendedSequenceNumber(entry.getKey()), + checkpoint.getCheckpointObject(shardId).getCheckpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); } } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 893f64ed..8073d0df 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; @@ -37,6 +38,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.ListIterator; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -45,6 +47,9 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -52,6 +57,7 @@ import org.mockito.runners.MockitoJUnitRunner; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.Checkpoint; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheckpointImpl; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy; @@ -108,6 +114,7 @@ public class ShardConsumerTest { ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class); + when(checkpoint.getCheckpointObject(anyString())).thenThrow(NullPointerException.class); when(leaseManager.getLease(anyString())).thenReturn(null); StreamConfig streamConfig = @@ -156,6 +163,7 @@ public class ShardConsumerTest { ExecutorService spyExecutorService = spy(executorService); when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class); + when(checkpoint.getCheckpointObject(anyString())).thenThrow(NullPointerException.class); when(leaseManager.getLease(anyString())).thenReturn(null); StreamConfig streamConfig = new StreamConfig(streamProxy, @@ -218,8 +226,11 @@ public class ShardConsumerTest { taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); + final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null; when(leaseManager.getLease(anyString())).thenReturn(null); - when(checkpoint.getCheckpoint(anyString())).thenReturn(new ExtendedSequenceNumber("123")); + when(checkpoint.getCheckpointObject(anyString())).thenReturn( + new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber)); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // submit BlockOnParentShardTask @@ -233,7 +244,8 @@ public class ShardConsumerTest { consumer.consumeShard(); // submit InitializeTask Thread.sleep(50L); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); - verify(processor, times(1)).initialize(any(InitializationInput.class)); + verify(processor, times(1)).initialize(argThat( + initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); try { // Checking the status of submitted InitializeTask from above should throw exception. @@ -244,14 +256,17 @@ public class ShardConsumerTest { } Thread.sleep(50L); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); - verify(processor, times(1)).initialize(any(InitializationInput.class)); + verify(processor, times(1)).initialize(argThat( + initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); doNothing().when(processor).initialize(any(InitializationInput.class)); consumer.consumeShard(); // submit InitializeTask again. Thread.sleep(50L); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); - verify(processor, times(2)).initialize(any(InitializationInput.class)); + verify(processor, times(2)).initialize(argThat( + initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); + verify(processor, times(2)).initialize(any(InitializationInput.class)); // no other calls with different args // Checking the status of submitted InitializeTask from above should pass. consumer.consumeShard(); @@ -447,6 +462,54 @@ public class ShardConsumerTest { file.delete(); } + @SuppressWarnings("unchecked") + @Test + public final void testConsumeShardInitializedWithPendingCheckpoint() throws Exception { + ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); + StreamConfig streamConfig = + new StreamConfig(streamProxy, + 1, + 10, + callProcessRecordsForEmptyRecordList, + skipCheckpointValidationValue, INITIAL_POSITION_LATEST); + + ShardConsumer consumer = + new ShardConsumer(shardInfo, + streamConfig, + checkpoint, + processor, + null, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + executorService, + metricsFactory, + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + + final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); + final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999"); + when(leaseManager.getLease(anyString())).thenReturn(null); + when(checkpoint.getCheckpointObject(anyString())).thenReturn( + new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber)); + + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + consumer.consumeShard(); // submit BlockOnParentShardTask + Thread.sleep(50L); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + verify(processor, times(0)).initialize(any(InitializationInput.class)); + + consumer.consumeShard(); // submit InitializeTask + Thread.sleep(50L); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + verify(processor, times(1)).initialize(argThat( + initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); + verify(processor, times(1)).initialize(any(InitializationInput.class)); // no other calls with different args + + consumer.consumeShard(); + Thread.sleep(50L); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + } + //@formatter:off (gets the formatting wrong) private void verifyConsumedRecords(List expectedRecords, List actualRecords) { @@ -469,4 +532,21 @@ public class ShardConsumerTest { } return userRecords; } + + Matcher initializationInputMatcher(final ExtendedSequenceNumber checkpoint, + final ExtendedSequenceNumber pendingCheckpoint) { + return new TypeSafeMatcher() { + @Override + protected boolean matchesSafely(InitializationInput item) { + return Objects.equals(checkpoint, item.getExtendedSequenceNumber()) + && Objects.equals(pendingCheckpoint, item.getPendingCheckpointSequenceNumber()); + } + + @Override + public void describeTo(Description description) { + description.appendText(String.format("Checkpoint should be %s and pending checkpoint should be %s", + checkpoint, pendingCheckpoint)); + } + }; + } } diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java index 90a1676d..2e8879fe 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java @@ -27,6 +27,7 @@ public class KinesisClientLeaseBuilder { private UUID concurrencyToken; private Long lastCounterIncrementNanos; private ExtendedSequenceNumber checkpoint; + private ExtendedSequenceNumber pendingCheckpoint; private Long ownerSwitchesSinceCheckpoint = 0L; private Set parentShardIds = new HashSet<>(); @@ -60,6 +61,11 @@ public class KinesisClientLeaseBuilder { return this; } + public KinesisClientLeaseBuilder withPendingCheckpoint(ExtendedSequenceNumber pendingCheckpoint) { + this.pendingCheckpoint = pendingCheckpoint; + return this; + } + public KinesisClientLeaseBuilder withOwnerSwitchesSinceCheckpoint(Long ownerSwitchesSinceCheckpoint) { this.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint; return this; @@ -72,6 +78,6 @@ public class KinesisClientLeaseBuilder { public KinesisClientLease build() { return new KinesisClientLease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, - checkpoint, ownerSwitchesSinceCheckpoint, parentShardIds); + checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds); } } \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java index d27b9480..d2659349 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java @@ -18,6 +18,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateExcep import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; @@ -99,6 +100,34 @@ public class StreamingRecordProcessorTest { IllegalArgumentException { throw new UnsupportedOperationException(); } + + @Override + public IPreparedCheckpointer prepareCheckpoint() + throws KinesisClientLibDependencyException, + InvalidStateException, ThrottlingException, ShutdownException { + throw new UnsupportedOperationException(); + } + + @Override + public IPreparedCheckpointer prepareCheckpoint(Record record) + throws KinesisClientLibDependencyException, + InvalidStateException, ThrottlingException, ShutdownException { + throw new UnsupportedOperationException(); + } + + @Override + public IPreparedCheckpointer prepareCheckpoint(String sequenceNumber) + throws KinesisClientLibDependencyException, + InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { + throw new UnsupportedOperationException(); + } + + @Override + public IPreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber) + throws KinesisClientLibDependencyException, + InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { + throw new UnsupportedOperationException(); + } }; private MessageWriter messageWriter;