Merging to ltr_1

This commit is contained in:
Chunxue Yang 2020-04-22 11:14:32 -07:00
commit 9b14b93941
44 changed files with 1001 additions and 249 deletions

View file

@ -112,6 +112,11 @@ public class StreamingShardRecordProcessorTest {
throw new UnsupportedOperationException();
}
@Override
public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
throw new UnsupportedOperationException();
}
@Override
public PreparedCheckpointer prepareCheckpoint(Record record)
throws KinesisClientLibDependencyException,
@ -119,6 +124,11 @@ public class StreamingShardRecordProcessorTest {
throw new UnsupportedOperationException();
}
@Override
public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
throw new UnsupportedOperationException();
}
@Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber)
throws KinesisClientLibDependencyException,
@ -126,6 +136,11 @@ public class StreamingShardRecordProcessorTest {
throw new UnsupportedOperationException();
}
@Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
return null;
}
@Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber)
throws KinesisClientLibDependencyException,
@ -133,6 +148,11 @@ public class StreamingShardRecordProcessorTest {
throw new UnsupportedOperationException();
}
@Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
throw new UnsupportedOperationException();
}
@Override
public Checkpointer checkpointer() {
throw new UnsupportedOperationException();

View file

@ -26,18 +26,26 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
public class Checkpoint {
private final ExtendedSequenceNumber checkpoint;
private final ExtendedSequenceNumber pendingCheckpoint;
private final byte[] pendingCheckpointState;
@Deprecated
public Checkpoint(final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint) {
this(checkpoint, pendingCheckpoint, null);
}
/**
* Constructor.
*
* @param checkpoint the checkpoint sequence number - cannot be null or empty.
* @param pendingCheckpoint the pending checkpoint sequence number - can be null.
* @param pendingCheckpointState the pending checkpoint state - can be null.
*/
public Checkpoint(final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint) {
public Checkpoint(final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState) {
if (checkpoint == null || checkpoint.sequenceNumber().isEmpty()) {
throw new IllegalArgumentException("Checkpoint cannot be null or empty");
}
this.checkpoint = checkpoint;
this.pendingCheckpoint = pendingCheckpoint;
this.pendingCheckpointState = pendingCheckpointState;
}
}

View file

@ -144,8 +144,15 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
* {@inheritDoc}
*/
@Override
public synchronized PreparedCheckpointer prepareCheckpoint(Record record)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
return prepareCheckpoint(largestPermittedCheckpointValue.sequenceNumber(), applicationState);
}
/**
* {@inheritDoc}
*/
@Override
public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
//
// TODO: UserRecord Deprecation
//
@ -154,10 +161,19 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
} /*else if (record instanceof UserRecord) {
return prepareCheckpoint(record.sequenceNumber(), ((UserRecord) record).subSequenceNumber());
} */ else {
return prepareCheckpoint(record.sequenceNumber(), 0);
return prepareCheckpoint(record.sequenceNumber(), 0, applicationState);
}
}
/**
* {@inheritDoc}
*/
@Override
public synchronized PreparedCheckpointer prepareCheckpoint(Record record)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
return prepareCheckpoint(record, null);
}
/**
* {@inheritDoc}
*/
@ -167,13 +183,30 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
return prepareCheckpoint(sequenceNumber, 0);
}
/**
* {@inheritDoc}
*/
@Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
return prepareCheckpoint(sequenceNumber, 0, applicationState);
}
/**
* {@inheritDoc}
*/
@Override
public synchronized PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
return prepareCheckpoint(sequenceNumber, subSequenceNumber, null);
}
/**
* {@inheritDoc}
*/
@Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
if (subSequenceNumber < 0) {
throw new IllegalArgumentException("Could not checkpoint at invalid, negative subsequence number "
+ subSequenceNumber);
@ -191,7 +224,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
log.debug("Preparing checkpoint {}, token {} at specific extended sequence number {}",
ShardInfo.getLeaseKey(shardInfo), shardInfo.concurrencyToken(), pendingCheckpoint);
}
return doPrepareCheckpoint(pendingCheckpoint);
return doPrepareCheckpoint(pendingCheckpoint, applicationState);
} else {
throw new IllegalArgumentException(String.format(
"Could not prepare checkpoint at extended sequence number %s as it did not fall into acceptable "
@ -290,7 +323,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
* @throws ThrottlingException
* @throws ShutdownException
*/
private PreparedCheckpointer doPrepareCheckpoint(ExtendedSequenceNumber extendedSequenceNumber)
private PreparedCheckpointer doPrepareCheckpoint(ExtendedSequenceNumber extendedSequenceNumber, byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
ExtendedSequenceNumber newPrepareCheckpoint = extendedSequenceNumber;
@ -308,7 +341,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
}
try {
checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, shardInfo.concurrencyToken());
checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, shardInfo.concurrencyToken(), applicationState);
} catch (ThrottlingException | ShutdownException | InvalidStateException
| KinesisClientLibDependencyException e) {
throw e;

View file

@ -88,7 +88,7 @@ public class DynamoDBCheckpointer implements Checkpointer {
try {
Lease lease = leaseRefresher.getLease(leaseKey);
log.debug("[{}] Retrieved lease => {}", leaseKey, lease);
return new Checkpoint(lease.checkpoint(), lease.pendingCheckpoint());
return new Checkpoint(lease.checkpoint(), lease.pendingCheckpoint(), lease.pendingCheckpointState());
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
String message = "Unable to fetch checkpoint for shardId " + leaseKey;
log.error(message, e);
@ -99,9 +99,14 @@ public class DynamoDBCheckpointer implements Checkpointer {
@Override
public void prepareCheckpoint(final String leaseKey, final ExtendedSequenceNumber pendingCheckpoint,
final String concurrencyToken) throws KinesisClientLibException {
prepareCheckpoint(leaseKey, pendingCheckpoint, concurrencyToken, null);
}
@Override
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException {
try {
boolean wasSuccessful =
prepareCheckpoint(leaseKey, pendingCheckpoint, UUID.fromString(concurrencyToken));
prepareCheckpoint(leaseKey, pendingCheckpoint, UUID.fromString(concurrencyToken), pendingCheckpointState);
if (!wasSuccessful) {
throw new ShutdownException(
"Can't prepare checkpoint - instance doesn't hold the lease for this shard");
@ -129,12 +134,13 @@ public class DynamoDBCheckpointer implements Checkpointer {
lease.checkpoint(checkpoint);
lease.pendingCheckpoint(null);
lease.pendingCheckpointState(null);
lease.ownerSwitchesSinceCheckpoint(0L);
return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey);
}
boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, UUID concurrencyToken)
boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, UUID concurrencyToken, byte[] pendingCheckpointState)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
Lease lease = leaseCoordinator.getCurrentlyHeldLease(leaseKey);
if (lease == null) {
@ -144,6 +150,7 @@ public class DynamoDBCheckpointer implements Checkpointer {
}
lease.pendingCheckpoint(Objects.requireNonNull(pendingCheckpoint, "pendingCheckpoint should not be null"));
lease.pendingCheckpointState(pendingCheckpointState);
return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey);
}

View file

@ -15,6 +15,10 @@
package software.amazon.kinesis.coordinator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import io.reactivex.plugins.RxJavaPlugins;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -29,14 +33,12 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import io.reactivex.plugins.RxJavaPlugins;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
@ -50,6 +52,7 @@ import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig;
@ -59,9 +62,7 @@ import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSyncTask;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer;
@ -77,7 +78,6 @@ import software.amazon.kinesis.lifecycle.ShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.CloudWatchMetricsFactory;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.processor.Checkpointer;
@ -89,9 +89,6 @@ import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
/**
*
*/
@ -326,6 +323,7 @@ public class Scheduler implements Runnable {
if (shouldInitiateLeaseSync()) {
log.info("Worker {} is initiating the lease sync.", leaseManagementConfig.workerIdentifier());
leaderElectedPeriodicShardSyncManager.syncShardsOnce();
}
} else {
log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
@ -551,7 +549,6 @@ public class Scheduler implements Runnable {
* Requests a graceful shutdown of the worker, notifying record processors, that implement
* {@link ShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to
* checkpoint.
*
* This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the
* previous future.
*
@ -578,8 +575,8 @@ public class Scheduler implements Runnable {
* </ol>
*
* @return a future that will be set once the shutdown has completed. True indicates that the graceful shutdown
* completed successfully. A false value indicates that a non-exception case caused the shutdown process to
* terminate early.
* completed successfully. A false value indicates that a non-exception case caused the shutdown process to
* terminate early.
*/
public Future<Boolean> startGracefulShutdown() {
synchronized (this) {
@ -596,9 +593,8 @@ public class Scheduler implements Runnable {
* shutdowns in your own executor, or execute the shutdown synchronously.
*
* @return a callable that run the graceful shutdown process. This may return a callable that return true if the
* graceful shutdown has already been completed.
* @throws IllegalStateException
* thrown by the callable if another callable has already started the shutdown process.
* graceful shutdown has already been completed.
* @throws IllegalStateException thrown by the callable if another callable has already started the shutdown process.
*/
public Callable<Boolean> createGracefulShutdownCallable() {
if (shutdownComplete()) {
@ -740,12 +736,11 @@ public class Scheduler implements Runnable {
/**
* NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
*
* @param shardInfo
* Kinesis shard info
* @param shardInfo Kinesis shard info
* @return ShardConsumer for the shard
*/
ShardConsumer createOrGetShardConsumer(@NonNull final ShardInfo shardInfo,
@NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) {
@NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) {
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
// Instantiate a new consumer if we don't have one, or the one we
// had was from an earlier
@ -766,10 +761,10 @@ public class Scheduler implements Runnable {
}
protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo,
@NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) {
@NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) {
RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, metricsFactory);
ShardRecordProcessorCheckpointer checkpointer = coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo,
checkpoint);
checkpoint);
// The only case where streamName is not available will be when multistreamtracker not set. In this case,
// get the default stream name for the single stream application.
final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt());
@ -806,7 +801,6 @@ public class Scheduler implements Runnable {
/**
* NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
* <p>
* This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been
* instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example
* shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo
@ -851,7 +845,7 @@ public class Scheduler implements Runnable {
private StreamIdentifier getStreamIdentifier(Optional<String> streamIdentifierString) {
final StreamIdentifier streamIdentifier;
if(streamIdentifierString.isPresent()) {
if (streamIdentifierString.isPresent()) {
streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierString.get());
} else {
Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode");

View file

@ -14,6 +14,7 @@
*/
package software.amazon.kinesis.leases;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@ -36,6 +37,14 @@ public class DynamoUtils {
return AttributeValue.builder().ss(collectionValue).build();
}
public static AttributeValue createAttributeValue(byte[] byteBufferValue) {
if (byteBufferValue == null) {
throw new IllegalArgumentException("Byte buffer attributeValues cannot be null or empty.");
}
return AttributeValue.builder().b(SdkBytes.fromByteArray(byteBufferValue)).build();
}
public static AttributeValue createAttributeValue(String stringValue) {
if (stringValue == null || stringValue.isEmpty()) {
throw new IllegalArgumentException("String attributeValues cannot be null or empty.");
@ -52,6 +61,15 @@ public class DynamoUtils {
return AttributeValue.builder().n(longValue.toString()).build();
}
public static byte[] safeGetByteArray(Map<String, AttributeValue> dynamoRecord, String key) {
AttributeValue av = dynamoRecord.get(key);
if (av == null) {
return null;
} else {
return av.b().asByteArray();
}
}
public static Long safeGetLong(Map<String, AttributeValue> dynamoRecord, String key) {
AttributeValue av = dynamoRecord.get(key);
if (av == null) {

View file

@ -142,8 +142,7 @@ public class HierarchicalShardSyncer {
assertAllParentShardsAreClosed(inconsistentShardIds);
}
final List<Lease> currentLeases = isMultiStreamMode ?
getLeasesForStream(shardDetector.streamIdentifier(), leaseRefresher) :
leaseRefresher.listLeases();
leaseRefresher.listLeasesForStream(shardDetector.streamIdentifier()) : leaseRefresher.listLeases();
final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamIdentifier());
final LeaseSynchronizer leaseSynchronizer = isLeaseTableEmpty ? new EmptyLeaseTableSynchronizer() :
new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
@ -171,29 +170,6 @@ public class HierarchicalShardSyncer {
}
}
// CHECKSTYLE:ON CyclomaticComplexity
/** Note: This method has package level access solely for testing purposes.
*
* @param streamIdentifier We'll use this stream identifier to filter leases
* @param leaseRefresher Used to fetch leases
* @return Return list of leases (corresponding to shards) of the specified stream.
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
*/
static List<Lease> getLeasesForStream(StreamIdentifier streamIdentifier,
LeaseRefresher leaseRefresher)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<Lease> streamLeases = new ArrayList<>();
for (Lease lease : leaseRefresher.listLeases()) {
if (streamIdentifier.serialize().equals(((MultiStreamLease)lease).streamIdentifier())) {
streamLeases.add(lease);
}
}
return streamLeases;
}
/** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls
* and a reshard operation.
* @param inconsistentShardIds

View file

@ -26,15 +26,13 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
import lombok.Synchronized;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;

View file

@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit;
@NoArgsConstructor
@Getter
@Accessors(fluent = true)
@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "childShardIds"})
@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "childShardIds", "pendingCheckpointState"})
@ToString
public class Lease {
/*
@ -82,6 +82,14 @@ public class Lease {
* @return pending checkpoint, possibly null.
*/
private ExtendedSequenceNumber pendingCheckpoint;
/**
* Last pending application state. Deliberately excluded from hashCode and equals.
*
* @return pending checkpoint state, possibly null.
*/
private byte[] pendingCheckpointState;
/**
* @return count of distinct lease holders between checkpoints.
*/
@ -97,21 +105,23 @@ public class Lease {
protected Lease(Lease lease) {
this(lease.leaseKey(), lease.leaseOwner(), lease.leaseCounter(), lease.concurrencyToken(),
lease.lastCounterIncrementNanos(), lease.checkpoint(), lease.pendingCheckpoint(),
lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.childShardIds());
lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.childShardIds(), lease.pendingCheckpointState());
}
@Deprecated
public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter,
final UUID concurrencyToken, final Long lastCounterIncrementNanos,
final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint,
final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds) {
this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint,
ownerSwitchesSinceCheckpoint, parentShardIds, new HashSet<>(), null);
}
public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter,
final UUID concurrencyToken, final Long lastCounterIncrementNanos,
final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint,
final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds) {
this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint,
ownerSwitchesSinceCheckpoint, parentShardIds, new HashSet<>());
}
public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter,
final UUID concurrencyToken, final Long lastCounterIncrementNanos,
final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint,
final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds, final Set<String> childShardIds) {
final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds, final Set<String> childShardIds,
final byte[] pendingCheckpointState) {
this.leaseKey = leaseKey;
this.leaseOwner = leaseOwner;
this.leaseCounter = leaseCounter;
@ -126,6 +136,7 @@ public class Lease {
if (childShardIds != null) {
this.childShardIds.addAll(childShardIds);
}
this.pendingCheckpointState = pendingCheckpointState;
}
/**
@ -145,6 +156,7 @@ public class Lease {
ownerSwitchesSinceCheckpoint(lease.ownerSwitchesSinceCheckpoint());
checkpoint(lease.checkpoint);
pendingCheckpoint(lease.pendingCheckpoint);
pendingCheckpointState(lease.pendingCheckpointState);
parentShardIds(lease.parentShardIds);
childShardIds(lease.childShardIds());
}
@ -225,6 +237,15 @@ public class Lease {
this.pendingCheckpoint = pendingCheckpoint;
}
/**
* Sets pending checkpoint state.
*
* @param pendingCheckpointState can be null
*/
public void pendingCheckpointState(byte[] pendingCheckpointState) {
this.pendingCheckpointState = pendingCheckpointState;
}
/**
* Sets ownerSwitchesSinceCheckpoint.
*

View file

@ -15,17 +15,15 @@
package software.amazon.kinesis.leases;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.function.Function;
import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
@ -35,11 +33,11 @@ import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory;
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.processor.MultiStreamTracker;
/**
* Used by the KCL to configure lease management.
@ -145,6 +143,11 @@ public class LeaseManagementConfig {
*/
private int initialLeaseTableWriteCapacity = 10;
/**
* Configurable functional interface to override the existing shardDetector.
*/
private Function<StreamConfig, ShardDetector> customShardDetectorProvider;
/**
* The size of the thread pool to create for the lease renewer to use.
*
@ -291,30 +294,30 @@ public class LeaseManagementConfig {
if (leaseManagementFactory == null) {
Validate.notEmpty(streamName(), "Stream name is empty");
leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(),
streamName(),
dynamoDBClient(),
tableName(),
workerIdentifier(),
executorService(),
initialPositionInStream(),
failoverTimeMillis(),
epsilonMillis(),
maxLeasesForWorker(),
maxLeasesToStealAtOneTime(),
maxLeaseRenewalThreads(),
cleanupLeasesUponShardCompletion(),
ignoreUnexpectedChildShards(),
shardSyncIntervalMillis(),
consistentReads(),
listShardsBackoffTimeInMillis(),
maxListShardsRetryAttempts(),
maxCacheMissesBeforeReload(),
listShardsCacheAllowedAgeInSeconds(),
cacheMissWarningModulus(),
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(),
tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode());
streamName(),
dynamoDBClient(),
tableName(),
workerIdentifier(),
executorService(),
initialPositionInStream(),
failoverTimeMillis(),
epsilonMillis(),
maxLeasesForWorker(),
maxLeasesToStealAtOneTime(),
maxLeaseRenewalThreads(),
cleanupLeasesUponShardCompletion(),
ignoreUnexpectedChildShards(),
shardSyncIntervalMillis(),
consistentReads(),
listShardsBackoffTimeInMillis(),
maxListShardsRetryAttempts(),
maxCacheMissesBeforeReload(),
listShardsCacheAllowedAgeInSeconds(),
cacheMissWarningModulus(),
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(),
tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode());
}
return leaseManagementFactory;
}
@ -328,31 +331,32 @@ public class LeaseManagementConfig {
public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer, boolean isMultiStreamingMode) {
if(leaseManagementFactory == null) {
leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(),
dynamoDBClient(),
tableName(),
workerIdentifier(),
executorService(),
failoverTimeMillis(),
epsilonMillis(),
maxLeasesForWorker(),
maxLeasesToStealAtOneTime(),
maxLeaseRenewalThreads(),
cleanupLeasesUponShardCompletion(),
ignoreUnexpectedChildShards(),
shardSyncIntervalMillis(),
consistentReads(),
listShardsBackoffTimeInMillis(),
maxListShardsRetryAttempts(),
maxCacheMissesBeforeReload(),
listShardsCacheAllowedAgeInSeconds(),
cacheMissWarningModulus(),
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(isMultiStreamingMode),
tableCreatorCallback(),
dynamoDbRequestTimeout(),
billingMode(),
leaseSerializer);
dynamoDBClient(),
tableName(),
workerIdentifier(),
executorService(),
failoverTimeMillis(),
epsilonMillis(),
maxLeasesForWorker(),
maxLeasesToStealAtOneTime(),
maxLeaseRenewalThreads(),
cleanupLeasesUponShardCompletion(),
ignoreUnexpectedChildShards(),
shardSyncIntervalMillis(),
consistentReads(),
listShardsBackoffTimeInMillis(),
maxListShardsRetryAttempts(),
maxCacheMissesBeforeReload(),
listShardsCacheAllowedAgeInSeconds(),
cacheMissWarningModulus(),
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(isMultiStreamingMode),
tableCreatorCallback(),
dynamoDbRequestTimeout(),
billingMode(),
leaseSerializer,
customShardDetectorProvider());
}
return leaseManagementFactory;
}
@ -366,5 +370,4 @@ public class LeaseManagementConfig {
this.leaseManagementFactory = leaseManagementFactory;
return this;
}
}

View file

@ -16,6 +16,7 @@ package software.amazon.kinesis.leases;
import java.util.List;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
@ -60,6 +61,18 @@ public interface LeaseRefresher {
*/
boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException;
/**
* List all leases for a given stream synchronously.
*
* @throws DependencyException if DynamoDB scan fails in an unexpected way
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity
*
* @return list of leases
*/
List<Lease> listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException,
ProvisionedThroughputException;
/**
* List all objects in table synchronously.
*

View file

@ -15,12 +15,11 @@
package software.amazon.kinesis.leases;
import java.util.List;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.kinesis.common.StreamIdentifier;
import java.util.List;
/**
*
*/
@ -34,5 +33,4 @@ public interface ShardDetector {
default StreamIdentifier streamIdentifier() {
throw new UnsupportedOperationException("StreamName not available");
}
}

View file

@ -17,7 +17,7 @@ package software.amazon.kinesis.leases.dynamodb;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import lombok.Data;
import lombok.NonNull;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
@ -61,6 +61,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@NonNull
private StreamConfig streamConfig;
private Function<StreamConfig, ShardDetector> customShardDetectorProvider;
private final long failoverTimeMillis;
private final long epsilonMillis;
private final int maxLeasesForWorker;
@ -231,7 +233,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
/**
* Constructor.
*
*
* @param kinesisClient
* @param streamName
* @param dynamoDBClient
@ -365,7 +367,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param dynamoDbRequestTimeout
* @param billingMode
*/
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig,
private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis,
final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
@ -382,7 +384,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer);
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer,
null);
this.streamConfig = streamConfig;
}
@ -425,7 +428,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer) {
Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer,
Function<StreamConfig, ShardDetector> customShardDetectorProvider) {
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName;
@ -452,6 +456,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.leaseSerializer = leaseSerializer;
this.customShardDetectorProvider = customShardDetectorProvider;
}
@Override
@ -522,8 +527,9 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
*/
@Override
public ShardDetector createShardDetector(StreamConfig streamConfig) {
return new KinesisShardDetector(kinesisClient, streamConfig.streamIdentifier(), listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload,
cacheMissWarningModulus, dynamoDbRequestTimeout);
return customShardDetectorProvider != null ? customShardDetectorProvider.apply(streamConfig) :
new KinesisShardDetector(kinesisClient, streamConfig.streamIdentifier(), listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload,
cacheMissWarningModulus, dynamoDbRequestTimeout);
}
}

View file

@ -22,6 +22,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.collect.ImmutableMap;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
@ -29,6 +30,7 @@ import software.amazon.awssdk.services.dynamodb.model.*;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseRefresher;
@ -58,6 +60,9 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private boolean newTableCreated = false;
private static final String STREAM_NAME = "streamName";
private static final String DDB_STREAM_NAME = ":streamName";
/**
* Constructor.
*
@ -263,12 +268,21 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
return System.currentTimeMillis() - startTime;
}
/**
 * {@inheritDoc}
 *
 * <p>Delegates to the shared scan helper with no page-size limit, filtering the
 * lease table scan down to the given stream.
 */
@Override
public List<Lease> listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException,
        InvalidStateException, ProvisionedThroughputException {
    return list(null, streamIdentifier);
}
/**
* {@inheritDoc}
*/
@Override
public List<Lease> listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
return list(null);
return list(null, null);
}
/**
@ -277,22 +291,34 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
@Override
public boolean isLeaseTableEmpty()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
return list(1).isEmpty();
return list(1, null).isEmpty();
}
/**
* List with the given page size. Package access for integration testing.
*
* @param limit number of items to consider at a time - used by integration tests to force paging.
* @param streamIdentifier streamIdentifier for multi-stream mode. Can be null.
* @return list of leases
* @throws InvalidStateException if table does not exist
* @throws DependencyException if DynamoDB scan fail in an unexpected way
* @throws ProvisionedThroughputException if DynamoDB scan fail due to exceeded capacity
*/
List<Lease> list(Integer limit) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
List<Lease> list(Integer limit, StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException,
ProvisionedThroughputException {
log.debug("Listing leases from table {}", table);
ScanRequest.Builder scanRequestBuilder = ScanRequest.builder().tableName(table);
if (streamIdentifier != null) {
final Map<String, AttributeValue> expressionAttributeValues = ImmutableMap.of(
DDB_STREAM_NAME, AttributeValue.builder().s(streamIdentifier.serialize()).build()
);
scanRequestBuilder = scanRequestBuilder.filterExpression(STREAM_NAME + " = " + DDB_STREAM_NAME)
.expressionAttributeValues(expressionAttributeValues);
}
if (limit != null) {
scanRequestBuilder = scanRequestBuilder.limit(limit);
}

View file

@ -51,6 +51,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
private static final String CHECKPOINT_SUBSEQUENCE_NUMBER_KEY = "checkpointSubSequenceNumber";
private static final String PENDING_CHECKPOINT_SEQUENCE_KEY = "pendingCheckpoint";
private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber";
private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState";
private static final String PARENT_SHARD_ID_KEY = "parentShardId";
private static final String CHILD_SHARD_ID_KEY = "childShardId";
@ -80,6 +81,10 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.pendingCheckpoint().subSequenceNumber()));
}
if (lease.pendingCheckpointState() != null) {
    // Bug fix: previously stored lease.checkpoint().subSequenceNumber() (copy-paste error);
    // the attribute must carry the pending checkpoint state bytes, matching the
    // putUpdate(...) path that also serializes lease.pendingCheckpointState().
    result.put(PENDING_CHECKPOINT_STATE_KEY, DynamoUtils.createAttributeValue(lease.pendingCheckpointState()));
}
return result;
}
@ -111,6 +116,9 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
DynamoUtils.safeGetLong(dynamoRecord, PENDING_CHECKPOINT_SUBSEQUENCE_KEY))
);
}
leaseToUpdate.pendingCheckpointState(DynamoUtils.safeGetByteArray(dynamoRecord, PENDING_CHECKPOINT_STATE_KEY));
return leaseToUpdate;
}
@ -227,6 +235,13 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build());
}
if (lease.pendingCheckpointState() != null) {
result.put(PENDING_CHECKPOINT_STATE_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.pendingCheckpointState())));
} else {
result.put(PENDING_CHECKPOINT_STATE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build());
}
if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) {
result.put(CHILD_SHARD_ID_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds())));
}

View file

@ -92,6 +92,7 @@ public class InitializeTask implements ConsumerTask {
.shardId(shardInfo.shardId())
.extendedSequenceNumber(initialCheckpoint)
.pendingCheckpointSequenceNumber(initialCheckpointObject.pendingCheckpoint())
.pendingCheckpointState(initialCheckpointObject.pendingCheckpointState())
.build();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory,

View file

@ -16,6 +16,8 @@ package software.amazon.kinesis.lifecycle;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.function.Function;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -26,11 +28,11 @@ import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.exceptions.CustomerApplicationException;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
@ -38,8 +40,8 @@ import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.RecordsPublisher;

View file

@ -47,4 +47,12 @@ public class InitializationInput {
* completing the checkpoint.
*/
private final ExtendedSequenceNumber pendingCheckpointSequenceNumber;
/**
* The last pending application state of the previous record processor. May be null.
*
* This will only be set if the previous record processor had prepared a checkpoint, but lost its lease before
* completing the checkpoint.
*/
private final byte[] pendingCheckpointState;
}

View file

@ -57,6 +57,7 @@ public class ProcessRecordsInput {
* The records received from Kinesis. These records may have been de-aggregated if they were published by the KPL.
*/
private List<KinesisClientRecord> records;
/**
* A checkpointer that the {@link ShardRecordProcessor} can use to checkpoint its progress.
*/

View file

@ -40,7 +40,7 @@ public interface Checkpointer {
/**
* Get the current checkpoint stored for the specified shard. Useful for checking that the parent shard
* has been completely processed before we start processing the child shard.
*
*
* @param leaseKey Current checkpoint for this shard is fetched
* @return Current checkpoint for this shard, null if there is no record for this shard.
* @throws KinesisClientLibException Thrown if we are unable to fetch the checkpoint
@ -73,6 +73,22 @@ public interface Checkpointer {
void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken)
throws KinesisClientLibException;
/**
* Record intent to checkpoint for a shard. Upon failover, the pendingCheckpoint and pendingCheckpointState will be
* passed to the new ShardRecordProcessor's initialize() method.
*
* @param leaseKey Checkpoint is specified for this shard.
* @param pendingCheckpoint Value of the pending checkpoint (e.g. Kinesis sequence number and subsequence number)
* @param concurrencyToken Used with conditional writes to prevent stale updates
* (e.g. if there was a fail over to a different record processor, we don't want to
* overwrite its checkpoint)
* @param pendingCheckpointState Serialized application state at the pending checkpoint.
*
* @throws KinesisClientLibException Thrown if we were unable to save the checkpoint
*/
void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState)
throws KinesisClientLibException;
void operation(String operation);
String operation();

View file

@ -93,7 +93,6 @@ public interface RecordProcessorCheckpointer {
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException;
/**
* This method will checkpoint the progress at the provided sequenceNumber and subSequenceNumber, the latter for
* aggregated records produced with the Producer Library. This method is analogous to {@link #checkpoint()}
@ -145,6 +144,32 @@ public interface RecordProcessorCheckpointer {
PreparedCheckpointer prepareCheckpoint()
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
/**
* This method will record a pending checkpoint at the last data record that was delivered to the record processor.
* If the application fails over between calling prepareCheckpoint() and checkpoint(), the init() method of the next
* IRecordProcessor for this shard will be informed of the prepared sequence number and application state.
*
* Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having
* side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete.
* Use the sequence number and application state passed in to init() to behave idempotently.
*
* @param applicationState arbitrary application state that will be passed to the next record processor that
* processes the shard.
* @return a PreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @throws ShutdownException The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this ShardRecordProcessor instance.
* @throws InvalidStateException Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
*/
PreparedCheckpointer prepareCheckpoint(byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
/**
* This method will record a pending checkpoint at the provided record. This method is analogous to
* {@link #prepareCheckpoint()} but provides the ability to specify the record at which to prepare the checkpoint.
@ -174,6 +199,38 @@ public interface RecordProcessorCheckpointer {
PreparedCheckpointer prepareCheckpoint(Record record)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
/**
* This method will record a pending checkpoint at the provided record. This method is analogous to
* {@link #prepareCheckpoint()} but provides the ability to specify the record and application state at which to
* prepare the checkpoint.
*
* @param record A record at which to prepare checkpoint in this shard.
* @param applicationState arbitrary application state that will be passed to the next record processor that
* processes the shard.
*
* Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having
* side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete.
* Use the sequence number and application state passed in to init() to behave idempotently.
*
* @return a PreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @throws ShutdownException The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this ShardRecordProcessor instance.
* @throws InvalidStateException Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
* @throws IllegalArgumentException The sequence number is invalid for one of the following reasons:
* 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
/**
* This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to
* {@link #prepareCheckpoint()} but provides the ability to specify the sequence number at which to checkpoint.
@ -200,6 +257,35 @@ public interface RecordProcessorCheckpointer {
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException;
/**
* This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to
* {@link #prepareCheckpoint()} but provides the ability to specify the sequence number and application state
* at which to checkpoint.
*
* @param sequenceNumber A sequence number at which to prepare checkpoint in this shard.
* @param applicationState arbitrary application state that will be passed to the next record processor that
* processes the shard.
*
* @return a PreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @throws ShutdownException The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this ShardRecordProcessor instance.
* @throws InvalidStateException Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
* @throws IllegalArgumentException The sequence number is invalid for one of the following reasons:
* 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException;
/**
* This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for
* aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()}
@ -228,5 +314,36 @@ public interface RecordProcessorCheckpointer {
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException;
/**
* This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for
* aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()}
* but provides the ability to specify the sequence number, subsequence number, and application state at which to
* checkpoint.
*
* @param sequenceNumber A sequence number at which to prepare checkpoint in this shard.
* @param subSequenceNumber A subsequence number at which to prepare checkpoint within this shard.
* @param applicationState arbitrary application state that will be passed to the next record processor that
* processes the shard.
*
* @return a PreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @throws ShutdownException The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this ShardRecordProcessor instance.
* @throws InvalidStateException Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
* @throws IllegalArgumentException The sequence number is invalid for one of the following reasons:
* 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException;
Checkpointer checkpointer();
}

View file

@ -0,0 +1,48 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval;
import java.time.Duration;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.metrics.MetricsFactory;
/**
 * Configuration handed to a custom data-fetcher provider; bundles the per-shard
 * inputs needed to construct a data fetcher for one shard of one stream.
 */
public interface DataFetcherProviderConfig {

    /**
     * Gets stream identifier for dataFetcher.
     */
    StreamIdentifier getStreamIdentifier();

    /**
     * Gets shard id of the shard this fetcher reads.
     */
    String getShardId();

    /**
     * Gets current instance of metrics factory.
     */
    MetricsFactory getMetricsFactory();

    /**
     * Gets current max records allowed to process at a given time.
     */
    Integer getMaxRecords();

    /**
     * Gets timeout for kinesis request.
     */
    Duration getKinesisRequestTimeout();
}

View file

@ -14,7 +14,9 @@
*/
package software.amazon.kinesis.retrieval;
import java.util.Optional;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.retrieval.polling.DataFetcher;
import software.amazon.kinesis.retrieval.polling.KinesisDataFetcher;
/**
@ -41,15 +43,33 @@ public interface GetRecordsRetrievalStrategy {
/**
* Returns whether this strategy has been shutdown.
*
*
* @return true if the strategy has been shutdown, false otherwise.
*/
boolean isShutdown();
/**
* Returns the KinesisDataFetcher used to records from Kinesis.
*
* @return KinesisDataFetcher
* Returns a DataFetcher used to retrieve records from Kinesis.
*
* @return DataFetcher
*/
KinesisDataFetcher getDataFetcher();
/**
 * Returns a DataFetcher override if applicable, else empty for retrieving records from Kinesis.
 *
 * @return {@code Optional<DataFetcher>} — empty unless an implementation supplies an override
 */
default Optional<DataFetcher> getDataFetcherOverride() {
return Optional.empty();
}
/**
 * Returns the dataFetcher to use: the override when one exists, otherwise the
 * default data fetcher.
 *
 * @return DataFetcher
 */
default DataFetcher dataFetcher() {
    // orElseGet defers the getDataFetcher() call until we know there is no
    // override, instead of eagerly evaluating it as orElse(...) would.
    return getDataFetcherOverride().orElseGet(this::getDataFetcher);
}
}

View file

@ -0,0 +1,45 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval;
import java.time.Duration;
import lombok.Data;
import lombok.NonNull;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.metrics.MetricsFactory;
/**
 * Configuration needed for custom data fetchers.
 *
 * <p>Lombok {@code @Data} value holder implementing {@link DataFetcherProviderConfig};
 * every field is required ({@code @NonNull}).
 */
@Data
public class KinesisDataFetcherProviderConfig implements DataFetcherProviderConfig {

    // Identifies the stream this fetcher reads from.
    @NonNull
    private StreamIdentifier streamIdentifier;

    // Shard within the stream that this fetcher is bound to.
    @NonNull
    private String shardId;

    // Factory used to emit metrics for fetch operations.
    @NonNull
    private MetricsFactory metricsFactory;

    // Max records allowed to process at a given time.
    @NonNull
    private Integer maxRecords;

    // Timeout applied to Kinesis requests issued by the fetcher.
    @NonNull
    private Duration kinesisRequestTimeout;
}

View file

@ -29,11 +29,15 @@ import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;
/**
* Used by the KCL to configure the retrieval of records from Kinesis.
*/
@Getter @Setter @ToString @EqualsAndHashCode
@Getter
@Setter
@ToString
@EqualsAndHashCode
@Accessors(fluent = true)
public class RetrievalConfig {
/**
@ -52,6 +56,7 @@ public class RetrievalConfig {
@NonNull
private final String applicationName;
/**
* AppStreamTracker either for multi stream tracking or single stream
*/
@ -91,7 +96,7 @@ public class RetrievalConfig {
private RetrievalFactory retrievalFactory;
public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull String streamName,
@NonNull String applicationName) {
@NonNull String applicationName) {
this.kinesisClient = kinesisAsyncClient;
this.appStreamTracker = Either
.right(new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStreamExtended));
@ -99,7 +104,7 @@ public class RetrievalConfig {
}
public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull MultiStreamTracker multiStreamTracker,
@NonNull String applicationName) {
@NonNull String applicationName) {
this.kinesisClient = kinesisAsyncClient;
this.appStreamTracker = Either.left(multiStreamTracker);
this.applicationName = applicationName;
@ -117,17 +122,29 @@ public class RetrievalConfig {
}
public RetrievalFactory retrievalFactory() {
if (retrievalFactory == null) {
if (retrievalSpecificConfig == null) {
retrievalSpecificConfig = new FanOutConfig(kinesisClient())
.applicationName(applicationName());
retrievalSpecificConfig = appStreamTracker.map(multiStreamTracker -> retrievalSpecificConfig,
streamConfig -> ((FanOutConfig)retrievalSpecificConfig).streamName(streamConfig.streamIdentifier().streamName()));
streamConfig -> ((FanOutConfig) retrievalSpecificConfig).streamName(streamConfig.streamIdentifier().streamName()));
}
retrievalFactory = retrievalSpecificConfig.retrievalFactory();
}
validateConfig();
return retrievalFactory;
}
/**
 * Rejects inconsistent polling configurations: multi-stream tracking must not pin a
 * single stream name on the PollingConfig, and single-stream mode requires a
 * resolvable stream name.
 */
private void validateConfig() {
    // Only polling retrieval carries a per-config stream name worth validating.
    if (!(retrievalSpecificConfig instanceof PollingConfig)) {
        return;
    }
    final PollingConfig pollingConfig = (PollingConfig) retrievalSpecificConfig;
    final boolean misconfigured = appStreamTracker.map(
            multiStreamTracker -> pollingConfig.streamName() != null,
            streamConfig -> streamConfig.streamIdentifier() == null
                    || streamConfig.streamIdentifier().streamName() == null);
    if (misconfigured) {
        throw new IllegalArgumentException("Invalid config: multistream enabled with streamName or single stream with no streamName");
    }
}
}

View file

@ -15,10 +15,13 @@
package software.amazon.kinesis.retrieval;
import java.util.function.Function;
import software.amazon.kinesis.retrieval.polling.DataFetcher;
public interface RetrievalSpecificConfig {
/**
* Creates and returns a retrieval factory for the specific configuration
*
*
* @return a retrieval factory that can create an appropriate retriever
*/
RetrievalFactory retrievalFactory();

View file

@ -0,0 +1,124 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval.polling;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
public interface DataFetcher {
/**
* Get records from the current position in the stream (up to maxRecords).
*
* @return list of records of up to maxRecords size
*/
DataFetcherResult getRecords();
/**
* Initializes this DataFetcher's iterator based on the checkpointed sequence number.
*
* @param initialCheckpoint Current checkpoint sequence number for this shard.
* @param initialPositionInStream The initialPositionInStream.
*/
void initialize(String initialCheckpoint,
InitialPositionInStreamExtended initialPositionInStream);
/**
* Initializes this DataFetcher's iterator based on the checkpointed sequence number as an
* ExtendedSequenceNumber.
*
* @param initialCheckpoint Current checkpoint sequence number for this shard.
* @param initialPositionInStream The initialPositionInStream.
*/
void initialize(ExtendedSequenceNumber initialCheckpoint,
InitialPositionInStreamExtended initialPositionInStream);
/**
* Advances this KinesisDataFetcher's internal iterator to be at the passed-in sequence number.
*
* @param sequenceNumber advance the iterator to the record at this sequence number.
* @param initialPositionInStream The initialPositionInStream.
*/
void advanceIteratorTo(String sequenceNumber,
InitialPositionInStreamExtended initialPositionInStream);
/**
* Gets a new iterator from the last known sequence number i.e. the sequence number of the last record from the last
* records call.
*/
void restartIterator();
/**
* Resets the iterator by setting shardIterator, sequenceNumber and the position in the stream.
*
* @param shardIterator set the current shard iterator.
* @param sequenceNumber reset the iterator to the record at this sequence number.
* @param initialPositionInStream the current position in the stream to reset the iterator to.
*/
void resetIterator(String shardIterator, String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream);
/**
* Retrieves the response based on the request.
*
* @param request the current get records request used to receive a response.
* @return GetRecordsResponse response for getRecords
*/
GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws Exception;
/**
* Retrieves the next get records request based on the current iterator.
*
* @param nextIterator specify the iterator to get the next record request
* @return {@link GetRecordsRequest}
*/
GetRecordsRequest getGetRecordsRequest(String nextIterator);
/**
* Gets the next iterator based on the request.
*
* @param request used to obtain the next shard iterator
* @return next iterator string
*/
String getNextIterator(GetShardIteratorRequest request) throws ExecutionException, InterruptedException, TimeoutException;
/**
* Gets the next set of records based on the iterator.
*
* @param nextIterator specified shard iterator for getting the next set of records
* @return {@link GetRecordsResponse}
*/
GetRecordsResponse getRecords(@NonNull String nextIterator);
/**
* Get the current account and stream information.
*
* @return {@link StreamIdentifier}
*/
StreamIdentifier getStreamIdentifier();
/**
* Checks if shardEnd is reached.
* @return boolean to determine whether shard end is reached
*/
boolean isShardEndReached();
}

View file

@ -14,20 +14,18 @@
*/
package software.amazon.kinesis.retrieval.polling;
import com.google.common.collect.Iterables;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Iterables;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
@ -47,8 +45,10 @@ import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.retrieval.AWSExceptionManager;
import software.amazon.kinesis.retrieval.DataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.IteratorBuilder;
import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@ -57,7 +57,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
*/
@Slf4j
@KinesisClientInternalApi
public class KinesisDataFetcher {
public class KinesisDataFetcher implements DataFetcher {
private static final String METRICS_PREFIX = "KinesisDataFetcher";
private static final String OPERATION = "ProcessTask";
@ -76,33 +76,39 @@ public class KinesisDataFetcher {
@Deprecated
public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory) {
this(kinesisClient, StreamIdentifier.singleStreamInstance(streamName), shardId, maxRecords, metricsFactory, PollingConfig.DEFAULT_REQUEST_TIMEOUT);
this(kinesisClient, new KinesisDataFetcherProviderConfig(
StreamIdentifier.singleStreamInstance(streamName),
shardId,
metricsFactory,
maxRecords,
PollingConfig.DEFAULT_REQUEST_TIMEOUT
));
}
/**
* Constructs KinesisDataFetcher.
* @param kinesisClient
* @param streamIdentifier
* @param shardId
* @param maxRecords
* @param metricsFactory
* @param maxFutureWait
*/
public KinesisDataFetcher(KinesisAsyncClient kinesisClient, StreamIdentifier streamIdentifier, String shardId, int maxRecords, MetricsFactory metricsFactory, Duration maxFutureWait) {
this.kinesisClient = kinesisClient;
this.streamIdentifier = streamIdentifier;
this.shardId = shardId;
this.maxRecords = maxRecords;
this.metricsFactory = metricsFactory;
this.maxFutureWait = maxFutureWait;
this.streamAndShardId = streamIdentifier.serialize() + ":" + shardId;
}
/** Note: This method has package level access for testing purposes.
* Note: This method has package level access for testing purposes.
*
* @return nextIterator
*/
@Getter(AccessLevel.PACKAGE)
private String nextIterator;
/**
* Constructs KinesisDataFetcher.
*
* @param kinesisClient
* @param kinesisDataFetcherProviderConfig
*/
public KinesisDataFetcher(KinesisAsyncClient kinesisClient, DataFetcherProviderConfig kinesisDataFetcherProviderConfig) {
this.kinesisClient = kinesisClient;
this.maxFutureWait = kinesisDataFetcherProviderConfig.getKinesisRequestTimeout();
this.maxRecords = kinesisDataFetcherProviderConfig.getMaxRecords();
this.metricsFactory = kinesisDataFetcherProviderConfig.getMetricsFactory();
this.shardId = kinesisDataFetcherProviderConfig.getShardId();
this.streamIdentifier = kinesisDataFetcherProviderConfig.getStreamIdentifier();
this.streamAndShardId = streamIdentifier.serialize() + ":" + shardId;
}
@Getter
private boolean isShardEndReached;
private boolean isInitialized;
@ -114,6 +120,7 @@ public class KinesisDataFetcher {
*
* @return list of records of up to maxRecords size
*/
@Override
public DataFetcherResult getRecords() {
if (!isInitialized) {
throw new IllegalArgumentException("KinesisDataFetcher.records called before initialization.");
@ -187,6 +194,7 @@ public class KinesisDataFetcher {
* @param initialCheckpoint Current checkpoint sequence number for this shard.
* @param initialPositionInStream The initialPositionInStream.
*/
@Override
public void initialize(final String initialCheckpoint,
final InitialPositionInStreamExtended initialPositionInStream) {
log.info("Initializing shard {} with {}", streamAndShardId, initialCheckpoint);
@ -194,6 +202,7 @@ public class KinesisDataFetcher {
isInitialized = true;
}
@Override
public void initialize(final ExtendedSequenceNumber initialCheckpoint,
final InitialPositionInStreamExtended initialPositionInStream) {
log.info("Initializing shard {} with {}", streamAndShardId, initialCheckpoint.sequenceNumber());
@ -207,6 +216,7 @@ public class KinesisDataFetcher {
* @param sequenceNumber advance the iterator to the record at this sequence number.
* @param initialPositionInStream The initialPositionInStream.
*/
@Override
public void advanceIteratorTo(final String sequenceNumber,
final InitialPositionInStreamExtended initialPositionInStream) {
if (sequenceNumber == null) {
@ -228,9 +238,7 @@ public class KinesisDataFetcher {
try {
try {
final GetShardIteratorResponse result = FutureUtils
.resolveOrCancelFuture(kinesisClient.getShardIterator(request), maxFutureWait);
nextIterator = result.shardIterator();
nextIterator = getNextIterator(request);
success = true;
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
@ -260,6 +268,7 @@ public class KinesisDataFetcher {
* Gets a new iterator from the last known sequence number i.e. the sequence number of the last record from the last
* records call.
*/
@Override
public void restartIterator() {
if (StringUtils.isEmpty(lastKnownSequenceNumber) || initialPositionInStream == null) {
throw new IllegalStateException(
@ -268,29 +277,49 @@ public class KinesisDataFetcher {
advanceIteratorTo(lastKnownSequenceNumber, initialPositionInStream);
}
@Override
public void resetIterator(String shardIterator, String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) {
this.nextIterator = shardIterator;
this.lastKnownSequenceNumber = sequenceNumber;
this.initialPositionInStream = initialPositionInStream;
}
private GetRecordsResponse getRecords(@NonNull final String nextIterator) {
final AWSExceptionManager exceptionManager = createExceptionManager();
GetRecordsRequest request = KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator)
@Override
public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, TimeoutException {
final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request),
maxFutureWait);
if (!isValidResponse(response)) {
throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId
+ ". nextShardIterator: " + response.nextShardIterator()
+ ". childShards: " + response.childShards() + ". Will retry GetRecords with the same nextIterator.");
}
return response;
}
@Override
public GetRecordsRequest getGetRecordsRequest(String nextIterator) {
return KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator)
.limit(maxRecords).build();
}
@Override
public String getNextIterator(GetShardIteratorRequest request) throws ExecutionException, InterruptedException, TimeoutException {
final GetShardIteratorResponse result = FutureUtils
.resolveOrCancelFuture(kinesisClient.getShardIterator(request), maxFutureWait);
return result.shardIterator();
}
@Override
public GetRecordsResponse getRecords(@NonNull final String nextIterator) {
final AWSExceptionManager exceptionManager = createExceptionManager();
GetRecordsRequest request = getGetRecordsRequest(nextIterator);
final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION);
MetricsUtil.addShardId(metricsScope, shardId);
boolean success = false;
boolean success = false ;
long startTime = System.currentTimeMillis();
try {
final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request),
maxFutureWait);
if (!isValidResponse(response)) {
throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId
+ ". nextShardIterator: " + response.nextShardIterator()
+ ". childShards: " + response.childShards() + ". Will retry GetRecords with the same nextIterator.");
}
final GetRecordsResponse response = getGetRecordsResponse(request);
success = true;
return response;
} catch (ExecutionException e) {

View file

@ -17,31 +17,47 @@ package software.amazon.kinesis.retrieval.polling;
import java.time.Duration;
import java.util.Optional;
import java.util.function.Function;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.kinesis.retrieval.DataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.RecordsFetcherFactory;
import software.amazon.kinesis.retrieval.RetrievalFactory;
import software.amazon.kinesis.retrieval.RetrievalSpecificConfig;
@Accessors(fluent = true)
@Data
@Getter
@Setter
@ToString
@EqualsAndHashCode
public class PollingConfig implements RetrievalSpecificConfig {
public static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(30);
/**
* Configurable functional interface to override the existing DataFetcher.
*/
Function<DataFetcherProviderConfig, DataFetcher> dataFetcherProvider;
/**
* Name of the Kinesis stream.
*
* @return String
*/
@NonNull
private final String streamName;
private String streamName;
/**
* @param kinesisClient Client used to access Kinesis services.
*/
public PollingConfig(KinesisAsyncClient kinesisClient) {
this.kinesisClient = kinesisClient;
}
/**
* Client used to access to Kinesis service.
@ -60,6 +76,15 @@ public class PollingConfig implements RetrievalSpecificConfig {
*/
private int maxRecords = 10000;
/**
* @param streamName Name of Kinesis stream.
* @param kinesisClient Client used to access Kinesis services.
*/
public PollingConfig(String streamName, KinesisAsyncClient kinesisClient) {
this.kinesisClient = kinesisClient;
this.streamName = streamName;
}
/**
* The value for how long the ShardConsumer should sleep if no records are returned from the call to
* {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}.
@ -105,6 +130,6 @@ public class PollingConfig implements RetrievalSpecificConfig {
@Override
public RetrievalFactory retrievalFactory() {
return new SynchronousBlockingRetrievalFactory(streamName(), kinesisClient(), recordsFetcherFactory,
maxRecords(), kinesisRequestTimeout);
maxRecords(), kinesisRequestTimeout, dataFetcherProvider);
}
}

View file

@ -15,6 +15,8 @@
package software.amazon.kinesis.retrieval.polling;
import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
@ -25,21 +27,17 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import com.google.common.annotations.VisibleForTesting;
import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
@ -61,7 +59,6 @@ import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired;
/**
@ -108,7 +105,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
@VisibleForTesting @Getter
private final LinkedBlockingQueue<PrefetchRecordsRetrieved> prefetchRecordsQueue;
private final PrefetchCounters prefetchCounters;
private final KinesisDataFetcher dataFetcher;
private final DataFetcher dataFetcher;
private InitialPositionInStreamExtended initialPositionInStreamExtended;
private String highestSequenceNumber;
@ -215,7 +212,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
this.maxByteSize = maxByteSize;
this.maxRecordsCount = maxRecordsCount;
this.publisherSession = new PublisherSession(new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput),
new PrefetchCounters(), this.getRecordsRetrievalStrategy.getDataFetcher());
new PrefetchCounters(), this.getRecordsRetrievalStrategy.dataFetcher());
this.executorService = executorService;
this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(metricsFactory);
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
@ -223,7 +220,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
Validate.notEmpty(operation, "Operation cannot be empty");
this.operation = operation;
this.streamAndShardId =
this.getRecordsRetrievalStrategy.getDataFetcher().getStreamIdentifier().serialize() + ":" + shardId;
this.getRecordsRetrievalStrategy.dataFetcher().getStreamIdentifier().serialize() + ":" + shardId;
}
@Override

View file

@ -15,6 +15,8 @@
package software.amazon.kinesis.retrieval.polling;
import java.time.Duration;
import java.util.function.Function;
import lombok.Data;
import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
@ -22,13 +24,13 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.DataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.RecordsFetcherFactory;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalFactory;
import java.time.Duration;
/**
*
*/
@ -42,32 +44,71 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
private final KinesisAsyncClient kinesisClient;
@NonNull
private final RecordsFetcherFactory recordsFetcherFactory;
// private final long listShardsBackoffTimeInMillis;
// private final int maxListShardsRetryAttempts;
private final int maxRecords;
private final Duration kinesisRequestTimeout;
public SynchronousBlockingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, RecordsFetcherFactory recordsFetcherFactory, int maxRecords, Duration kinesisRequestTimeout) {
private final Function<DataFetcherProviderConfig, DataFetcher> dataFetcherProvider;
@Deprecated
public SynchronousBlockingRetrievalFactory(String streamName,
KinesisAsyncClient kinesisClient,
RecordsFetcherFactory recordsFetcherFactory,
int maxRecords,
Duration kinesisRequestTimeout) {
this(streamName,
kinesisClient,
recordsFetcherFactory,
maxRecords,
kinesisRequestTimeout,
defaultDataFetcherProvider(kinesisClient));
}
public SynchronousBlockingRetrievalFactory(String streamName,
KinesisAsyncClient kinesisClient,
RecordsFetcherFactory recordsFetcherFactory,
int maxRecords,
Duration kinesisRequestTimeout,
Function<DataFetcherProviderConfig, DataFetcher> dataFetcherProvider) {
this.streamName = streamName;
this.kinesisClient = kinesisClient;
this.recordsFetcherFactory = recordsFetcherFactory;
this.maxRecords = maxRecords;
this.kinesisRequestTimeout = kinesisRequestTimeout;
this.dataFetcherProvider = dataFetcherProvider == null ?
defaultDataFetcherProvider(kinesisClient) : dataFetcherProvider;
}
@Deprecated
public SynchronousBlockingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, RecordsFetcherFactory recordsFetcherFactory, int maxRecords) {
public SynchronousBlockingRetrievalFactory(String streamName,
KinesisAsyncClient kinesisClient,
RecordsFetcherFactory recordsFetcherFactory,
int maxRecords) {
this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, PollingConfig.DEFAULT_REQUEST_TIMEOUT);
}
private static Function<DataFetcherProviderConfig, DataFetcher> defaultDataFetcherProvider(
KinesisAsyncClient kinesisClient) {
return dataFetcherProviderConfig -> new KinesisDataFetcher(kinesisClient, dataFetcherProviderConfig);
}
@Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
@NonNull final MetricsFactory metricsFactory) {
@NonNull final MetricsFactory metricsFactory) {
final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ?
StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) :
StreamIdentifier.singleStreamInstance(streamName);
return new SynchronousGetRecordsRetrievalStrategy(
new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout));
final DataFetcherProviderConfig kinesisDataFetcherProviderConfig = new KinesisDataFetcherProviderConfig(
streamIdentifier,
shardInfo.shardId(),
metricsFactory,
maxRecords,
kinesisRequestTimeout);
final DataFetcher dataFetcher = this.dataFetcherProvider.apply(kinesisDataFetcherProviderConfig);
return new SynchronousGetRecordsRetrievalStrategy(dataFetcher);
}
@Override

View file

@ -14,6 +14,7 @@
*/
package software.amazon.kinesis.retrieval.polling;
import java.util.Optional;
import lombok.Data;
import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
@ -26,8 +27,9 @@ import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
@Data
@KinesisClientInternalApi
public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrievalStrategy {
@NonNull
private final KinesisDataFetcher dataFetcher;
private final DataFetcher dataFetcher;
@Override
public GetRecordsResponse getRecords(final int maxRecords) {
@ -45,9 +47,14 @@ public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetriev
public boolean isShutdown() {
return false;
}
@Override
public KinesisDataFetcher getDataFetcher() {
throw new UnsupportedOperationException("Deprecated. Use dataFetcher() to retrieve a dataFetcher");
}
@Override
public DataFetcher dataFetcher() {
return dataFetcher;
}
}

View file

@ -17,7 +17,6 @@ package software.amazon.kinesis.retrieval.polling;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@ -25,6 +24,7 @@ import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.RecordsFetcherFactory;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalFactory;
@ -71,9 +71,15 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory
final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ?
StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) :
StreamIdentifier.singleStreamInstance(streamName);
return new SynchronousGetRecordsRetrievalStrategy(
new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(),
maxRecords, metricsFactory, maxFutureWait));
new KinesisDataFetcher(kinesisClient, new KinesisDataFetcherProviderConfig(
streamIdentifier,
shardInfo.shardId(),
metricsFactory,
maxRecords,
maxFutureWait
)));
}
@Override

View file

@ -90,6 +90,26 @@ public class CheckpointerTest {
Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint());
}
@Test
public final void testInitialPrepareCheckpointWithApplicationState() throws Exception {
String sequenceNumber = "1";
String pendingCheckpointValue = "99999";
String shardId = "myShardId";
byte[] applicationState = "applicationState".getBytes();
ExtendedSequenceNumber extendedCheckpointNumber = new ExtendedSequenceNumber(sequenceNumber);
checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(sequenceNumber), testConcurrencyToken);
ExtendedSequenceNumber extendedPendingCheckpointNumber = new ExtendedSequenceNumber(pendingCheckpointValue);
checkpoint.prepareCheckpoint(shardId, new ExtendedSequenceNumber(pendingCheckpointValue), testConcurrencyToken,
applicationState);
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint());
Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint());
Assert.assertEquals(applicationState, checkpoint.getCheckpointObject(shardId).pendingCheckpointState());
}
@Test
public final void testAdvancingPrepareCheckpoint() throws Exception {
String shardId = "myShardId";
@ -107,6 +127,26 @@ public class CheckpointerTest {
}
}
@Test
public final void testAdvancingPrepareCheckpointWithApplicationState() throws Exception {
String shardId = "myShardId";
String checkpointValue = "12345";
byte[] applicationState = "applicationState".getBytes();
ExtendedSequenceNumber extendedCheckpointNumber = new ExtendedSequenceNumber(checkpointValue);
checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(checkpointValue), testConcurrencyToken);
for (Integer i = 0; i < 10; i++) {
String sequenceNumber = i.toString();
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(sequenceNumber);
checkpoint.prepareCheckpoint(shardId, new ExtendedSequenceNumber(sequenceNumber), testConcurrencyToken,
applicationState);
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint());
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint());
Assert.assertEquals(applicationState, checkpoint.getCheckpointObject(shardId).pendingCheckpointState());
}
}
@Test
public final void testPrepareAndSetCheckpoint() throws Exception {
String checkpointValue = "12345";
@ -134,4 +174,35 @@ public class CheckpointerTest {
Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpoint());
}
@Test
public final void testPrepareAndSetCheckpointWithApplicationState() throws Exception {
String checkpointValue = "12345";
String shardId = "testShardId-1";
String concurrencyToken = "token-1";
String pendingCheckpointValue = "99999";
byte[] applicationState = "applicationState".getBytes();
// set initial checkpoint
ExtendedSequenceNumber extendedCheckpointNumber = new ExtendedSequenceNumber(checkpointValue);
checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(checkpointValue), concurrencyToken);
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpoint());
// prepare checkpoint
ExtendedSequenceNumber extendedPendingCheckpointNumber = new ExtendedSequenceNumber(pendingCheckpointValue);
checkpoint.prepareCheckpoint(shardId, new ExtendedSequenceNumber(pendingCheckpointValue), concurrencyToken, applicationState);
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint());
Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint());
Assert.assertEquals(applicationState, checkpoint.getCheckpointObject(shardId).pendingCheckpointState());
// do checkpoint
checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(pendingCheckpointValue), concurrencyToken);
Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpointState());
}
}

View file

@ -18,7 +18,6 @@ import java.util.HashMap;
import java.util.Map;
import software.amazon.kinesis.exceptions.KinesisClientLibException;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@ -32,6 +31,7 @@ public class InMemoryCheckpointer implements Checkpointer {
private Map<String, ExtendedSequenceNumber> checkpoints = new HashMap<>();
private Map<String, ExtendedSequenceNumber> flushpoints = new HashMap<>();
private Map<String, ExtendedSequenceNumber> pendingCheckpoints = new HashMap<>();
private Map<String, byte[]> pendingCheckpointStates = new HashMap<>();
private String operation;
@ -44,6 +44,7 @@ public class InMemoryCheckpointer implements Checkpointer {
checkpoints.put(leaseKey, checkpointValue);
flushpoints.put(leaseKey, checkpointValue);
pendingCheckpoints.remove(leaseKey);
pendingCheckpointStates.remove(leaseKey);
if (log.isDebugEnabled()) {
log.debug("shardId: {} checkpoint: {}", leaseKey, checkpointValue);
@ -64,15 +65,22 @@ public class InMemoryCheckpointer implements Checkpointer {
@Override
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken)
throws KinesisClientLibException {
prepareCheckpoint(leaseKey, pendingCheckpoint, concurrencyToken, null);
}
@Override
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException {
pendingCheckpoints.put(leaseKey, pendingCheckpoint);
pendingCheckpointStates.put(leaseKey, pendingCheckpointState);
}
@Override
public Checkpoint getCheckpointObject(String leaseKey) throws KinesisClientLibException {
ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey);
ExtendedSequenceNumber pendingCheckpoint = pendingCheckpoints.get(leaseKey);
byte[] pendingCheckpointState = pendingCheckpointStates.get(leaseKey);
Checkpoint checkpointObj = new Checkpoint(checkpoint, pendingCheckpoint);
Checkpoint checkpointObj = new Checkpoint(checkpoint, pendingCheckpoint, pendingCheckpointState);
log.debug("getCheckpointObject shardId: {}, {}", leaseKey, checkpointObj);
return checkpointObj;
}

View file

@ -242,7 +242,7 @@ public class SchedulerTest {
final List<ShardInfo> secondShardInfo = Collections.singletonList(
new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber));
final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null);
final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null, null);
when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo);
when(checkpoint.getCheckpointObject(eq(shardId))).thenReturn(firstCheckpoint);
@ -368,7 +368,7 @@ public class SchedulerTest {
.map(sc -> new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber,
sc.streamIdentifier().serialize())).collect(Collectors.toList());
final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null);
final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null, null);
when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo);
when(checkpoint.getCheckpointObject(anyString())).thenReturn(firstCheckpoint);

View file

@ -19,6 +19,7 @@ import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
@ -54,6 +55,7 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher {
DELETELEASE(9),
DELETEALL(10),
UPDATELEASE(11),
LISTLEASESFORSTREAM(12),
NONE(Integer.MIN_VALUE);
private Integer index;
@ -129,6 +131,13 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher {
return leaseRefresher.waitUntilLeaseTableExists(secondsBetweenPolls, timeoutSeconds);
}
@Override
public List<Lease> listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
throwExceptions("listLeasesForStream", ExceptionThrowingLeaseRefresherMethods.LISTLEASESFORSTREAM);
return leaseRefresher.listLeasesForStream(streamIdentifier);
}
@Override
public List<Lease> listLeases()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {

View file

@@ -969,7 +969,7 @@ public class HierarchicalShardSyncerTest {
parentShardIds.add(shard.adjacentParentShardId());
}
return new Lease(shard.shardId(), leaseOwner, 0L, UUID.randomUUID(), 0L, checkpoint, null, 0L,
parentShardIds);
parentShardIds, null);
}).collect(Collectors.toList());
}
@@ -1039,7 +1039,7 @@ public class HierarchicalShardSyncerTest {
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(leases);
when(dynamoDBLeaseRefresher.listLeasesForStream(any(StreamIdentifier.class))).thenReturn(leases);
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture());
setupMultiStream();
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
@@ -1049,7 +1049,7 @@ public class HierarchicalShardSyncerTest {
assertThat(leaseCaptor.getValue(), equalTo(garbageLease));
verify(shardDetector, times(2)).listShards();
verify(dynamoDBLeaseRefresher).listLeases();
verify(dynamoDBLeaseRefresher).listLeasesForStream(any(StreamIdentifier.class));
verify(dynamoDBLeaseRefresher).deleteLease(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
}

View file

@@ -35,9 +35,10 @@ public class LeaseBuilder {
private Long ownerSwitchesSinceCheckpoint = 0L;
private Set<String> parentShardIds = new HashSet<>();
private Set<String> childShardIds = new HashSet<>();
private byte[] pendingCheckpointState;
public Lease build() {
return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos,
checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds);
return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint,
pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds, pendingCheckpointState);
}
}

View file

@@ -127,16 +127,37 @@ public class DynamoDBLeaseCoordinatorIntegrationTest {
}
assertNotNull(lease);
ExtendedSequenceNumber newCheckpoint = new ExtendedSequenceNumber("newCheckpoint");
final ExtendedSequenceNumber initialCheckpoint = new ExtendedSequenceNumber("initialCheckpoint");
final ExtendedSequenceNumber pendingCheckpoint = new ExtendedSequenceNumber("pendingCheckpoint");
final ExtendedSequenceNumber newCheckpoint = new ExtendedSequenceNumber("newCheckpoint");
final byte[] checkpointState = "checkpointState".getBytes();
// lease's leaseCounter is wrong at this point, but it shouldn't matter.
assertTrue(dynamoDBCheckpointer.setCheckpoint(lease.leaseKey(), initialCheckpoint, lease.concurrencyToken()));
final Lease leaseFromDDBAtInitialCheckpoint = leaseRefresher.getLease(lease.leaseKey());
lease.leaseCounter(lease.leaseCounter() + 1);
lease.checkpoint(initialCheckpoint);
lease.leaseOwner(coordinator.workerIdentifier());
assertEquals(lease, leaseFromDDBAtInitialCheckpoint);
dynamoDBCheckpointer.prepareCheckpoint(lease.leaseKey(), pendingCheckpoint, lease.concurrencyToken().toString(), checkpointState);
final Lease leaseFromDDBAtPendingCheckpoint = leaseRefresher.getLease(lease.leaseKey());
lease.leaseCounter(lease.leaseCounter() + 1);
lease.checkpoint(initialCheckpoint);
lease.pendingCheckpoint(pendingCheckpoint);
lease.pendingCheckpointState(checkpointState);
assertEquals(lease, leaseFromDDBAtPendingCheckpoint);
assertTrue(dynamoDBCheckpointer.setCheckpoint(lease.leaseKey(), newCheckpoint, lease.concurrencyToken()));
Lease fromDynamo = leaseRefresher.getLease(lease.leaseKey());
final Lease leaseFromDDBAtNewCheckpoint = leaseRefresher.getLease(lease.leaseKey());
lease.leaseCounter(lease.leaseCounter() + 1);
lease.checkpoint(newCheckpoint);
lease.leaseOwner(coordinator.workerIdentifier());
assertEquals(lease, fromDynamo);
lease.pendingCheckpoint(null);
lease.pendingCheckpointState(null);
assertEquals(lease, leaseFromDDBAtNewCheckpoint);
}
/**

View file

@@ -72,7 +72,7 @@ public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest
Collection<Lease> expected = builder.build().values();
// The / 3 here ensures that we will test Dynamo's paging mechanics.
List<Lease> actual = leaseRefresher.list(numRecordsToPut / 3);
List<Lease> actual = leaseRefresher.list(numRecordsToPut / 3, null);
for (Lease lease : actual) {
assertNotNull(expected.remove(lease));

View file

@@ -55,7 +55,7 @@ public class DynamoDBLeaseRenewerTest {
private LeaseRefresher leaseRefresher;
private static Lease newLease(String leaseKey) {
return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, new HashSet<>());
return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, new HashSet<>(), null);
}
@Before

View file

@@ -107,7 +107,7 @@ public class PrefetchRecordsPublisherTest {
@Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
@Mock
private KinesisDataFetcher dataFetcher;
private DataFetcher dataFetcher;
@Mock
private InitialPositionInStreamExtended initialPosition;
@Mock
@@ -124,7 +124,7 @@ public class PrefetchRecordsPublisherTest {
@Before
public void setup() {
when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(dataFetcher);
when(getRecordsRetrievalStrategy.dataFetcher()).thenReturn(dataFetcher);
when(dataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("testStream"));
executorService = spy(Executors.newFixedThreadPool(1));
getRecordsCache = new PrefetchRecordsPublisher(

View file

@@ -40,14 +40,14 @@ public class RecordsFetcherFactoryTest {
@Mock
private MetricsFactory metricsFactory;
@Mock
private KinesisDataFetcher kinesisDataFetcher;
private DataFetcher dataFetcher;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
recordsFetcherFactory = new SimpleRecordsFetcherFactory();
when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(kinesisDataFetcher);
when(kinesisDataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("stream"));
when(getRecordsRetrievalStrategy.dataFetcher()).thenReturn(dataFetcher);
when(dataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("stream"));
}
@Test
@ -66,5 +66,4 @@ public class RecordsFetcherFactoryTest {
metricsFactory, 1);
assertThat(recordsCache, instanceOf(PrefetchRecordsPublisher.class));
}
}