Add Support for Two Phase Commit (#188)

Adds support for making two phase commits.  When a checkpoint is started, but fails to complete the next instance of the record processor will receive the attempted checkpoint position at initialization time.
This commit is contained in:
Walid Baruni 2017-08-15 12:56:32 -07:00 committed by Justin Pfifer
parent cdbbff31e8
commit 1ec0b656c9
19 changed files with 1247 additions and 49 deletions

View file

@ -15,6 +15,7 @@
package com.amazonaws.services.kinesis.clientlibrary.interfaces; package com.amazonaws.services.kinesis.clientlibrary.interfaces;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.Checkpoint;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
/** /**
@ -46,4 +47,30 @@ public interface ICheckpoint {
*/ */
ExtendedSequenceNumber getCheckpoint(String shardId) throws KinesisClientLibException; ExtendedSequenceNumber getCheckpoint(String shardId) throws KinesisClientLibException;
/**
* Get the current checkpoint stored for the specified shard, which holds the sequence numbers for the checkpoint
* and pending checkpoint. Useful for checking that the parent shard has been completely processed before we start
* processing the child shard.
*
* @param shardId Current checkpoint for this shard is fetched
* @return Current checkpoint object for this shard, null if there is no record for this shard.
* @throws KinesisClientLibException Thrown if we are unable to fetch the checkpoint
*/
Checkpoint getCheckpointObject(String shardId) throws KinesisClientLibException;
/**
* Record intent to checkpoint for a shard. Upon failover, the pendingCheckpointValue will be passed to the new
* RecordProcessor's initialize() method.
*
* @param shardId Checkpoint is specified for this shard.
* @param pendingCheckpoint Value of the pending checkpoint (e.g. Kinesis sequence number and subsequence number)
* @param concurrencyToken Used with conditional writes to prevent stale updates
* (e.g. if there was a fail over to a different record processor, we don't want to
* overwrite it's checkpoint)
* @throws KinesisClientLibException Thrown if we were unable to save the checkpoint
*/
void prepareCheckpoint(String shardId, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken)
throws KinesisClientLibException;
} }

View file

@ -0,0 +1,41 @@
package com.amazonaws.services.kinesis.clientlibrary.interfaces;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
/**
* Objects of this class are prepared to checkpoint at a specific sequence number. They use an
* IRecordProcessorCheckpointer to do the actual checkpointing, so their checkpoint is subject to the same 'didn't go
* backwards' validation as a normal checkpoint.
*/
public interface IPreparedCheckpointer {
/**
* @return sequence number of pending checkpoint
*/
ExtendedSequenceNumber getPendingCheckpoint();
/**
* This method will record a pending checkpoint.
*
* @throws ThrottlingException Can't store checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @throws ShutdownException The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this RecordProcessor instance.
* @throws InvalidStateException Can't store checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @throws KinesisClientLibDependencyException Encountered an issue when storing the checkpoint. The application can
* backoff and retry.
* @throws IllegalArgumentException The sequence number being checkpointed is invalid because it is out of range,
* i.e. it is smaller than the last check point value (prepared or committed), or larger than the greatest
* sequence number seen by the associated record processor.
*/
void checkpoint()
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException;
}

View file

@ -120,4 +120,111 @@ public interface IRecordProcessorCheckpointer {
void checkpoint(String sequenceNumber, long subSequenceNumber) void checkpoint(String sequenceNumber, long subSequenceNumber)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException; IllegalArgumentException;
/**
* This method will record a pending checkpoint at the last data record that was delivered to the record processor.
* If the application fails over between calling prepareCheckpoint() and checkpoint(), the init() method of the next
* IRecordProcessor for this shard will be informed of the prepared sequence number
*
* Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having
* side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete.
* Use the sequence number passed in to init() to behave idempotently.
*
* @return an IPreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @throws ShutdownException The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this RecordProcessor instance.
* @throws InvalidStateException Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
*/
IPreparedCheckpointer prepareCheckpoint()
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
/**
* This method will record a pending checkpoint at the at the provided record. This method is analogous to
* {@link #prepareCheckpoint()} but provides the ability to specify the record at which to prepare the checkpoint.
*
* @param record A record at which to prepare checkpoint in this shard.
*
* Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having
* side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete.
* Use the sequence number and application state passed in to init() to behave idempotently.
*
* @return an IPreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @throws ShutdownException The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this RecordProcessor instance.
* @throws InvalidStateException Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
* @throws IllegalArgumentException The sequence number is invalid for one of the following reasons:
* 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
IPreparedCheckpointer prepareCheckpoint(Record record)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
/**
* This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to
* {@link #prepareCheckpoint()} but provides the ability to specify the sequence number at which to checkpoint.
*
* @param sequenceNumber A sequence number at which to prepare checkpoint in this shard.
* @return an IPreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @throws ShutdownException The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this RecordProcessor instance.
* @throws InvalidStateException Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
* @throws IllegalArgumentException The sequence number is invalid for one of the following reasons:
* 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
IPreparedCheckpointer prepareCheckpoint(String sequenceNumber)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException;
/**
* This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for
* aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()}
* but provides the ability to specify the sequence number at which to checkpoint
*
* @param sequenceNumber A sequence number at which to prepare checkpoint in this shard.
* @param subSequenceNumber A subsequence number at which to prepare checkpoint within this shard.
*
* @return an IPreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @throws ShutdownException The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this RecordProcessor instance.
* @throws InvalidStateException Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
* @throws IllegalArgumentException The sequence number is invalid for one of the following reasons:
* 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
IPreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException;
} }

View file

@ -0,0 +1,27 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import lombok.Data;
/**
* A class encapsulating the 2 pieces of state stored in a checkpoint.
*/
@Data public class Checkpoint {
private final ExtendedSequenceNumber checkpoint;
private final ExtendedSequenceNumber pendingCheckpoint;
/**
* Constructor.
*
* @param checkpoint the checkpoint sequence number - cannot be null or empty.
* @param pendingCheckpoint the pending checkpoint sequence number - can be null.
*/
public Checkpoint(ExtendedSequenceNumber checkpoint, ExtendedSequenceNumber pendingCheckpoint) {
if (checkpoint == null || checkpoint.getSequenceNumber().isEmpty()) {
throw new IllegalArgumentException("Checkpoint cannot be null or empty");
}
this.checkpoint = checkpoint;
this.pendingCheckpoint = pendingCheckpoint;
}
}

View file

@ -0,0 +1,52 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
/**
* A special IPreparedCheckpointer that does nothing, which can be used when preparing a checkpoint at the current
* checkpoint sequence number where it is never necessary to do another checkpoint.
* This simplifies programming by preventing application developers from having to reason about whether
* their application has processed records before calling prepareCheckpoint
*
* Here's why it's safe to do nothing:
* The only way to checkpoint at current checkpoint value is to have a record processor that gets
* initialized, processes 0 records, then calls prepareCheckpoint(). The value in the table is the same, so there's
* no reason to overwrite it with another copy of itself.
*/
public class DoesNothingPreparedCheckpointer implements IPreparedCheckpointer {
private final ExtendedSequenceNumber sequenceNumber;
/**
* Constructor.
* @param sequenceNumber the sequence number value
*/
public DoesNothingPreparedCheckpointer(ExtendedSequenceNumber sequenceNumber) {
this.sequenceNumber = sequenceNumber;
}
/**
* {@inheritDoc}
*/
@Override
public ExtendedSequenceNumber getPendingCheckpoint() {
return sequenceNumber;
}
/**
* {@inheritDoc}
*/
@Override
public void checkpoint()
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException {
// This method does nothing
}
}

View file

@ -19,6 +19,7 @@ import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.Checkpoint;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
@ -75,7 +76,8 @@ class InitializeTask implements ITask {
try { try {
LOG.debug("Initializing ShardId " + shardInfo.getShardId()); LOG.debug("Initializing ShardId " + shardInfo.getShardId());
ExtendedSequenceNumber initialCheckpoint = checkpoint.getCheckpoint(shardInfo.getShardId()); Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.getShardId());
ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.getCheckpoint();
dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream()); dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream());
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint); recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint);
@ -84,7 +86,8 @@ class InitializeTask implements ITask {
LOG.debug("Calling the record processor initialize()."); LOG.debug("Calling the record processor initialize().");
final InitializationInput initializationInput = new InitializationInput() final InitializationInput initializationInput = new InitializationInput()
.withShardId(shardInfo.getShardId()) .withShardId(shardInfo.getShardId())
.withExtendedSequenceNumber(initialCheckpoint); .withExtendedSequenceNumber(initialCheckpoint)
.withPendingCheckpointSequenceNumber(initialCheckpointObject.getPendingCheckpoint());
final long recordProcessorStartTimeMillis = System.currentTimeMillis(); final long recordProcessorStartTimeMillis = System.currentTimeMillis();
try { try {
recordProcessor.initialize(initializationInput); recordProcessor.initialize(initializationInput);

View file

@ -18,6 +18,7 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
@ -30,6 +31,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.Checkpoint;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
@ -157,6 +159,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
} }
lease.setCheckpoint(checkpoint); lease.setCheckpoint(checkpoint);
lease.setPendingCheckpoint(null);
lease.setOwnerSwitchesSinceCheckpoint(0L); lease.setOwnerSwitchesSinceCheckpoint(0L);
return updateLease(lease, concurrencyToken); return updateLease(lease, concurrencyToken);
@ -198,6 +201,75 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
} }
} }
/**
* Records pending checkpoint for a shard. Does not modify checkpoint or ownerSwitchesSinceCheckpoint.
*
* @param shardId shardId to update the checkpoint for
* @param pendingCheckpoint pending checkpoint value to set, not null
* @param concurrencyToken obtained by calling Lease.getConcurrencyToken for a currently held lease
*
* @return true if setting the pending checkpoint succeeded, false otherwise
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/
boolean prepareCheckpoint(String shardId, ExtendedSequenceNumber pendingCheckpoint, UUID concurrencyToken)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
KinesisClientLease lease = getCurrentlyHeldLease(shardId);
if (lease == null) {
LOG.info(String.format(
"Worker %s could not prepare checkpoint for shard %s because it does not hold the lease",
getWorkerIdentifier(),
shardId));
return false;
}
lease.setPendingCheckpoint(Objects.requireNonNull(pendingCheckpoint, "pendingCheckpoint should not be null"));
return updateLease(lease, concurrencyToken);
}
/**
* {@inheritDoc}
*/
@Override
public void prepareCheckpoint(String shardId,
ExtendedSequenceNumber pendingCheckpointValue,
String concurrencyToken) throws KinesisClientLibException {
try {
boolean wasSuccessful =
prepareCheckpoint(shardId, pendingCheckpointValue, UUID.fromString(concurrencyToken));
if (!wasSuccessful) {
throw new ShutdownException(
"Can't prepare checkpoint - instance doesn't hold the lease for this shard");
}
} catch (ProvisionedThroughputException e) {
throw new ThrottlingException("Got throttled while preparing checkpoint.", e);
} catch (InvalidStateException e) {
String message = "Unable to prepare checkpoint for shardId " + shardId;
LOG.error(message, e);
throw new com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException(message, e);
} catch (DependencyException e) {
throw new KinesisClientLibDependencyException("Unable to prepare checkpoint for shardId " + shardId, e);
}
}
/**
* {@inheritDoc}
*/
@Override
public Checkpoint getCheckpointObject(String shardId) throws KinesisClientLibException {
try {
KinesisClientLease lease = leaseManager.getLease(shardId);
return new Checkpoint(lease.getCheckpoint(), lease.getPendingCheckpoint());
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
String message = "Unable to fetch checkpoint for shardId " + shardId;
LOG.error(message, e);
throw new KinesisClientLibIOException(message, e);
}
}
/** /**
* @return Current shard/lease assignments * @return Current shard/lease assignments
*/ */

View file

@ -0,0 +1,51 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
/**
* Objects of this class are prepared to checkpoint at a specific sequence number. They use an
* IRecordProcessorCheckpointer to do the actual checkpointing, so their checkpoint is subject to the same 'didn't go
* backwards' validation as a normal checkpoint.
*/
public class PreparedCheckpointer implements IPreparedCheckpointer {
private final ExtendedSequenceNumber pendingCheckpointSequenceNumber;
private final IRecordProcessorCheckpointer checkpointer;
/**
* Constructor.
*
* @param pendingCheckpointSequenceNumber sequence number to checkpoint at
* @param checkpointer checkpointer to use
*/
public PreparedCheckpointer(ExtendedSequenceNumber pendingCheckpointSequenceNumber,
IRecordProcessorCheckpointer checkpointer) {
this.pendingCheckpointSequenceNumber = pendingCheckpointSequenceNumber;
this.checkpointer = checkpointer;
}
/**
* {@inheritDoc}
*/
@Override
public ExtendedSequenceNumber getPendingCheckpoint() {
return pendingCheckpointSequenceNumber;
}
/**
* {@inheritDoc}
*/
@Override
public void checkpoint()
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException {
checkpointer.checkpoint(pendingCheckpointSequenceNumber.getSequenceNumber(),
pendingCheckpointSequenceNumber.getSubSequenceNumber());
}
}

View file

@ -23,6 +23,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibE
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
@ -127,7 +128,7 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
* If there is a last checkpoint value, we want to check both the lower and upper bound. * If there is a last checkpoint value, we want to check both the lower and upper bound.
*/ */
ExtendedSequenceNumber newCheckpoint = new ExtendedSequenceNumber(sequenceNumber, subSequenceNumber); ExtendedSequenceNumber newCheckpoint = new ExtendedSequenceNumber(sequenceNumber, subSequenceNumber);
if ((lastCheckpointValue.compareTo(newCheckpoint) <= 0) if ((lastCheckpointValue == null || lastCheckpointValue.compareTo(newCheckpoint) <= 0)
&& newCheckpoint.compareTo(largestPermittedCheckpointValue) <= 0) { && newCheckpoint.compareTo(largestPermittedCheckpointValue) <= 0) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -144,6 +145,82 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
} }
} }
/**
* {@inheritDoc}
*/
@Override
public synchronized IPreparedCheckpointer prepareCheckpoint()
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
return this.prepareCheckpoint(
this.largestPermittedCheckpointValue.getSequenceNumber(),
this.largestPermittedCheckpointValue.getSubSequenceNumber());
}
/**
* {@inheritDoc}
*/
@Override
public synchronized IPreparedCheckpointer prepareCheckpoint(Record record)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
if (record == null) {
throw new IllegalArgumentException("Could not prepare checkpoint a null record");
} else if (record instanceof UserRecord) {
return prepareCheckpoint(record.getSequenceNumber(), ((UserRecord) record).getSubSequenceNumber());
} else {
return prepareCheckpoint(record.getSequenceNumber(), 0);
}
}
/**
* {@inheritDoc}
*/
@Override
public synchronized IPreparedCheckpointer prepareCheckpoint(String sequenceNumber)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
return prepareCheckpoint(sequenceNumber, 0);
}
/**
* {@inheritDoc}
*/
@Override
public synchronized IPreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
if (subSequenceNumber < 0) {
throw new IllegalArgumentException("Could not checkpoint at invalid, negative subsequence number "
+ subSequenceNumber);
}
// throws exception if sequence number shouldn't be checkpointed for this shard
sequenceNumberValidator.validateSequenceNumber(sequenceNumber);
if (LOG.isDebugEnabled()) {
LOG.debug("Validated prepareCheckpoint sequence number " + sequenceNumber + " for " + shardInfo.getShardId()
+ ", token " + shardInfo.getConcurrencyToken());
}
/*
* If there isn't a last checkpoint value, we only care about checking the upper bound.
* If there is a last checkpoint value, we want to check both the lower and upper bound.
*/
ExtendedSequenceNumber pendingCheckpoint = new ExtendedSequenceNumber(sequenceNumber, subSequenceNumber);
if ((lastCheckpointValue == null || lastCheckpointValue.compareTo(pendingCheckpoint) <= 0)
&& pendingCheckpoint.compareTo(largestPermittedCheckpointValue) <= 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Preparing checkpoint " + shardInfo.getShardId()
+ ", token " + shardInfo.getConcurrencyToken()
+ " at specific extended sequence number " + pendingCheckpoint);
}
return doPrepareCheckpoint(pendingCheckpoint);
} else {
throw new IllegalArgumentException(String.format(
"Could not prepare checkpoint at extended sequence number %s as it did not fall into acceptable "
+ "range between the last checkpoint %s and the greatest extended sequence number passed "
+ "to this record processor %s",
pendingCheckpoint, this.lastCheckpointValue, this.largestPermittedCheckpointValue));
}
}
/** /**
* @return the lastCheckpointValue * @return the lastCheckpointValue
*/ */
@ -165,7 +242,7 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
} }
/** /**
* @param checkpoint the checkpoint value to set * @param largestPermittedCheckpointValue the largest permitted checkpoint
*/ */
synchronized void setLargestPermittedCheckpointValue(ExtendedSequenceNumber largestPermittedCheckpointValue) { synchronized void setLargestPermittedCheckpointValue(ExtendedSequenceNumber largestPermittedCheckpointValue) {
this.largestPermittedCheckpointValue = largestPermittedCheckpointValue; this.largestPermittedCheckpointValue = largestPermittedCheckpointValue;
@ -224,4 +301,58 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
} }
} }
} }
/**
* This method stores the given sequenceNumber as a pending checkpooint in the lease table without overwriting the
* current checkpoint, then returns a PreparedCheckpointer that is ready to checkpoint at the given sequence number.
*
* This method does not advance lastCheckpointValue, but calls to PreparedCheckpointer.checkpoint() on the returned
* objects do. This allows customers to 'discard' prepared checkpoints by calling any of the 4 checkpoint methods on
* this class before calling PreparedCheckpointer.checkpoint(). Some examples:
*
* 1) prepareCheckpoint(snA); checkpoint(snB). // this works regardless of whether snA or snB is bigger. It discards
* the prepared checkpoint at snA.
* 2) prepareCheckpoint(snA); prepareCheckpoint(snB). // this works regardless of whether snA or snB is bigger. It
* replaces the preparedCheckpoint at snA with a new one at snB.
* 3) checkpointerA = prepareCheckpoint(snA); checkpointerB = prepareCheckpoint(snB); checkpointerB.checkpoint();
* checkpointerA.checkpoint(); // This replaces the prepared checkpoint at snA with a new one at snB, then
* checkpoints at snB regardless of whether snA or snB is bigger. The checkpoint at snA only succeeds if snA > snB.
*
* @param extendedSequenceNumber the sequence number for the prepared checkpoint
* @return a prepared checkpointer that is ready to checkpoint at the given sequence number.
* @throws KinesisClientLibDependencyException
* @throws InvalidStateException
* @throws ThrottlingException
* @throws ShutdownException
*/
private IPreparedCheckpointer doPrepareCheckpoint(ExtendedSequenceNumber extendedSequenceNumber)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
ExtendedSequenceNumber newPrepareCheckpoint = extendedSequenceNumber;
if (sequenceNumberAtShardEnd != null && sequenceNumberAtShardEnd.equals(extendedSequenceNumber)) {
// If we are about to checkpoint the very last sequence number for this shard, we might as well
// just checkpoint at SHARD_END
newPrepareCheckpoint = ExtendedSequenceNumber.SHARD_END;
}
// Don't actually prepare a checkpoint if they're trying to checkpoint at the current checkpointed value.
// The only way this can happen is if they call prepareCheckpoint() in a record processor that was initialized
// AND that has not processed any records since initialization.
if (newPrepareCheckpoint.equals(lastCheckpointValue)) {
return new DoesNothingPreparedCheckpointer(newPrepareCheckpoint);
}
try {
checkpoint.prepareCheckpoint(shardInfo.getShardId(), newPrepareCheckpoint, shardInfo.getConcurrencyToken());
} catch (ThrottlingException | ShutdownException | InvalidStateException
| KinesisClientLibDependencyException e) {
throw e;
} catch (KinesisClientLibException e) {
LOG.warn("Caught exception setting prepareCheckpoint.", e);
throw new KinesisClientLibDependencyException("Caught exception while prepareCheckpointing", e);
}
PreparedCheckpointer result = new PreparedCheckpointer(newPrepareCheckpoint, this);
return result;
}
} }

View file

@ -23,6 +23,7 @@ public class InitializationInput {
private String shardId; private String shardId;
private ExtendedSequenceNumber extendedSequenceNumber; private ExtendedSequenceNumber extendedSequenceNumber;
private ExtendedSequenceNumber pendingCheckpointSequenceNumber;
/** /**
* Default constructor. * Default constructor.
@ -71,4 +72,26 @@ public class InitializationInput {
this.extendedSequenceNumber = extendedSequenceNumber; this.extendedSequenceNumber = extendedSequenceNumber;
return this; return this;
} }
/**
* Get pending checkpoint {@link ExtendedSequenceNumber}.
*
* @return The {@link ExtendedSequenceNumber} in the shard for which a checkpoint is pending
*/
public ExtendedSequenceNumber getPendingCheckpointSequenceNumber() {
return pendingCheckpointSequenceNumber;
}
/**
* Set pending checkpoint {@link ExtendedSequenceNumber}.
*
* @param pendingCheckpointSequenceNumber The {@link ExtendedSequenceNumber} in the shard for which a checkpoint
* is pending
* @return A reference to this updated object so that method calls can be chained together.
*/
public InitializationInput withPendingCheckpointSequenceNumber(
ExtendedSequenceNumber pendingCheckpointSequenceNumber) {
this.pendingCheckpointSequenceNumber = pendingCheckpointSequenceNumber;
return this;
}
} }

View file

@ -27,6 +27,7 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber
public class KinesisClientLease extends Lease { public class KinesisClientLease extends Lease {
private ExtendedSequenceNumber checkpoint; private ExtendedSequenceNumber checkpoint;
private ExtendedSequenceNumber pendingCheckpoint;
private Long ownerSwitchesSinceCheckpoint = 0L; private Long ownerSwitchesSinceCheckpoint = 0L;
private Set<String> parentShardIds = new HashSet<String>(); private Set<String> parentShardIds = new HashSet<String>();
@ -37,16 +38,18 @@ public class KinesisClientLease extends Lease {
public KinesisClientLease(KinesisClientLease other) { public KinesisClientLease(KinesisClientLease other) {
super(other); super(other);
this.checkpoint = other.getCheckpoint(); this.checkpoint = other.getCheckpoint();
this.pendingCheckpoint = other.getPendingCheckpoint();
this.ownerSwitchesSinceCheckpoint = other.getOwnerSwitchesSinceCheckpoint(); this.ownerSwitchesSinceCheckpoint = other.getOwnerSwitchesSinceCheckpoint();
this.parentShardIds.addAll(other.getParentShardIds()); this.parentShardIds.addAll(other.getParentShardIds());
} }
KinesisClientLease(String leaseKey, String leaseOwner, Long leaseCounter, UUID concurrencyToken, KinesisClientLease(String leaseKey, String leaseOwner, Long leaseCounter, UUID concurrencyToken,
Long lastCounterIncrementNanos, ExtendedSequenceNumber checkpoint, Long ownerSwitchesSinceCheckpoint, Long lastCounterIncrementNanos, ExtendedSequenceNumber checkpoint, ExtendedSequenceNumber pendingCheckpoint,
Set<String> parentShardIds) { Long ownerSwitchesSinceCheckpoint, Set<String> parentShardIds) {
super(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos); super(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos);
this.checkpoint = checkpoint; this.checkpoint = checkpoint;
this.pendingCheckpoint = pendingCheckpoint;
this.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint; this.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint;
this.parentShardIds.addAll(parentShardIds); this.parentShardIds.addAll(parentShardIds);
} }
@ -64,6 +67,7 @@ public class KinesisClientLease extends Lease {
setOwnerSwitchesSinceCheckpoint(casted.ownerSwitchesSinceCheckpoint); setOwnerSwitchesSinceCheckpoint(casted.ownerSwitchesSinceCheckpoint);
setCheckpoint(casted.checkpoint); setCheckpoint(casted.checkpoint);
setPendingCheckpoint(casted.pendingCheckpoint);
setParentShardIds(casted.parentShardIds); setParentShardIds(casted.parentShardIds);
} }
@ -75,6 +79,13 @@ public class KinesisClientLease extends Lease {
return checkpoint; return checkpoint;
} }
/**
* @return pending checkpoint, possibly null.
*/
public ExtendedSequenceNumber getPendingCheckpoint() {
return pendingCheckpoint;
}
/** /**
* @return count of distinct lease holders between checkpoints. * @return count of distinct lease holders between checkpoints.
*/ */
@ -100,6 +111,15 @@ public class KinesisClientLease extends Lease {
this.checkpoint = checkpoint; this.checkpoint = checkpoint;
} }
/**
* Sets pending checkpoint.
*
* @param pendingCheckpoint can be null
*/
public void setPendingCheckpoint(ExtendedSequenceNumber pendingCheckpoint) {
this.pendingCheckpoint = pendingCheckpoint;
}
/** /**
* Sets ownerSwitchesSinceCheckpoint. * Sets ownerSwitchesSinceCheckpoint.
* *
@ -134,6 +154,7 @@ public class KinesisClientLease extends Lease {
final int prime = 31; final int prime = 31;
int result = super.hashCode(); int result = super.hashCode();
result = prime * result + ((checkpoint == null) ? 0 : checkpoint.hashCode()); result = prime * result + ((checkpoint == null) ? 0 : checkpoint.hashCode());
result = pendingCheckpoint == null ? result : prime * result + pendingCheckpoint.hashCode();
result = result =
prime * result + ((ownerSwitchesSinceCheckpoint == null) ? 0 : ownerSwitchesSinceCheckpoint.hashCode()); prime * result + ((ownerSwitchesSinceCheckpoint == null) ? 0 : ownerSwitchesSinceCheckpoint.hashCode());
result = prime * result + ((parentShardIds == null) ? 0 : parentShardIds.hashCode()); result = prime * result + ((parentShardIds == null) ? 0 : parentShardIds.hashCode());
@ -154,6 +175,11 @@ public class KinesisClientLease extends Lease {
return false; return false;
} else if (!checkpoint.equals(other.checkpoint)) } else if (!checkpoint.equals(other.checkpoint))
return false; return false;
if (pendingCheckpoint == null) {
if (other.pendingCheckpoint != null)
return false;
} else if (!pendingCheckpoint.equals(other.pendingCheckpoint))
return false;
if (ownerSwitchesSinceCheckpoint == null) { if (ownerSwitchesSinceCheckpoint == null) {
if (other.ownerSwitchesSinceCheckpoint != null) if (other.ownerSwitchesSinceCheckpoint != null)
return false; return false;

View file

@ -26,6 +26,7 @@ import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseSerializer; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseSerializer;
import com.amazonaws.services.kinesis.leases.util.DynamoUtils; import com.amazonaws.services.kinesis.leases.util.DynamoUtils;
import com.google.common.base.Strings;
/** /**
* An implementation of ILeaseSerializer for KinesisClientLease objects. * An implementation of ILeaseSerializer for KinesisClientLease objects.
@ -35,6 +36,8 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
private static final String OWNER_SWITCHES_KEY = "ownerSwitchesSinceCheckpoint"; private static final String OWNER_SWITCHES_KEY = "ownerSwitchesSinceCheckpoint";
private static final String CHECKPOINT_SEQUENCE_NUMBER_KEY = "checkpoint"; private static final String CHECKPOINT_SEQUENCE_NUMBER_KEY = "checkpoint";
private static final String CHECKPOINT_SUBSEQUENCE_NUMBER_KEY = "checkpointSubSequenceNumber"; private static final String CHECKPOINT_SUBSEQUENCE_NUMBER_KEY = "checkpointSubSequenceNumber";
private static final String PENDING_CHECKPOINT_SEQUENCE_KEY = "pendingCheckpoint";
private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber";
public final String PARENT_SHARD_ID_KEY = "parentShardId"; public final String PARENT_SHARD_ID_KEY = "parentShardId";
private final LeaseSerializer baseSerializer = new LeaseSerializer(KinesisClientLease.class); private final LeaseSerializer baseSerializer = new LeaseSerializer(KinesisClientLease.class);
@ -50,6 +53,11 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
result.put(PARENT_SHARD_ID_KEY, DynamoUtils.createAttributeValue(lease.getParentShardIds())); result.put(PARENT_SHARD_ID_KEY, DynamoUtils.createAttributeValue(lease.getParentShardIds()));
} }
if (lease.getPendingCheckpoint() != null && !lease.getPendingCheckpoint().getSequenceNumber().isEmpty()) {
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSequenceNumber()));
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSubSequenceNumber()));
}
return result; return result;
} }
@ -65,6 +73,14 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
); );
result.setParentShardIds(DynamoUtils.safeGetSS(dynamoRecord, PARENT_SHARD_ID_KEY)); result.setParentShardIds(DynamoUtils.safeGetSS(dynamoRecord, PARENT_SHARD_ID_KEY));
if (!Strings.isNullOrEmpty(DynamoUtils.safeGetString(dynamoRecord, PENDING_CHECKPOINT_SEQUENCE_KEY))) {
result.setPendingCheckpoint(
new ExtendedSequenceNumber(
DynamoUtils.safeGetString(dynamoRecord, PENDING_CHECKPOINT_SEQUENCE_KEY),
DynamoUtils.safeGetLong(dynamoRecord, PENDING_CHECKPOINT_SUBSEQUENCE_KEY))
);
}
return result; return result;
} }
@ -128,6 +144,14 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getOwnerSwitchesSinceCheckpoint()), new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getOwnerSwitchesSinceCheckpoint()),
AttributeAction.PUT)); AttributeAction.PUT));
if (lease.getPendingCheckpoint() != null && !lease.getPendingCheckpoint().getSequenceNumber().isEmpty()) {
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSequenceNumber()), AttributeAction.PUT));
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSubSequenceNumber()), AttributeAction.PUT));
} else {
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, new AttributeValueUpdate().withAction(AttributeAction.DELETE));
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, new AttributeValueUpdate().withAction(AttributeAction.DELETE));
}
return result; return result;
} }

View file

@ -107,6 +107,68 @@ public abstract class CheckpointImplTestBase {
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(checkpointValue); ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(checkpointValue);
checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(checkpointValue), concurrencyToken); checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(checkpointValue), concurrencyToken);
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
} }
@Test
public final void testInitialPrepareCheckpoint() throws Exception {
String sequenceNumber = "1";
String pendingCheckpointValue = "99999";
String shardId = "myShardId";
ExtendedSequenceNumber extendedCheckpointNumber = new ExtendedSequenceNumber(sequenceNumber);
checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(sequenceNumber), testConcurrencyToken);
ExtendedSequenceNumber extendedPendingCheckpointNumber = new ExtendedSequenceNumber(pendingCheckpointValue);
checkpoint.prepareCheckpoint(shardId, new ExtendedSequenceNumber(pendingCheckpointValue), testConcurrencyToken);
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
}
@Test
public final void testAdvancingPrepareCheckpoint() throws Exception {
String shardId = "myShardId";
String checkpointValue = "12345";
ExtendedSequenceNumber extendedCheckpointNumber = new ExtendedSequenceNumber(checkpointValue);
checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(checkpointValue), testConcurrencyToken);
for (Integer i = 0; i < 10; i++) {
String sequenceNumber = i.toString();
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(sequenceNumber);
checkpoint.prepareCheckpoint(shardId, new ExtendedSequenceNumber(sequenceNumber), testConcurrencyToken);
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
}
}
@Test
public final void testPrepareAndSetCheckpoint() throws Exception {
String checkpointValue = "12345";
String shardId = "testShardId-1";
String concurrencyToken = "token-1";
String pendingCheckpointValue = "99999";
// set initial checkpoint
ExtendedSequenceNumber extendedCheckpointNumber = new ExtendedSequenceNumber(checkpointValue);
checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(checkpointValue), concurrencyToken);
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// prepare checkpoint
ExtendedSequenceNumber extendedPendingCheckpointNumber = new ExtendedSequenceNumber(pendingCheckpointValue);
checkpoint.prepareCheckpoint(shardId, new ExtendedSequenceNumber(pendingCheckpointValue), concurrencyToken);
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// do checkpoint
checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(pendingCheckpointValue), concurrencyToken);
Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
}
} }

View file

@ -33,6 +33,7 @@ public class InMemoryCheckpointImpl implements ICheckpoint {
private Map<String, ExtendedSequenceNumber> checkpoints = new HashMap<>(); private Map<String, ExtendedSequenceNumber> checkpoints = new HashMap<>();
private Map<String, ExtendedSequenceNumber> flushpoints = new HashMap<>(); private Map<String, ExtendedSequenceNumber> flushpoints = new HashMap<>();
private Map<String, ExtendedSequenceNumber> pendingCheckpoints = new HashMap<>();
private final String startingSequenceNumber; private final String startingSequenceNumber;
/** /**
@ -95,6 +96,7 @@ public class InMemoryCheckpointImpl implements ICheckpoint {
throws KinesisClientLibException { throws KinesisClientLibException {
checkpoints.put(shardId, checkpointValue); checkpoints.put(shardId, checkpointValue);
flushpoints.put(shardId, checkpointValue); flushpoints.put(shardId, checkpointValue);
pendingCheckpoints.remove(shardId);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("shardId: " + shardId + " checkpoint: " + checkpointValue); LOG.debug("shardId: " + shardId + " checkpoint: " + checkpointValue);
@ -112,6 +114,22 @@ public class InMemoryCheckpointImpl implements ICheckpoint {
return checkpoint; return checkpoint;
} }
@Override
public void prepareCheckpoint(String shardId, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken)
throws KinesisClientLibException {
pendingCheckpoints.put(shardId, pendingCheckpoint);
}
@Override
public Checkpoint getCheckpointObject(String shardId) throws KinesisClientLibException {
ExtendedSequenceNumber checkpoint = flushpoints.get(shardId);
ExtendedSequenceNumber pendingCheckpoint = pendingCheckpoints.get(shardId);
Checkpoint checkpointObj = new Checkpoint(checkpoint, pendingCheckpoint);
LOG.debug("getCheckpointObject shardId: " + shardId + ", " + checkpointObj);
return checkpointObj;
}
/** Check that string is neither null nor empty. /** Check that string is neither null nor empty.
*/ */
static void verifyNotEmpty(String string, String message) { static void verifyNotEmpty(String string, String message) {

View file

@ -0,0 +1,49 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
public class PreparedCheckpointerTest {
/**
* This test verifies the relationship between the constructor and getPendingCheckpoint.
*/
@Test
public void testGetSequenceNumber() {
ExtendedSequenceNumber sn = new ExtendedSequenceNumber("sn");
IPreparedCheckpointer checkpointer = new PreparedCheckpointer(sn, null);
Assert.assertEquals(sn, checkpointer.getPendingCheckpoint());
}
/**
* This test makes sure the PreparedCheckpointer calls the IRecordProcessorCheckpointer properly.
*
* @throws Exception
*/
@Test
public void testCheckpoint() throws Exception {
ExtendedSequenceNumber sn = new ExtendedSequenceNumber("sn");
IRecordProcessorCheckpointer mockRecordProcessorCheckpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
IPreparedCheckpointer checkpointer = new PreparedCheckpointer(sn, mockRecordProcessorCheckpointer);
checkpointer.checkpoint();
Mockito.verify(mockRecordProcessorCheckpointer).checkpoint(sn.getSequenceNumber(), sn.getSubSequenceNumber());
}
/**
* This test makes sure the PreparedCheckpointer calls the IRecordProcessorCheckpointer properly.
*
* @throws Exception
*/
@Test
public void testDoesNothingPreparedCheckpoint() throws Exception {
ExtendedSequenceNumber sn = new ExtendedSequenceNumber("sn");
IPreparedCheckpointer checkpointer = new DoesNothingPreparedCheckpointer(sn);
Assert.assertEquals(sn, checkpointer.getPendingCheckpoint());
// nothing happens here
checkpointer.checkpoint();
}
}

View file

@ -25,12 +25,8 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheckpointImpl; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheckpointImpl;
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
@ -49,6 +45,8 @@ public class RecordProcessorCheckpointerTest {
private ExtendedSequenceNumber startingExtendedSequenceNumber = new ExtendedSequenceNumber(startingSequenceNumber); private ExtendedSequenceNumber startingExtendedSequenceNumber = new ExtendedSequenceNumber(startingSequenceNumber);
private String testConcurrencyToken = "testToken"; private String testConcurrencyToken = "testToken";
private ICheckpoint checkpoint; private ICheckpoint checkpoint;
private ShardInfo shardInfo;
private SequenceNumberValidator sequenceNumberValidator;
private String shardId = "shardId-123"; private String shardId = "shardId-123";
/** /**
@ -60,6 +58,9 @@ public class RecordProcessorCheckpointerTest {
// A real checkpoint will return a checkpoint value after it is initialized. // A real checkpoint will return a checkpoint value after it is initialized.
checkpoint.setCheckpoint(shardId, startingExtendedSequenceNumber, testConcurrencyToken); checkpoint.setCheckpoint(shardId, startingExtendedSequenceNumber, testConcurrencyToken);
Assert.assertEquals(this.startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId)); Assert.assertEquals(this.startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId));
shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
sequenceNumberValidator = new SequenceNumberValidator(null, shardId, false);
} }
/** /**
@ -75,8 +76,6 @@ public class RecordProcessorCheckpointerTest {
*/ */
@Test @Test
public final void testCheckpoint() throws Exception { public final void testCheckpoint() throws Exception {
ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
// First call to checkpoint // First call to checkpoint
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, null); new RecordProcessorCheckpointer(shardInfo, checkpoint, null);
@ -98,9 +97,6 @@ public class RecordProcessorCheckpointerTest {
*/ */
@Test @Test
public final void testCheckpointRecord() throws Exception { public final void testCheckpointRecord() throws Exception {
ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
SequenceNumberValidator sequenceNumberValidator =
new SequenceNumberValidator(null, shardId, false);
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
@ -113,13 +109,10 @@ public class RecordProcessorCheckpointerTest {
/** /**
* Test method for * Test method for
* {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(UserRecord record)}. * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(Record record)}.
*/ */
@Test @Test
public final void testCheckpointSubRecord() throws Exception { public final void testCheckpointSubRecord() throws Exception {
ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
SequenceNumberValidator sequenceNumberValidator =
new SequenceNumberValidator(null, shardId, false);
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
@ -137,9 +130,6 @@ public class RecordProcessorCheckpointerTest {
*/ */
@Test @Test
public final void testCheckpointSequenceNumber() throws Exception { public final void testCheckpointSequenceNumber() throws Exception {
ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
SequenceNumberValidator sequenceNumberValidator =
new SequenceNumberValidator(null, shardId, false);
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
@ -155,9 +145,6 @@ public class RecordProcessorCheckpointerTest {
*/ */
@Test @Test
public final void testCheckpointExtendedSequenceNumber() throws Exception { public final void testCheckpointExtendedSequenceNumber() throws Exception {
ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
SequenceNumberValidator sequenceNumberValidator =
new SequenceNumberValidator(null, shardId, false);
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator); new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
@ -167,14 +154,210 @@ public class RecordProcessorCheckpointerTest {
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
} }
/**
* Test method for
* {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#prepareCheckpoint()}.
*/
@Test
public final void testPrepareCheckpoint() throws Exception {
// First call to checkpoint
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber sequenceNumber1 = new ExtendedSequenceNumber("5001");
processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber1);
IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint();
Assert.assertEquals(sequenceNumber1, preparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(sequenceNumber1, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// Advance checkpoint
ExtendedSequenceNumber sequenceNumber2 = new ExtendedSequenceNumber("5019");
processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber2);
preparedCheckpoint = processingCheckpointer.prepareCheckpoint();
Assert.assertEquals(sequenceNumber2, preparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(sequenceNumber2, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// Checkpoint using preparedCheckpoint
preparedCheckpoint.checkpoint();
Assert.assertEquals(sequenceNumber2, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(sequenceNumber2, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
}
/**
* Test method for
* {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#prepareCheckpoint(Record record)}.
*/
@Test
public final void testPrepareCheckpointRecord() throws Exception {
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5025");
Record record = new Record().withSequenceNumber("5025");
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint(record);
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// Checkpoint using preparedCheckpoint
preparedCheckpoint.checkpoint();
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
}
/**
* Test method for
* {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#prepareCheckpoint(Record record)}.
*/
@Test
public final void testPrepareCheckpointSubRecord() throws Exception {
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5030");
Record record = new Record().withSequenceNumber("5030");
UserRecord subRecord = new UserRecord(record);
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint(subRecord);
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// Checkpoint using preparedCheckpoint
preparedCheckpoint.checkpoint();
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
}
/**
* Test method for
* {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(String sequenceNumber)}.
*/
@Test
public final void testPrepareCheckpointSequenceNumber() throws Exception {
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5035");
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint("5035");
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// Checkpoint using preparedCheckpoint
preparedCheckpoint.checkpoint();
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
}
/**
* Test method for
* {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#checkpoint(String sequenceNumber, long subSequenceNumber)}.
*/
@Test
public final void testPrepareCheckpointExtendedSequenceNumber() throws Exception {
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5040");
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint("5040", 0);
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// Checkpoint using preparedCheckpoint
preparedCheckpoint.checkpoint();
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
}
/**
* Test that having multiple outstanding prepared checkpointers works if they are redeemed in the right order.
*/
@Test
public final void testMultipleOutstandingCheckpointersHappyCase() throws Exception {
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
processingCheckpointer.setLargestPermittedCheckpointValue(new ExtendedSequenceNumber("6040"));
ExtendedSequenceNumber sn1 = new ExtendedSequenceNumber("6010");
IPreparedCheckpointer firstPreparedCheckpoint = processingCheckpointer.prepareCheckpoint("6010", 0);
Assert.assertEquals(sn1, firstPreparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(sn1, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
ExtendedSequenceNumber sn2 = new ExtendedSequenceNumber("6020");
IPreparedCheckpointer secondPreparedCheckpoint = processingCheckpointer.prepareCheckpoint("6020", 0);
Assert.assertEquals(sn2, secondPreparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(sn2, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// checkpoint in order
firstPreparedCheckpoint.checkpoint();
Assert.assertEquals(sn1, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(sn1, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
secondPreparedCheckpoint.checkpoint();
Assert.assertEquals(sn2, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(sn2, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
}
/**
* Test that having multiple outstanding prepared checkpointers works if they are redeemed in the right order.
*/
@Test
public final void testMultipleOutstandingCheckpointersOutOfOrder() throws Exception {
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
processingCheckpointer.setLargestPermittedCheckpointValue(new ExtendedSequenceNumber("7040"));
ExtendedSequenceNumber sn1 = new ExtendedSequenceNumber("7010");
IPreparedCheckpointer firstPreparedCheckpoint = processingCheckpointer.prepareCheckpoint("7010", 0);
Assert.assertEquals(sn1, firstPreparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(sn1, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
ExtendedSequenceNumber sn2 = new ExtendedSequenceNumber("7020");
IPreparedCheckpointer secondPreparedCheckpoint = processingCheckpointer.prepareCheckpoint("7020", 0);
Assert.assertEquals(sn2, secondPreparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(sn2, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// checkpoint out of order
secondPreparedCheckpoint.checkpoint();
Assert.assertEquals(sn2, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(sn2, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
try {
firstPreparedCheckpoint.checkpoint();
Assert.fail("checkpoint() should have failed because the sequence number was too low");
} catch (IllegalArgumentException e) {
} catch (Exception e) {
Assert.fail("checkpoint() should have thrown an IllegalArgumentException but instead threw " + e);
}
}
/** /**
* Test method for update() * Test method for update()
* *
*/ */
@Test @Test
public final void testUpdate() throws Exception { public final void testUpdate() throws Exception {
ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
RecordProcessorCheckpointer checkpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, null); RecordProcessorCheckpointer checkpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, null);
ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("10"); ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("10");
@ -193,8 +376,6 @@ public class RecordProcessorCheckpointerTest {
*/ */
@Test @Test
public final void testClientSpecifiedCheckpoint() throws Exception { public final void testClientSpecifiedCheckpoint() throws Exception {
ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
SequenceNumberValidator validator = mock(SequenceNumberValidator.class); SequenceNumberValidator validator = mock(SequenceNumberValidator.class);
Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); Mockito.doNothing().when(validator).validateSequenceNumber(anyString());
RecordProcessorCheckpointer processingCheckpointer = RecordProcessorCheckpointer processingCheckpointer =
@ -275,10 +456,129 @@ public class RecordProcessorCheckpointerTest {
processingCheckpointer.getLastCheckpointValue()); processingCheckpointer.getLastCheckpointValue());
} }
/*
* This test is a mixed test of checking some basic functionality of two phase checkpointing at a sequence number
* and making sure certain bounds checks and validations are being performed inside the checkpointer to prevent
* clients from checkpointing out of order/too big/non-numeric values that aren't valid strings for them to be
* checkpointing
*/
@Test
public final void testClientSpecifiedTwoPhaseCheckpoint() throws Exception {
SequenceNumberValidator validator = mock(SequenceNumberValidator.class);
Mockito.doNothing().when(validator).validateSequenceNumber(anyString());
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator);
// Several checkpoints we're gonna hit
ExtendedSequenceNumber tooSmall = new ExtendedSequenceNumber("2");
ExtendedSequenceNumber firstSequenceNumber = checkpoint.getCheckpoint(shardId); // 13
ExtendedSequenceNumber secondSequenceNumber = new ExtendedSequenceNumber("127");
ExtendedSequenceNumber thirdSequenceNumber = new ExtendedSequenceNumber("5019");
ExtendedSequenceNumber lastSequenceNumberOfShard = new ExtendedSequenceNumber("6789");
ExtendedSequenceNumber tooBigSequenceNumber = new ExtendedSequenceNumber("9000");
processingCheckpointer.setInitialCheckpointValue(firstSequenceNumber);
processingCheckpointer.setLargestPermittedCheckpointValue(thirdSequenceNumber);
// confirm that we cannot move backward
try {
processingCheckpointer.prepareCheckpoint(tooSmall.getSequenceNumber(), tooSmall.getSubSequenceNumber());
Assert.fail("You shouldn't be able to prepare a checkpoint earlier than the initial checkpoint.");
} catch (IllegalArgumentException e) {
// yay!
}
try {
processingCheckpointer.checkpoint(tooSmall.getSequenceNumber(), tooSmall.getSubSequenceNumber());
Assert.fail("You shouldn't be able to checkpoint earlier than the initial checkpoint.");
} catch (IllegalArgumentException e) {
// yay!
}
// advance to first
processingCheckpointer.checkpoint(firstSequenceNumber.getSequenceNumber(), firstSequenceNumber.getSubSequenceNumber());
Assert.assertEquals(firstSequenceNumber, checkpoint.getCheckpoint(shardId));
// prepare checkpoint at initial checkpoint value
IPreparedCheckpointer doesNothingPreparedCheckpoint =
processingCheckpointer.prepareCheckpoint(firstSequenceNumber.getSequenceNumber(), firstSequenceNumber.getSubSequenceNumber());
Assert.assertTrue(doesNothingPreparedCheckpoint instanceof DoesNothingPreparedCheckpointer);
Assert.assertEquals(firstSequenceNumber, doesNothingPreparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(firstSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(firstSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// nothing happens after checkpointing a doesNothingPreparedCheckpoint
doesNothingPreparedCheckpoint.checkpoint();
Assert.assertEquals(firstSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(firstSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// advance to second
processingCheckpointer.prepareCheckpoint(secondSequenceNumber.getSequenceNumber(), secondSequenceNumber.getSubSequenceNumber());
Assert.assertEquals(secondSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
processingCheckpointer.checkpoint(secondSequenceNumber.getSequenceNumber(), secondSequenceNumber.getSubSequenceNumber());
Assert.assertEquals(secondSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
ExtendedSequenceNumber[] valuesWeShouldNotBeAbleToCheckpointAt =
{ tooSmall, // Shouldn't be able to move before the first value we ever checkpointed
firstSequenceNumber, // Shouldn't even be able to move back to a once used sequence number
tooBigSequenceNumber, // Can't exceed the max sequence number in the checkpointer
lastSequenceNumberOfShard, // Just another big value that we will use later
null, // Not a valid sequence number
new ExtendedSequenceNumber("bogus-checkpoint-value"), // Can't checkpoint at non-numeric string
ExtendedSequenceNumber.SHARD_END, // Can't go to the end unless it is set as the max
ExtendedSequenceNumber.TRIM_HORIZON, // Can't go back to an initial sentinel value
ExtendedSequenceNumber.LATEST // Can't go back to an initial sentinel value
};
for (ExtendedSequenceNumber badCheckpointValue : valuesWeShouldNotBeAbleToCheckpointAt) {
try {
processingCheckpointer.prepareCheckpoint(badCheckpointValue.getSequenceNumber(), badCheckpointValue.getSubSequenceNumber());
fail("checkpointing at bad or out of order sequence didn't throw exception");
} catch (IllegalArgumentException e) {
} catch (NullPointerException e) {
}
Assert.assertEquals("Checkpoint value should not have changed",
secondSequenceNumber,
checkpoint.getCheckpoint(shardId));
Assert.assertEquals("Last checkpoint value should not have changed",
secondSequenceNumber,
processingCheckpointer.getLastCheckpointValue());
Assert.assertEquals("Largest sequence number should not have changed",
thirdSequenceNumber,
processingCheckpointer.getLargestPermittedCheckpointValue());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
}
// advance to third number
processingCheckpointer.prepareCheckpoint(thirdSequenceNumber.getSequenceNumber(), thirdSequenceNumber.getSubSequenceNumber());
Assert.assertEquals(thirdSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
processingCheckpointer.checkpoint(thirdSequenceNumber.getSequenceNumber(), thirdSequenceNumber.getSubSequenceNumber());
Assert.assertEquals(thirdSequenceNumber, checkpoint.getCheckpoint(shardId));
// Testing a feature that prevents checkpointing at SHARD_END twice
processingCheckpointer.setLargestPermittedCheckpointValue(lastSequenceNumberOfShard);
processingCheckpointer.setSequenceNumberAtShardEnd(processingCheckpointer.getLargestPermittedCheckpointValue());
processingCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
processingCheckpointer.prepareCheckpoint(lastSequenceNumberOfShard.getSequenceNumber(), lastSequenceNumberOfShard.getSubSequenceNumber());
Assert.assertEquals("Preparing a checkpoing at the sequence number at the end of a shard should be the same as "
+ "preparing a checkpoint at SHARD_END",
ExtendedSequenceNumber.SHARD_END,
checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
}
private enum CheckpointAction { private enum CheckpointAction {
NONE, NO_SEQUENCE_NUMBER, WITH_SEQUENCE_NUMBER; NONE, NO_SEQUENCE_NUMBER, WITH_SEQUENCE_NUMBER;
} }
private enum CheckpointerType {
CHECKPOINTER, PREPARED_CHECKPOINTER, PREPARE_THEN_CHECKPOINTER;
}
/** /**
* Tests a bunch of mixed calls between checkpoint() and checkpoint(sequenceNumber) using a helper function. * Tests a bunch of mixed calls between checkpoint() and checkpoint(sequenceNumber) using a helper function.
* *
@ -290,16 +590,59 @@ public class RecordProcessorCheckpointerTest {
@SuppressWarnings("serial") @SuppressWarnings("serial")
@Test @Test
public final void testMixedCheckpointCalls() throws Exception { public final void testMixedCheckpointCalls() throws Exception {
ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
SequenceNumberValidator validator = mock(SequenceNumberValidator.class); SequenceNumberValidator validator = mock(SequenceNumberValidator.class);
Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); Mockito.doNothing().when(validator).validateSequenceNumber(anyString());
RecordProcessorCheckpointer processingCheckpointer = for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) {
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator); RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator);
testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.CHECKPOINTER);
}
}
List<LinkedHashMap<String, CheckpointAction>> testPlans = /**
new ArrayList<LinkedHashMap<String, CheckpointAction>>(); * similar to
* {@link RecordProcessorCheckpointerTest#testMixedCheckpointCalls()} ,
* but executes in two phase commit mode, where we prepare a checkpoint and then commit the prepared checkpoint
*
* @throws Exception
*/
@SuppressWarnings("serial")
@Test
public final void testMixedTwoPhaseCheckpointCalls() throws Exception {
SequenceNumberValidator validator = mock(SequenceNumberValidator.class);
Mockito.doNothing().when(validator).validateSequenceNumber(anyString());
for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) {
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator);
testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.PREPARED_CHECKPOINTER);
}
}
/**
* similar to
* {@link RecordProcessorCheckpointerTest#testMixedCheckpointCalls()} ,
* but executes in two phase commit mode, where we prepare a checkpoint, but we checkpoint using the
* RecordProcessorCheckpointer instead of the returned IPreparedCheckpointer
*
* @throws Exception
*/
@SuppressWarnings("serial")
@Test
public final void testMixedTwoPhaseCheckpointCalls2() throws Exception {
SequenceNumberValidator validator = mock(SequenceNumberValidator.class);
Mockito.doNothing().when(validator).validateSequenceNumber(anyString());
for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) {
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator);
testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.PREPARE_THEN_CHECKPOINTER);
}
}
private List<LinkedHashMap<String, CheckpointAction>> getMixedCallsTestPlan() {
List<LinkedHashMap<String, CheckpointAction>> testPlans = new ArrayList<LinkedHashMap<String, CheckpointAction>>();
/* /*
* Simulate a scenario where the checkpointer is created at "latest". * Simulate a scenario where the checkpointer is created at "latest".
@ -356,11 +699,7 @@ public class RecordProcessorCheckpointerTest {
} }
}); });
for (LinkedHashMap<String, CheckpointAction> testPlan : testPlans) { return testPlans;
processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator);
testMixedCheckpointCalls(processingCheckpointer, testPlan);
}
} }
/** /**
@ -376,9 +715,11 @@ public class RecordProcessorCheckpointerTest {
* @throws Exception * @throws Exception
*/ */
private void testMixedCheckpointCalls(RecordProcessorCheckpointer processingCheckpointer, private void testMixedCheckpointCalls(RecordProcessorCheckpointer processingCheckpointer,
LinkedHashMap<String, CheckpointAction> checkpointValueAndAction) throws Exception { LinkedHashMap<String, CheckpointAction> checkpointValueAndAction,
CheckpointerType checkpointerType) throws Exception {
for (Entry<String, CheckpointAction> entry : checkpointValueAndAction.entrySet()) { for (Entry<String, CheckpointAction> entry : checkpointValueAndAction.entrySet()) {
IPreparedCheckpointer preparedCheckpoint = null;
ExtendedSequenceNumber lastCheckpointValue = processingCheckpointer.getLastCheckpointValue(); ExtendedSequenceNumber lastCheckpointValue = processingCheckpointer.getLastCheckpointValue();
if (SentinelCheckpoint.SHARD_END.toString().equals(entry.getKey())) { if (SentinelCheckpoint.SHARD_END.toString().equals(entry.getKey())) {
@ -400,10 +741,34 @@ public class RecordProcessorCheckpointerTest {
processingCheckpointer.getLastCheckpointValue()); processingCheckpointer.getLastCheckpointValue());
continue; continue;
case NO_SEQUENCE_NUMBER: case NO_SEQUENCE_NUMBER:
processingCheckpointer.checkpoint(); switch (checkpointerType) {
case CHECKPOINTER:
processingCheckpointer.checkpoint();
break;
case PREPARED_CHECKPOINTER:
preparedCheckpoint = processingCheckpointer.prepareCheckpoint();
preparedCheckpoint.checkpoint();
case PREPARE_THEN_CHECKPOINTER:
preparedCheckpoint = processingCheckpointer.prepareCheckpoint();
processingCheckpointer.checkpoint(
preparedCheckpoint.getPendingCheckpoint().getSequenceNumber(),
preparedCheckpoint.getPendingCheckpoint().getSubSequenceNumber());
}
break; break;
case WITH_SEQUENCE_NUMBER: case WITH_SEQUENCE_NUMBER:
processingCheckpointer.checkpoint(entry.getKey()); switch (checkpointerType) {
case CHECKPOINTER:
processingCheckpointer.checkpoint(entry.getKey());
break;
case PREPARED_CHECKPOINTER:
preparedCheckpoint = processingCheckpointer.prepareCheckpoint(entry.getKey());
preparedCheckpoint.checkpoint();
case PREPARE_THEN_CHECKPOINTER:
preparedCheckpoint = processingCheckpointer.prepareCheckpoint(entry.getKey());
processingCheckpointer.checkpoint(
preparedCheckpoint.getPendingCheckpoint().getSequenceNumber(),
preparedCheckpoint.getPendingCheckpoint().getSubSequenceNumber());
}
break; break;
} }
// We must have checkpointed to get here, so let's make sure our last checkpoint value is up to date // We must have checkpointed to get here, so let's make sure our last checkpoint value is up to date
@ -413,6 +778,11 @@ public class RecordProcessorCheckpointerTest {
Assert.assertEquals("Expected the largest checkpoint value to remain the same since the last set", Assert.assertEquals("Expected the largest checkpoint value to remain the same since the last set",
new ExtendedSequenceNumber(entry.getKey()), new ExtendedSequenceNumber(entry.getKey()),
processingCheckpointer.getLargestPermittedCheckpointValue()); processingCheckpointer.getLargestPermittedCheckpointValue());
Assert.assertEquals(new ExtendedSequenceNumber(entry.getKey()), checkpoint.getCheckpoint(shardId));
Assert.assertEquals(new ExtendedSequenceNumber(entry.getKey()),
checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
} }
} }
} }

View file

@ -23,6 +23,7 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
@ -37,6 +38,7 @@ import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.Objects;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -45,6 +47,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
@ -52,6 +57,7 @@ import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.Checkpoint;
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheckpointImpl; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheckpointImpl;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy;
@ -108,6 +114,7 @@ public class ShardConsumerTest {
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class); when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class);
when(checkpoint.getCheckpointObject(anyString())).thenThrow(NullPointerException.class);
when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseManager.getLease(anyString())).thenReturn(null);
StreamConfig streamConfig = StreamConfig streamConfig =
@ -156,6 +163,7 @@ public class ShardConsumerTest {
ExecutorService spyExecutorService = spy(executorService); ExecutorService spyExecutorService = spy(executorService);
when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class); when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class);
when(checkpoint.getCheckpointObject(anyString())).thenThrow(NullPointerException.class);
when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseManager.getLease(anyString())).thenReturn(null);
StreamConfig streamConfig = StreamConfig streamConfig =
new StreamConfig(streamProxy, new StreamConfig(streamProxy,
@ -218,8 +226,11 @@ public class ShardConsumerTest {
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null;
when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseManager.getLease(anyString())).thenReturn(null);
when(checkpoint.getCheckpoint(anyString())).thenReturn(new ExtendedSequenceNumber("123")); when(checkpoint.getCheckpointObject(anyString())).thenReturn(
new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // submit BlockOnParentShardTask consumer.consumeShard(); // submit BlockOnParentShardTask
@ -233,7 +244,8 @@ public class ShardConsumerTest {
consumer.consumeShard(); // submit InitializeTask consumer.consumeShard(); // submit InitializeTask
Thread.sleep(50L); Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
verify(processor, times(1)).initialize(any(InitializationInput.class)); verify(processor, times(1)).initialize(argThat(
initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber)));
try { try {
// Checking the status of submitted InitializeTask from above should throw exception. // Checking the status of submitted InitializeTask from above should throw exception.
@ -244,14 +256,17 @@ public class ShardConsumerTest {
} }
Thread.sleep(50L); Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
verify(processor, times(1)).initialize(any(InitializationInput.class)); verify(processor, times(1)).initialize(argThat(
initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber)));
doNothing().when(processor).initialize(any(InitializationInput.class)); doNothing().when(processor).initialize(any(InitializationInput.class));
consumer.consumeShard(); // submit InitializeTask again. consumer.consumeShard(); // submit InitializeTask again.
Thread.sleep(50L); Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
verify(processor, times(2)).initialize(any(InitializationInput.class)); verify(processor, times(2)).initialize(argThat(
initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber)));
verify(processor, times(2)).initialize(any(InitializationInput.class)); // no other calls with different args
// Checking the status of submitted InitializeTask from above should pass. // Checking the status of submitted InitializeTask from above should pass.
consumer.consumeShard(); consumer.consumeShard();
@ -447,6 +462,54 @@ public class ShardConsumerTest {
file.delete(); file.delete();
} }
@SuppressWarnings("unchecked")
@Test
public final void testConsumeShardInitializedWithPendingCheckpoint() throws Exception {
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
StreamConfig streamConfig =
new StreamConfig(streamProxy,
1,
10,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999");
when(leaseManager.getLease(anyString())).thenReturn(null);
when(checkpoint.getCheckpointObject(anyString())).thenReturn(
new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // submit BlockOnParentShardTask
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
verify(processor, times(0)).initialize(any(InitializationInput.class));
consumer.consumeShard(); // submit InitializeTask
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
verify(processor, times(1)).initialize(argThat(
initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber)));
verify(processor, times(1)).initialize(any(InitializationInput.class)); // no other calls with different args
consumer.consumeShard();
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
}
//@formatter:off (gets the formatting wrong) //@formatter:off (gets the formatting wrong)
private void verifyConsumedRecords(List<Record> expectedRecords, private void verifyConsumedRecords(List<Record> expectedRecords,
List<Record> actualRecords) { List<Record> actualRecords) {
@ -469,4 +532,21 @@ public class ShardConsumerTest {
} }
return userRecords; return userRecords;
} }
Matcher<InitializationInput> initializationInputMatcher(final ExtendedSequenceNumber checkpoint,
final ExtendedSequenceNumber pendingCheckpoint) {
return new TypeSafeMatcher<InitializationInput>() {
@Override
protected boolean matchesSafely(InitializationInput item) {
return Objects.equals(checkpoint, item.getExtendedSequenceNumber())
&& Objects.equals(pendingCheckpoint, item.getPendingCheckpointSequenceNumber());
}
@Override
public void describeTo(Description description) {
description.appendText(String.format("Checkpoint should be %s and pending checkpoint should be %s",
checkpoint, pendingCheckpoint));
}
};
}
} }

View file

@ -27,6 +27,7 @@ public class KinesisClientLeaseBuilder {
private UUID concurrencyToken; private UUID concurrencyToken;
private Long lastCounterIncrementNanos; private Long lastCounterIncrementNanos;
private ExtendedSequenceNumber checkpoint; private ExtendedSequenceNumber checkpoint;
private ExtendedSequenceNumber pendingCheckpoint;
private Long ownerSwitchesSinceCheckpoint = 0L; private Long ownerSwitchesSinceCheckpoint = 0L;
private Set<String> parentShardIds = new HashSet<>(); private Set<String> parentShardIds = new HashSet<>();
@ -60,6 +61,11 @@ public class KinesisClientLeaseBuilder {
return this; return this;
} }
public KinesisClientLeaseBuilder withPendingCheckpoint(ExtendedSequenceNumber pendingCheckpoint) {
this.pendingCheckpoint = pendingCheckpoint;
return this;
}
public KinesisClientLeaseBuilder withOwnerSwitchesSinceCheckpoint(Long ownerSwitchesSinceCheckpoint) { public KinesisClientLeaseBuilder withOwnerSwitchesSinceCheckpoint(Long ownerSwitchesSinceCheckpoint) {
this.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint; this.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint;
return this; return this;
@ -72,6 +78,6 @@ public class KinesisClientLeaseBuilder {
public KinesisClientLease build() { public KinesisClientLease build() {
return new KinesisClientLease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, return new KinesisClientLease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos,
checkpoint, ownerSwitchesSinceCheckpoint, parentShardIds); checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds);
} }
} }

View file

@ -18,6 +18,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateExcep
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
@ -99,6 +100,34 @@ public class StreamingRecordProcessorTest {
IllegalArgumentException { IllegalArgumentException {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public IPreparedCheckpointer prepareCheckpoint()
throws KinesisClientLibDependencyException,
InvalidStateException, ThrottlingException, ShutdownException {
throw new UnsupportedOperationException();
}
@Override
public IPreparedCheckpointer prepareCheckpoint(Record record)
throws KinesisClientLibDependencyException,
InvalidStateException, ThrottlingException, ShutdownException {
throw new UnsupportedOperationException();
}
@Override
public IPreparedCheckpointer prepareCheckpoint(String sequenceNumber)
throws KinesisClientLibDependencyException,
InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
throw new UnsupportedOperationException();
}
@Override
public IPreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber)
throws KinesisClientLibDependencyException,
InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
throw new UnsupportedOperationException();
}
}; };
private MessageWriter messageWriter; private MessageWriter messageWriter;