diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorTest.java index da7e9fb2..e3368e07 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorTest.java @@ -112,6 +112,11 @@ public class StreamingShardRecordProcessorTest { throw new UnsupportedOperationException(); } + @Override + public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + throw new UnsupportedOperationException(); + } + @Override public PreparedCheckpointer prepareCheckpoint(Record record) throws KinesisClientLibDependencyException, @@ -119,6 +124,11 @@ public class StreamingShardRecordProcessorTest { throw new UnsupportedOperationException(); } + @Override + public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + throw new UnsupportedOperationException(); + } + @Override public PreparedCheckpointer prepareCheckpoint(String sequenceNumber) throws KinesisClientLibDependencyException, @@ -126,6 +136,11 @@ public class StreamingShardRecordProcessorTest { throw new UnsupportedOperationException(); } + @Override + public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { + return null; + } + @Override public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber) throws KinesisClientLibDependencyException, @@ -133,6 +148,11 @@ public class StreamingShardRecordProcessorTest { throw new UnsupportedOperationException(); } + @Override + public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { + throw new UnsupportedOperationException(); + } + @Override public Checkpointer checkpointer() { throw new UnsupportedOperationException(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java index 2bab0cd6..f5af81e3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java @@ -26,18 +26,26 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; public class Checkpoint { private final ExtendedSequenceNumber checkpoint; private final ExtendedSequenceNumber pendingCheckpoint; + private final byte[] pendingCheckpointState; + + @Deprecated + public Checkpoint(final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint) { + this(checkpoint, pendingCheckpoint, null); + } /** * Constructor. * * @param checkpoint the checkpoint sequence number - cannot be null or empty. * @param pendingCheckpoint the pending checkpoint sequence number - can be null. + * @param pendingCheckpointState the pending checkpoint state - can be null. */ - public Checkpoint(final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint) { + public Checkpoint(final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, byte[] pendingCheckpointState) { if (checkpoint == null || checkpoint.sequenceNumber().isEmpty()) { throw new IllegalArgumentException("Checkpoint cannot be null or empty"); } this.checkpoint = checkpoint; this.pendingCheckpoint = pendingCheckpoint; + this.pendingCheckpointState = pendingCheckpointState; } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java index 7d504bbb..fd375264 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java @@ -144,8 +144,15 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi * {@inheritDoc} */ @Override - public synchronized PreparedCheckpointer prepareCheckpoint(Record record) - throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + return prepareCheckpoint(largestPermittedCheckpointValue.sequenceNumber(), applicationState); + } + + /** + * {@inheritDoc} + */ + @Override + public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { // // TODO: UserRecord Deprecation // @@ -154,10 +161,19 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi } /*else if (record instanceof UserRecord) { return prepareCheckpoint(record.sequenceNumber(), ((UserRecord) record).subSequenceNumber()); } */ else { - return prepareCheckpoint(record.sequenceNumber(), 0); + return prepareCheckpoint(record.sequenceNumber(), 0, applicationState); } } + /** + * {@inheritDoc} + */ + @Override + public synchronized PreparedCheckpointer prepareCheckpoint(Record record) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + return prepareCheckpoint(record, null); + } + /** * {@inheritDoc} */ @@ -167,13 +183,30 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi return prepareCheckpoint(sequenceNumber, 0); } + /** + * {@inheritDoc} + */ + @Override + public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { + return prepareCheckpoint(sequenceNumber, 0, applicationState); + } + /** * {@inheritDoc} */ @Override public synchronized PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + return prepareCheckpoint(sequenceNumber, subSequenceNumber, null); + } + /** + * {@inheritDoc} + */ + @Override + public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { if (subSequenceNumber < 0) { throw new IllegalArgumentException("Could not checkpoint at invalid, negative subsequence number " + subSequenceNumber); @@ -191,7 +224,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi log.debug("Preparing checkpoint {}, token {} at specific extended sequence number {}", ShardInfo.getLeaseKey(shardInfo), shardInfo.concurrencyToken(), pendingCheckpoint); } - return doPrepareCheckpoint(pendingCheckpoint); + return doPrepareCheckpoint(pendingCheckpoint, applicationState); } else { throw new IllegalArgumentException(String.format( "Could not prepare checkpoint at extended sequence number %s as it did not fall into acceptable " @@ -290,7 +323,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi * @throws ThrottlingException * @throws ShutdownException */ - private PreparedCheckpointer doPrepareCheckpoint(ExtendedSequenceNumber extendedSequenceNumber) + private PreparedCheckpointer doPrepareCheckpoint(ExtendedSequenceNumber extendedSequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { ExtendedSequenceNumber newPrepareCheckpoint = extendedSequenceNumber; @@ -308,7 +341,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi } try { - checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, shardInfo.concurrencyToken()); + checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, shardInfo.concurrencyToken(), applicationState); } catch (ThrottlingException | ShutdownException | InvalidStateException | KinesisClientLibDependencyException e) { throw e; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java index fb7a6fc7..d9646351 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java @@ -88,7 +88,7 @@ public class DynamoDBCheckpointer implements Checkpointer { try { Lease lease = leaseRefresher.getLease(leaseKey); log.debug("[{}] Retrieved lease => {}", leaseKey, lease); - return new Checkpoint(lease.checkpoint(), lease.pendingCheckpoint()); + return new Checkpoint(lease.checkpoint(), lease.pendingCheckpoint(), lease.pendingCheckpointState()); } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) { String message = "Unable to fetch checkpoint for shardId " + leaseKey; log.error(message, e); @@ -99,9 +99,14 @@ public class DynamoDBCheckpointer implements Checkpointer { @Override public void prepareCheckpoint(final String leaseKey, final ExtendedSequenceNumber pendingCheckpoint, final String concurrencyToken) throws KinesisClientLibException { + prepareCheckpoint(leaseKey, pendingCheckpoint, concurrencyToken, null); + } + + @Override + public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException { try { boolean wasSuccessful = - prepareCheckpoint(leaseKey, pendingCheckpoint, UUID.fromString(concurrencyToken)); + prepareCheckpoint(leaseKey, pendingCheckpoint, UUID.fromString(concurrencyToken), pendingCheckpointState); if (!wasSuccessful) { throw new ShutdownException( "Can't prepare checkpoint - instance doesn't hold the lease for this shard"); @@ -129,12 +134,13 @@ public class DynamoDBCheckpointer implements Checkpointer { lease.checkpoint(checkpoint); lease.pendingCheckpoint(null); + lease.pendingCheckpointState(null); lease.ownerSwitchesSinceCheckpoint(0L); return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey); } - boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, UUID concurrencyToken) + boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, UUID concurrencyToken, byte[] pendingCheckpointState) throws DependencyException, InvalidStateException, ProvisionedThroughputException { Lease lease = leaseCoordinator.getCurrentlyHeldLease(leaseKey); if (lease == null) { @@ -144,6 +150,7 @@ public class DynamoDBCheckpointer implements Checkpointer { } lease.pendingCheckpoint(Objects.requireNonNull(pendingCheckpoint, "pendingCheckpoint should not be null")); + lease.pendingCheckpointState(pendingCheckpointState); return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index cb4b4579..4d6c1fd7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -15,6 +15,10 @@ package software.amazon.kinesis.coordinator; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; + +import io.reactivex.plugins.RxJavaPlugins; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -29,14 +33,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Stopwatch; -import io.reactivex.plugins.RxJavaPlugins; import lombok.AccessLevel; import lombok.Getter; import lombok.NoArgsConstructor; @@ -50,6 +52,7 @@ import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; @@ -59,9 +62,7 @@ import software.amazon.kinesis.leases.MultiStreamLease; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardPrioritization; -import software.amazon.kinesis.leases.ShardSyncTask; import software.amazon.kinesis.leases.ShardSyncTaskManager; -import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer; @@ -77,7 +78,6 @@ import software.amazon.kinesis.lifecycle.ShutdownNotification; import software.amazon.kinesis.lifecycle.ShutdownReason; import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.metrics.CloudWatchMetricsFactory; -import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.processor.Checkpointer; @@ -89,9 +89,6 @@ import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalConfig; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadLocalRandom; - /** * */ @@ -326,6 +323,7 @@ public class Scheduler implements Runnable { if (shouldInitiateLeaseSync()) { log.info("Worker {} is initiating the lease sync.", leaseManagementConfig.workerIdentifier()); leaderElectedPeriodicShardSyncManager.syncShardsOnce(); + } } else { log.info("Skipping shard sync per configuration setting (and lease table is not empty)"); @@ -551,7 +549,6 @@ public class Scheduler implements Runnable { * Requests a graceful shutdown of the worker, notifying record processors, that implement * {@link ShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to * checkpoint. - * * This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the * previous future. * @@ -578,8 +575,8 @@ public class Scheduler implements Runnable { * * * @return a future that will be set once the shutdown has completed. True indicates that the graceful shutdown - * completed successfully. A false value indicates that a non-exception case caused the shutdown process to - * terminate early. + * completed successfully. A false value indicates that a non-exception case caused the shutdown process to + * terminate early. */ public Future startGracefulShutdown() { synchronized (this) { @@ -596,9 +593,8 @@ public class Scheduler implements Runnable { * shutdowns in your own executor, or execute the shutdown synchronously. * * @return a callable that run the graceful shutdown process. This may return a callable that return true if the - * graceful shutdown has already been completed. - * @throws IllegalStateException - * thrown by the callable if another callable has already started the shutdown process. + * graceful shutdown has already been completed. + * @throws IllegalStateException thrown by the callable if another callable has already started the shutdown process. */ public Callable createGracefulShutdownCallable() { if (shutdownComplete()) { @@ -740,12 +736,11 @@ public class Scheduler implements Runnable { /** * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. * - * @param shardInfo - * Kinesis shard info + * @param shardInfo Kinesis shard info * @return ShardConsumer for the shard */ ShardConsumer createOrGetShardConsumer(@NonNull final ShardInfo shardInfo, - @NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) { + @NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) { ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); // Instantiate a new consumer if we don't have one, or the one we // had was from an earlier @@ -766,10 +761,10 @@ public class Scheduler implements Runnable { } protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo, - @NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) { + @NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) { RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, metricsFactory); ShardRecordProcessorCheckpointer checkpointer = coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo, - checkpoint); + checkpoint); // The only case where streamName is not available will be when multistreamtracker not set. In this case, // get the default stream name for the single stream application. final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt()); @@ -806,7 +801,6 @@ public class Scheduler implements Runnable { /** * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. - *

* This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been * instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example * shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo @@ -851,7 +845,7 @@ public class Scheduler implements Runnable { private StreamIdentifier getStreamIdentifier(Optional streamIdentifierString) { final StreamIdentifier streamIdentifier; - if(streamIdentifierString.isPresent()) { + if (streamIdentifierString.isPresent()) { streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierString.get()); } else { Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode"); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoUtils.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoUtils.java index 9d5f9ae2..29d6029b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoUtils.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoUtils.java @@ -14,6 +14,7 @@ */ package software.amazon.kinesis.leases; +import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.kinesis.annotations.KinesisClientInternalApi; @@ -36,6 +37,14 @@ public class DynamoUtils { return AttributeValue.builder().ss(collectionValue).build(); } + public static AttributeValue createAttributeValue(byte[] byteBufferValue) { + if (byteBufferValue == null) { + throw new IllegalArgumentException("Byte buffer attributeValues cannot be null or empty."); + } + + return AttributeValue.builder().b(SdkBytes.fromByteArray(byteBufferValue)).build(); + } + public static AttributeValue createAttributeValue(String stringValue) { if (stringValue == null || stringValue.isEmpty()) { throw new IllegalArgumentException("String attributeValues cannot be null or empty."); @@ -52,6 +61,15 @@ public class DynamoUtils { return AttributeValue.builder().n(longValue.toString()).build(); } + public static byte[] safeGetByteArray(Map dynamoRecord, String key) { + AttributeValue av = dynamoRecord.get(key); + if (av == null) { + return null; + } else { + return av.b().asByteArray(); + } + } + public static Long safeGetLong(Map dynamoRecord, String key) { AttributeValue av = dynamoRecord.get(key); if (av == null) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 5cd6f472..debb89bb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -142,8 +142,7 @@ public class HierarchicalShardSyncer { assertAllParentShardsAreClosed(inconsistentShardIds); } final List currentLeases = isMultiStreamMode ? - getLeasesForStream(shardDetector.streamIdentifier(), leaseRefresher) : - leaseRefresher.listLeases(); + leaseRefresher.listLeasesForStream(shardDetector.streamIdentifier()) : leaseRefresher.listLeases(); final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamIdentifier()); final LeaseSynchronizer leaseSynchronizer = isLeaseTableEmpty ? new EmptyLeaseTableSynchronizer() : new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); @@ -171,29 +170,6 @@ public class HierarchicalShardSyncer { } } - // CHECKSTYLE:ON CyclomaticComplexity - - /** Note: This method has package level access solely for testing purposes. - * - * @param streamIdentifier We'll use this stream identifier to filter leases - * @param leaseRefresher Used to fetch leases - * @return Return list of leases (corresponding to shards) of the specified stream. - * @throws DependencyException - * @throws InvalidStateException - * @throws ProvisionedThroughputException - */ - static List getLeasesForStream(StreamIdentifier streamIdentifier, - LeaseRefresher leaseRefresher) - throws DependencyException, ProvisionedThroughputException, InvalidStateException { - List streamLeases = new ArrayList<>(); - for (Lease lease : leaseRefresher.listLeases()) { - if (streamIdentifier.serialize().equals(((MultiStreamLease)lease).streamIdentifier())) { - streamLeases.add(lease); - } - } - return streamLeases; - } - /** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls * and a reshard operation. * @param inconsistentShardIds diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java index c0c3bdee..a29c5ce4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java @@ -26,15 +26,13 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; - -import org.apache.commons.lang3.StringUtils; - import lombok.AccessLevel; import lombok.Getter; import lombok.NonNull; import lombok.Synchronized; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.LimitExceededException; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index a04e2725..2d0ce8c2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit; @NoArgsConstructor @Getter @Accessors(fluent = true) -@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "childShardIds"}) +@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "childShardIds", "pendingCheckpointState"}) @ToString public class Lease { /* @@ -82,6 +82,14 @@ public class Lease { * @return pending checkpoint, possibly null. */ private ExtendedSequenceNumber pendingCheckpoint; + + /** + * Last pending application state. Deliberately excluded from hashCode and equals. + * + * @return pending checkpoint state, possibly null. + */ + private byte[] pendingCheckpointState; + /** * @return count of distinct lease holders between checkpoints. */ @@ -97,21 +105,23 @@ public class Lease { protected Lease(Lease lease) { this(lease.leaseKey(), lease.leaseOwner(), lease.leaseCounter(), lease.concurrencyToken(), lease.lastCounterIncrementNanos(), lease.checkpoint(), lease.pendingCheckpoint(), - lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.childShardIds()); + lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.childShardIds(), lease.pendingCheckpointState()); + } + + @Deprecated + public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter, + final UUID concurrencyToken, final Long lastCounterIncrementNanos, + final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, + final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds) { + this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint, + ownerSwitchesSinceCheckpoint, parentShardIds, new HashSet<>(), null); } public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter, final UUID concurrencyToken, final Long lastCounterIncrementNanos, final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, - final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds) { - this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint, - ownerSwitchesSinceCheckpoint, parentShardIds, new HashSet<>()); - } - - public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter, - final UUID concurrencyToken, final Long lastCounterIncrementNanos, - final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, - final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds, final Set childShardIds) { + final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds, final Set childShardIds, + final byte[] pendingCheckpointState) { this.leaseKey = leaseKey; this.leaseOwner = leaseOwner; this.leaseCounter = leaseCounter; @@ -126,6 +136,7 @@ public class Lease { if (childShardIds != null) { this.childShardIds.addAll(childShardIds); } + this.pendingCheckpointState = pendingCheckpointState; } /** @@ -145,6 +156,7 @@ public class Lease { ownerSwitchesSinceCheckpoint(lease.ownerSwitchesSinceCheckpoint()); checkpoint(lease.checkpoint); pendingCheckpoint(lease.pendingCheckpoint); + pendingCheckpointState(lease.pendingCheckpointState); parentShardIds(lease.parentShardIds); childShardIds(lease.childShardIds()); } @@ -225,6 +237,15 @@ public class Lease { this.pendingCheckpoint = pendingCheckpoint; } + /** + * Sets pending checkpoint state. + * + * @param pendingCheckpointState can be null + */ + public void pendingCheckpointState(byte[] pendingCheckpointState) { + this.pendingCheckpointState = pendingCheckpointState; + } + /** * Sets ownerSwitchesSinceCheckpoint. * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 2a5a0b1e..acaa8de0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -15,17 +15,15 @@ package software.amazon.kinesis.leases; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import java.time.Duration; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; - +import java.util.function.Function; import lombok.Data; import lombok.NonNull; import lombok.experimental.Accessors; @@ -35,11 +33,11 @@ import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory; import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory; -import software.amazon.kinesis.processor.MultiStreamTracker; /** * Used by the KCL to configure lease management. @@ -145,6 +143,11 @@ public class LeaseManagementConfig { */ private int initialLeaseTableWriteCapacity = 10; + /** + * Configurable functional interface to override the existing shardDetector. + */ + private Function customShardDetectorProvider; + /** * The size of the thread pool to create for the lease renewer to use. * @@ -291,30 +294,30 @@ public class LeaseManagementConfig { if (leaseManagementFactory == null) { Validate.notEmpty(streamName(), "Stream name is empty"); leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(), - streamName(), - dynamoDBClient(), - tableName(), - workerIdentifier(), - executorService(), - initialPositionInStream(), - failoverTimeMillis(), - epsilonMillis(), - maxLeasesForWorker(), - maxLeasesToStealAtOneTime(), - maxLeaseRenewalThreads(), - cleanupLeasesUponShardCompletion(), - ignoreUnexpectedChildShards(), - shardSyncIntervalMillis(), - consistentReads(), - listShardsBackoffTimeInMillis(), - maxListShardsRetryAttempts(), - maxCacheMissesBeforeReload(), - listShardsCacheAllowedAgeInSeconds(), - cacheMissWarningModulus(), - initialLeaseTableReadCapacity(), - initialLeaseTableWriteCapacity(), - hierarchicalShardSyncer(), - tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode()); + streamName(), + dynamoDBClient(), + tableName(), + workerIdentifier(), + executorService(), + initialPositionInStream(), + failoverTimeMillis(), + epsilonMillis(), + maxLeasesForWorker(), + maxLeasesToStealAtOneTime(), + maxLeaseRenewalThreads(), + cleanupLeasesUponShardCompletion(), + ignoreUnexpectedChildShards(), + shardSyncIntervalMillis(), + consistentReads(), + listShardsBackoffTimeInMillis(), + maxListShardsRetryAttempts(), + maxCacheMissesBeforeReload(), + listShardsCacheAllowedAgeInSeconds(), + cacheMissWarningModulus(), + initialLeaseTableReadCapacity(), + initialLeaseTableWriteCapacity(), + hierarchicalShardSyncer(), + tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode()); } return leaseManagementFactory; } @@ -328,31 +331,32 @@ public class LeaseManagementConfig { public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer, boolean isMultiStreamingMode) { if(leaseManagementFactory == null) { leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(), - dynamoDBClient(), - tableName(), - workerIdentifier(), - executorService(), - failoverTimeMillis(), - epsilonMillis(), - maxLeasesForWorker(), - maxLeasesToStealAtOneTime(), - maxLeaseRenewalThreads(), - cleanupLeasesUponShardCompletion(), - ignoreUnexpectedChildShards(), - shardSyncIntervalMillis(), - consistentReads(), - listShardsBackoffTimeInMillis(), - maxListShardsRetryAttempts(), - maxCacheMissesBeforeReload(), - listShardsCacheAllowedAgeInSeconds(), - cacheMissWarningModulus(), - initialLeaseTableReadCapacity(), - initialLeaseTableWriteCapacity(), - hierarchicalShardSyncer(isMultiStreamingMode), - tableCreatorCallback(), - dynamoDbRequestTimeout(), - billingMode(), - leaseSerializer); + dynamoDBClient(), + tableName(), + workerIdentifier(), + executorService(), + failoverTimeMillis(), + epsilonMillis(), + maxLeasesForWorker(), + maxLeasesToStealAtOneTime(), + maxLeaseRenewalThreads(), + cleanupLeasesUponShardCompletion(), + ignoreUnexpectedChildShards(), + shardSyncIntervalMillis(), + consistentReads(), + listShardsBackoffTimeInMillis(), + maxListShardsRetryAttempts(), + maxCacheMissesBeforeReload(), + listShardsCacheAllowedAgeInSeconds(), + cacheMissWarningModulus(), + initialLeaseTableReadCapacity(), + initialLeaseTableWriteCapacity(), + hierarchicalShardSyncer(isMultiStreamingMode), + tableCreatorCallback(), + dynamoDbRequestTimeout(), + billingMode(), + leaseSerializer, + customShardDetectorProvider()); } return leaseManagementFactory; } @@ -366,5 +370,4 @@ public class LeaseManagementConfig { this.leaseManagementFactory = leaseManagementFactory; return this; } - } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index 3ba22c2b..f45c4cc2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.leases; import java.util.List; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; @@ -60,6 +61,18 @@ public interface LeaseRefresher { */ boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException; + /** + * List all leases for a given stream synchronously. + * + * @throws DependencyException if DynamoDB scan fails in an unexpected way + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity + * + * @return list of leases + */ + List listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException, + ProvisionedThroughputException; + /** * List all objects in table synchronously. * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java index 1b2822ee..2967a9fb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java @@ -15,12 +15,11 @@ package software.amazon.kinesis.leases; +import java.util.List; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.kinesis.common.StreamIdentifier; -import java.util.List; - /** * */ @@ -34,5 +33,4 @@ public interface ShardDetector { default StreamIdentifier streamIdentifier() { throw new UnsupportedOperationException("StreamName not available"); } - } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 8c09af52..44879c1c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -17,7 +17,7 @@ package software.amazon.kinesis.leases.dynamodb; import java.time.Duration; import java.util.concurrent.ExecutorService; - +import java.util.function.Function; import lombok.Data; import lombok.NonNull; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; @@ -61,6 +61,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { @NonNull private StreamConfig streamConfig; + private Function customShardDetectorProvider; + private final long failoverTimeMillis; private final long epsilonMillis; private final int maxLeasesForWorker; @@ -231,7 +233,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { /** * Constructor. - * + * * @param kinesisClient * @param streamName * @param dynamoDBClient @@ -365,7 +367,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param dynamoDbRequestTimeout * @param billingMode */ - public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig, + private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, @@ -382,7 +384,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, - hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer); + hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer, + null); this.streamConfig = streamConfig; } @@ -425,7 +428,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, - Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer) { + Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer, + Function customShardDetectorProvider) { this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.tableName = tableName; @@ -452,6 +456,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; this.billingMode = billingMode; this.leaseSerializer = leaseSerializer; + this.customShardDetectorProvider = customShardDetectorProvider; } @Override @@ -522,8 +527,9 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { */ @Override public ShardDetector createShardDetector(StreamConfig streamConfig) { - return new KinesisShardDetector(kinesisClient, streamConfig.streamIdentifier(), listShardsBackoffTimeMillis, - maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, - cacheMissWarningModulus, dynamoDbRequestTimeout); + return customShardDetectorProvider != null ? customShardDetectorProvider.apply(streamConfig) : + new KinesisShardDetector(kinesisClient, streamConfig.streamIdentifier(), listShardsBackoffTimeMillis, + maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, + cacheMissWarningModulus, dynamoDbRequestTimeout); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 1c464afe..c5bb1f66 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import com.google.common.collect.ImmutableMap; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; @@ -29,6 +30,7 @@ import software.amazon.awssdk.services.dynamodb.model.*; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.FutureUtils; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseRefresher; @@ -58,6 +60,9 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { private boolean newTableCreated = false; + private static final String STREAM_NAME = "streamName"; + private static final String DDB_STREAM_NAME = ":streamName"; + /** * Constructor. * @@ -263,12 +268,21 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { return System.currentTimeMillis() - startTime; } + /** + * {@inheritDoc} + */ + @Override + public List listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException, + InvalidStateException, ProvisionedThroughputException { + return list( null, streamIdentifier); + } + /** * {@inheritDoc} */ @Override public List listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException { - return list(null); + return list(null, null); } /** @@ -277,22 +291,34 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { @Override public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException { - return list(1).isEmpty(); + return list(1, null).isEmpty(); } /** * List with the given page size. Package access for integration testing. * * @param limit number of items to consider at a time - used by integration tests to force paging. + * @param streamIdentifier streamIdentifier for multi-stream mode. Can be null. * @return list of leases * @throws InvalidStateException if table does not exist * @throws DependencyException if DynamoDB scan fail in an unexpected way * @throws ProvisionedThroughputException if DynamoDB scan fail due to exceeded capacity */ - List list(Integer limit) throws DependencyException, InvalidStateException, ProvisionedThroughputException { + List list(Integer limit, StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException, + ProvisionedThroughputException { + log.debug("Listing leases from table {}", table); ScanRequest.Builder scanRequestBuilder = ScanRequest.builder().tableName(table); + + if (streamIdentifier != null) { + final Map expressionAttributeValues = ImmutableMap.of( + DDB_STREAM_NAME, AttributeValue.builder().s(streamIdentifier.serialize()).build() + ); + scanRequestBuilder = scanRequestBuilder.filterExpression(STREAM_NAME + " = " + DDB_STREAM_NAME) + .expressionAttributeValues(expressionAttributeValues); + } + if (limit != null) { scanRequestBuilder = scanRequestBuilder.limit(limit); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index 52b4d014..20f5b39f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -51,6 +51,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { private static final String CHECKPOINT_SUBSEQUENCE_NUMBER_KEY = "checkpointSubSequenceNumber"; private static final String PENDING_CHECKPOINT_SEQUENCE_KEY = "pendingCheckpoint"; private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber"; + private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState"; private static final String PARENT_SHARD_ID_KEY = "parentShardId"; private static final String CHILD_SHARD_ID_KEY = "childShardId"; @@ -80,6 +81,10 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.pendingCheckpoint().subSequenceNumber())); } + if (lease.pendingCheckpointState() != null) { + result.put(PENDING_CHECKPOINT_STATE_KEY, DynamoUtils.createAttributeValue(lease.checkpoint().subSequenceNumber())); + } + return result; } @@ -111,6 +116,9 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { DynamoUtils.safeGetLong(dynamoRecord, PENDING_CHECKPOINT_SUBSEQUENCE_KEY)) ); } + + leaseToUpdate.pendingCheckpointState(DynamoUtils.safeGetByteArray(dynamoRecord, PENDING_CHECKPOINT_STATE_KEY)); + return leaseToUpdate; } @@ -227,6 +235,13 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build()); } + if (lease.pendingCheckpointState() != null) { + result.put(PENDING_CHECKPOINT_STATE_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.pendingCheckpointState()))); + } else { + result.put(PENDING_CHECKPOINT_STATE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build()); + } + + if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) { result.put(CHILD_SHARD_ID_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds()))); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java index e11eebfa..4108dd9b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java @@ -92,6 +92,7 @@ public class InitializeTask implements ConsumerTask { .shardId(shardInfo.shardId()) .extendedSequenceNumber(initialCheckpoint) .pendingCheckpointSequenceNumber(initialCheckpointObject.pendingCheckpoint()) + .pendingCheckpointState(initialCheckpointObject.pendingCheckpointState()) .build(); final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 96b17b3c..9fb822a5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -16,6 +16,8 @@ package software.amazon.kinesis.lifecycle; import com.google.common.annotations.VisibleForTesting; +import java.util.List; +import java.util.function.Function; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -26,11 +28,11 @@ import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; -import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.exceptions.CustomerApplicationException; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; @@ -38,8 +40,8 @@ import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.metrics.MetricsFactory; -import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsLevel; +import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.RecordsPublisher; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/InitializationInput.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/InitializationInput.java index d6c586aa..3717a805 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/InitializationInput.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/InitializationInput.java @@ -47,4 +47,12 @@ public class InitializationInput { * completing the checkpoint. */ private final ExtendedSequenceNumber pendingCheckpointSequenceNumber; + + /** + * The last pending application state of the previous record processor. May be null. + * + * This will only be set if the previous record processor had prepared a checkpoint, but lost its lease before + * completing the checkpoint. + */ + private final byte[] pendingCheckpointState; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ProcessRecordsInput.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ProcessRecordsInput.java index 3bfcd514..1ce9239b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ProcessRecordsInput.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ProcessRecordsInput.java @@ -57,6 +57,7 @@ public class ProcessRecordsInput { * The records received from Kinesis. These records may have been de-aggregated if they were published by the KPL. */ private List records; + /** * A checkpointer that the {@link ShardRecordProcessor} can use to checkpoint its progress. */ diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java index 70cdd608..2ffadc06 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java @@ -40,7 +40,7 @@ public interface Checkpointer { /** * Get the current checkpoint stored for the specified shard. Useful for checking that the parent shard * has been completely processed before we start processing the child shard. - * + * * @param leaseKey Current checkpoint for this shard is fetched * @return Current checkpoint for this shard, null if there is no record for this shard. * @throws KinesisClientLibException Thrown if we are unable to fetch the checkpoint @@ -73,6 +73,22 @@ public interface Checkpointer { void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) throws KinesisClientLibException; + /** + * Record intent to checkpoint for a shard. Upon failover, the pendingCheckpoint and pendingCheckpointState will be + * passed to the new ShardRecordProcessor's initialize() method. + * + * @param leaseKey Checkpoint is specified for this shard. + * @param pendingCheckpoint Value of the pending checkpoint (e.g. Kinesis sequence number and subsequence number) + * @param concurrencyToken Used with conditional writes to prevent stale updates + * (e.g. if there was a fail over to a different record processor, we don't want to + * overwrite it's checkpoint) + * @param pendingCheckpointState Serialized application state at the pending checkpoint. + * + * @throws KinesisClientLibException Thrown if we were unable to save the checkpoint + */ + void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) + throws KinesisClientLibException; + void operation(String operation); String operation(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java index 2eb3f5c1..34b2930c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/RecordProcessorCheckpointer.java @@ -93,7 +93,6 @@ public interface RecordProcessorCheckpointer { throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException; - /** * This method will checkpoint the progress at the provided sequenceNumber and subSequenceNumber, the latter for * aggregated records produced with the Producer Library. This method is analogous to {@link #checkpoint()} @@ -145,6 +144,32 @@ public interface RecordProcessorCheckpointer { PreparedCheckpointer prepareCheckpoint() throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; + /** + * This method will record a pending checkpoint at the last data record that was delivered to the record processor. + * If the application fails over between calling prepareCheckpoint() and checkpoint(), the init() method of the next + * IRecordProcessor for this shard will be informed of the prepared sequence number and application state. + * + * Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having + * side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete. + * Use the sequence number and application state passed in to init() to behave idempotently. + * + * @param applicationState arbitrary application state that will be passed to the next record processor that + * processes the shard. + * @return an PreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @throws ShutdownException The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this ShardRecordProcessor instance. + * @throws InvalidStateException Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + */ + PreparedCheckpointer prepareCheckpoint(byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; + /** * This method will record a pending checkpoint at the at the provided record. This method is analogous to * {@link #prepareCheckpoint()} but provides the ability to specify the record at which to prepare the checkpoint. @@ -174,6 +199,38 @@ public interface RecordProcessorCheckpointer { PreparedCheckpointer prepareCheckpoint(Record record) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; + /** + * This method will record a pending checkpoint at the at the provided record. This method is analogous to + * {@link #prepareCheckpoint()} but provides the ability to specify the record and application state at which to + * prepare the checkpoint. + * + * @param record A record at which to prepare checkpoint in this shard. + * @param applicationState arbitrary application state that will be passed to the next record processor that + * processes the shard. + * + * Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having + * side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete. + * Use the sequence number and application state passed in to init() to behave idempotently. + * + * @return an PreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @throws ShutdownException The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this ShardRecordProcessor instance. + * @throws InvalidStateException Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + * @throws IllegalArgumentException The sequence number is invalid for one of the following reasons: + * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the + * greatest sequence number seen by the associated record processor. + * 2.) It is not a valid sequence number for a record in this shard. + */ + PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException; + /** * This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to * {@link #prepareCheckpoint()} but provides the ability to specify the sequence number at which to checkpoint. @@ -200,6 +257,35 @@ public interface RecordProcessorCheckpointer { throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException; + /** + * This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to + * {@link #prepareCheckpoint()} but provides the ability to specify the sequence number and application state + * at which to checkpoint. + * + * @param sequenceNumber A sequence number at which to prepare checkpoint in this shard. + * @param applicationState arbitrary application state that will be passed to the next record processor that + * processes the shard. + * + * @return an PreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @throws ShutdownException The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this ShardRecordProcessor instance. + * @throws InvalidStateException Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + * @throws IllegalArgumentException The sequence number is invalid for one of the following reasons: + * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the + * greatest sequence number seen by the associated record processor. + * 2.) It is not a valid sequence number for a record in this shard. + */ + PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, + IllegalArgumentException; + /** * This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for * aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()} @@ -228,5 +314,36 @@ public interface RecordProcessorCheckpointer { throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException; + /** + * This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for + * aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()} + * but provides the ability to specify the sequence number, subsequence number, and application state at which to + * checkpoint. + * + * @param sequenceNumber A sequence number at which to prepare checkpoint in this shard. + * @param subSequenceNumber A subsequence number at which to prepare checkpoint within this shard. + * @param applicationState arbitrary application state that will be passed to the next record processor that + * processes the shard. + * + * @return an PreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @throws ThrottlingException Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @throws ShutdownException The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this ShardRecordProcessor instance. + * @throws InvalidStateException Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @throws KinesisClientLibDependencyException Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + * @throws IllegalArgumentException The sequence number is invalid for one of the following reasons: + * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the + * greatest sequence number seen by the associated record processor. + * 2.) It is not a valid sequence number for a record in this shard. + */ + PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, + IllegalArgumentException; + Checkpointer checkpointer(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataFetcherProviderConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataFetcherProviderConfig.java new file mode 100644 index 00000000..b5c7b23e --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataFetcherProviderConfig.java @@ -0,0 +1,48 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.retrieval; + +import java.time.Duration; +import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.metrics.MetricsFactory; + +public interface DataFetcherProviderConfig { + + /** + * Gets stream identifier for dataFetcher. + */ + StreamIdentifier getStreamIdentifier(); + + /** + * Gets shard id. + */ + String getShardId(); + + /** + * Gets current instance of metrics factory. + */ + MetricsFactory getMetricsFactory(); + + /** + * Gets current max records allowed to process at a given time. + */ + Integer getMaxRecords(); + + /** + * Gets timeout for kinesis request. + */ + Duration getKinesisRequestTimeout(); +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsRetrievalStrategy.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsRetrievalStrategy.java index ca0487f3..3ff8e620 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsRetrievalStrategy.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsRetrievalStrategy.java @@ -14,7 +14,9 @@ */ package software.amazon.kinesis.retrieval; +import java.util.Optional; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.kinesis.retrieval.polling.DataFetcher; import software.amazon.kinesis.retrieval.polling.KinesisDataFetcher; /** @@ -41,15 +43,33 @@ public interface GetRecordsRetrievalStrategy { /** * Returns whether this strategy has been shutdown. - * + * * @return true if the strategy has been shutdown, false otherwise. */ boolean isShutdown(); /** - * Returns the KinesisDataFetcher used to records from Kinesis. - * - * @return KinesisDataFetcher + * Returns a DataFetcher used to records from Kinesis. + * + * @return DataFetcher */ KinesisDataFetcher getDataFetcher(); + + /** + * Returns a DataFetcher override if applicable, else empty for retrieving records from Kinesis. + * + * @return Optional + */ + default Optional getDataFetcherOverride() { + return Optional.empty(); + } + + /** + * Returns a dataFetcher by first checking for an override if it exists, else using the default data fetcher. + * + * @return DataFetcher + */ + default DataFetcher dataFetcher() { + return getDataFetcherOverride().orElse(getDataFetcher()); + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisDataFetcherProviderConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisDataFetcherProviderConfig.java new file mode 100644 index 00000000..7cf6cdcf --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisDataFetcherProviderConfig.java @@ -0,0 +1,45 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.retrieval; + +import java.time.Duration; +import lombok.Data; +import lombok.NonNull; +import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.metrics.MetricsFactory; + + +/** + * Configuration needed for custom data fetchers + */ +@Data +public class KinesisDataFetcherProviderConfig implements DataFetcherProviderConfig { + + @NonNull + private StreamIdentifier streamIdentifier; + + @NonNull + private String shardId; + + @NonNull + private MetricsFactory metricsFactory; + + @NonNull + private Integer maxRecords; + + @NonNull + private Duration kinesisRequestTimeout; +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java index 746fdc19..5f22411a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java @@ -29,11 +29,15 @@ import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.retrieval.fanout.FanOutConfig; +import software.amazon.kinesis.retrieval.polling.PollingConfig; /** * Used by the KCL to configure the retrieval of records from Kinesis. */ -@Getter @Setter @ToString @EqualsAndHashCode +@Getter +@Setter +@ToString +@EqualsAndHashCode @Accessors(fluent = true) public class RetrievalConfig { /** @@ -52,6 +56,7 @@ public class RetrievalConfig { @NonNull private final String applicationName; + /** * AppStreamTracker either for multi stream tracking or single stream */ @@ -91,7 +96,7 @@ public class RetrievalConfig { private RetrievalFactory retrievalFactory; public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull String streamName, - @NonNull String applicationName) { + @NonNull String applicationName) { this.kinesisClient = kinesisAsyncClient; this.appStreamTracker = Either .right(new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStreamExtended)); @@ -99,7 +104,7 @@ public class RetrievalConfig { } public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull MultiStreamTracker multiStreamTracker, - @NonNull String applicationName) { + @NonNull String applicationName) { this.kinesisClient = kinesisAsyncClient; this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; @@ -117,17 +122,29 @@ public class RetrievalConfig { } public RetrievalFactory retrievalFactory() { - if (retrievalFactory == null) { if (retrievalSpecificConfig == null) { retrievalSpecificConfig = new FanOutConfig(kinesisClient()) .applicationName(applicationName()); retrievalSpecificConfig = appStreamTracker.map(multiStreamTracker -> retrievalSpecificConfig, - streamConfig -> ((FanOutConfig)retrievalSpecificConfig).streamName(streamConfig.streamIdentifier().streamName())); + streamConfig -> ((FanOutConfig) retrievalSpecificConfig).streamName(streamConfig.streamIdentifier().streamName())); } + retrievalFactory = retrievalSpecificConfig.retrievalFactory(); } + validateConfig(); return retrievalFactory; } + private void validateConfig() { + boolean isPollingConfig = retrievalSpecificConfig instanceof PollingConfig; + boolean isInvalidPollingConfig = isPollingConfig && appStreamTracker.map(multiStreamTracker -> + ((PollingConfig) retrievalSpecificConfig).streamName() != null, + streamConfig -> + streamConfig.streamIdentifier() == null || streamConfig.streamIdentifier().streamName() == null); + + if(isInvalidPollingConfig) { + throw new IllegalArgumentException("Invalid config: multistream enabled with streamName or single stream with no streamName"); + } + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalSpecificConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalSpecificConfig.java index 5ab982bf..30562994 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalSpecificConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalSpecificConfig.java @@ -15,10 +15,13 @@ package software.amazon.kinesis.retrieval; +import java.util.function.Function; +import software.amazon.kinesis.retrieval.polling.DataFetcher; + public interface RetrievalSpecificConfig { /** * Creates and returns a retrieval factory for the specific configuration - * + * * @return a retrieval factory that can create an appropriate retriever */ RetrievalFactory retrievalFactory(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/DataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/DataFetcher.java new file mode 100644 index 00000000..ae1c6f30 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/DataFetcher.java @@ -0,0 +1,124 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.retrieval.polling; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import lombok.NonNull; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.retrieval.DataFetcherResult; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; + +public interface DataFetcher { + /** + * Get records from the current position in the stream (up to maxRecords). + * + * @return list of records of up to maxRecords size + */ + DataFetcherResult getRecords(); + + /** + * Initializes this KinesisDataFetcher's iterator based on the checkpointed sequence number. + * + * @param initialCheckpoint Current checkpoint sequence number for this shard. + * @param initialPositionInStream The initialPositionInStream. + */ + void initialize(String initialCheckpoint, + InitialPositionInStreamExtended initialPositionInStream); + + /** + * Initializes this KinesisDataFetcher's iterator based on the checkpointed sequence number as an + * ExtendedSequenceNumber. + * + * @param initialCheckpoint Current checkpoint sequence number for this shard. + * @param initialPositionInStream The initialPositionInStream. + */ + void initialize(ExtendedSequenceNumber initialCheckpoint, + InitialPositionInStreamExtended initialPositionInStream); + + /** + * Advances this KinesisDataFetcher's internal iterator to be at the passed-in sequence number. + * + * @param sequenceNumber advance the iterator to the record at this sequence number. + * @param initialPositionInStream The initialPositionInStream. + */ + void advanceIteratorTo(String sequenceNumber, + InitialPositionInStreamExtended initialPositionInStream); + + /** + * Gets a new iterator from the last known sequence number i.e. the sequence number of the last record from the last + * records call. + */ + void restartIterator(); + + /** + * Resets the iterator by setting shardIterator, sequenceNumber and the position in the stream. + * + * @param shardIterator set the current shard iterator. + * @param sequenceNumber reset the iterator to the record at this sequence number. + * @param initialPositionInStream the current position in the stream to reset the iterator to. + */ + void resetIterator(String shardIterator, String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream); + + /** + * Retrieves the response based on the request. + * + * @param request the current get records request used to receive a response. + * @return GetRecordsResponse response for getRecords + */ + GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws Exception; + + /** + * Retrieves the next get records request based on the current iterator. + * + * @param nextIterator specify the iterator to get the next record request + * @return {@link GetRecordsRequest} + */ + GetRecordsRequest getGetRecordsRequest(String nextIterator); + + /** + * Gets the next iterator based on the request. + * + * @param request used to obtain the next shard iterator + * @return next iterator string + */ + String getNextIterator(GetShardIteratorRequest request) throws ExecutionException, InterruptedException, TimeoutException; + + /** + * Gets the next set of records based on the iterator. + * + * @param nextIterator specified shard iterator for getting the next set of records + * @return {@link GetRecordsResponse} + */ + GetRecordsResponse getRecords(@NonNull String nextIterator); + + /** + * Get the current account and stream information. + * + * @return {@link StreamIdentifier} + */ + StreamIdentifier getStreamIdentifier(); + + /** + * Checks if shardEnd is reached. + * @return boolean to determine whether shard end is reached + */ + boolean isShardEndReached(); +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index a96e2134..82b31915 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -14,20 +14,18 @@ */ package software.amazon.kinesis.retrieval.polling; +import com.google.common.collect.Iterables; + import java.time.Duration; import java.util.Collections; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; - -import org.apache.commons.lang3.StringUtils; - -import com.google.common.collect.Iterables; - import lombok.AccessLevel; import lombok.Data; import lombok.Getter; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; @@ -47,8 +45,10 @@ import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.retrieval.AWSExceptionManager; +import software.amazon.kinesis.retrieval.DataFetcherProviderConfig; import software.amazon.kinesis.retrieval.DataFetcherResult; import software.amazon.kinesis.retrieval.IteratorBuilder; +import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @@ -57,7 +57,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; */ @Slf4j @KinesisClientInternalApi -public class KinesisDataFetcher { +public class KinesisDataFetcher implements DataFetcher { private static final String METRICS_PREFIX = "KinesisDataFetcher"; private static final String OPERATION = "ProcessTask"; @@ -76,33 +76,39 @@ public class KinesisDataFetcher { @Deprecated public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory) { - this(kinesisClient, StreamIdentifier.singleStreamInstance(streamName), shardId, maxRecords, metricsFactory, PollingConfig.DEFAULT_REQUEST_TIMEOUT); + this(kinesisClient, new KinesisDataFetcherProviderConfig( + StreamIdentifier.singleStreamInstance(streamName), + shardId, + metricsFactory, + maxRecords, + PollingConfig.DEFAULT_REQUEST_TIMEOUT + )); } /** - * Constructs KinesisDataFetcher. - * @param kinesisClient - * @param streamIdentifier - * @param shardId - * @param maxRecords - * @param metricsFactory - * @param maxFutureWait - */ - public KinesisDataFetcher(KinesisAsyncClient kinesisClient, StreamIdentifier streamIdentifier, String shardId, int maxRecords, MetricsFactory metricsFactory, Duration maxFutureWait) { - this.kinesisClient = kinesisClient; - this.streamIdentifier = streamIdentifier; - this.shardId = shardId; - this.maxRecords = maxRecords; - this.metricsFactory = metricsFactory; - this.maxFutureWait = maxFutureWait; - this.streamAndShardId = streamIdentifier.serialize() + ":" + shardId; - } - - /** Note: This method has package level access for testing purposes. + * Note: This method has package level access for testing purposes. + * * @return nextIterator */ @Getter(AccessLevel.PACKAGE) private String nextIterator; + + /** + * Constructs KinesisDataFetcher. + * + * @param kinesisClient + * @param kinesisDataFetcherProviderConfig + */ + public KinesisDataFetcher(KinesisAsyncClient kinesisClient, DataFetcherProviderConfig kinesisDataFetcherProviderConfig) { + this.kinesisClient = kinesisClient; + this.maxFutureWait = kinesisDataFetcherProviderConfig.getKinesisRequestTimeout(); + this.maxRecords = kinesisDataFetcherProviderConfig.getMaxRecords(); + this.metricsFactory = kinesisDataFetcherProviderConfig.getMetricsFactory(); + this.shardId = kinesisDataFetcherProviderConfig.getShardId(); + this.streamIdentifier = kinesisDataFetcherProviderConfig.getStreamIdentifier(); + this.streamAndShardId = streamIdentifier.serialize() + ":" + shardId; + } + @Getter private boolean isShardEndReached; private boolean isInitialized; @@ -114,6 +120,7 @@ public class KinesisDataFetcher { * * @return list of records of up to maxRecords size */ + @Override public DataFetcherResult getRecords() { if (!isInitialized) { throw new IllegalArgumentException("KinesisDataFetcher.records called before initialization."); @@ -187,6 +194,7 @@ public class KinesisDataFetcher { * @param initialCheckpoint Current checkpoint sequence number for this shard. * @param initialPositionInStream The initialPositionInStream. */ + @Override public void initialize(final String initialCheckpoint, final InitialPositionInStreamExtended initialPositionInStream) { log.info("Initializing shard {} with {}", streamAndShardId, initialCheckpoint); @@ -194,6 +202,7 @@ public class KinesisDataFetcher { isInitialized = true; } + @Override public void initialize(final ExtendedSequenceNumber initialCheckpoint, final InitialPositionInStreamExtended initialPositionInStream) { log.info("Initializing shard {} with {}", streamAndShardId, initialCheckpoint.sequenceNumber()); @@ -207,6 +216,7 @@ public class KinesisDataFetcher { * @param sequenceNumber advance the iterator to the record at this sequence number. * @param initialPositionInStream The initialPositionInStream. */ + @Override public void advanceIteratorTo(final String sequenceNumber, final InitialPositionInStreamExtended initialPositionInStream) { if (sequenceNumber == null) { @@ -228,9 +238,7 @@ public class KinesisDataFetcher { try { try { - final GetShardIteratorResponse result = FutureUtils - .resolveOrCancelFuture(kinesisClient.getShardIterator(request), maxFutureWait); - nextIterator = result.shardIterator(); + nextIterator = getNextIterator(request); success = true; } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); @@ -260,6 +268,7 @@ public class KinesisDataFetcher { * Gets a new iterator from the last known sequence number i.e. the sequence number of the last record from the last * records call. */ + @Override public void restartIterator() { if (StringUtils.isEmpty(lastKnownSequenceNumber) || initialPositionInStream == null) { throw new IllegalStateException( @@ -268,29 +277,49 @@ public class KinesisDataFetcher { advanceIteratorTo(lastKnownSequenceNumber, initialPositionInStream); } + @Override public void resetIterator(String shardIterator, String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) { this.nextIterator = shardIterator; this.lastKnownSequenceNumber = sequenceNumber; this.initialPositionInStream = initialPositionInStream; } - private GetRecordsResponse getRecords(@NonNull final String nextIterator) { - final AWSExceptionManager exceptionManager = createExceptionManager(); - GetRecordsRequest request = KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator) + @Override + public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, TimeoutException { + final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request), + maxFutureWait); + if (!isValidResponse(response)) { + throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId + + ". nextShardIterator: " + response.nextShardIterator() + + ". childShards: " + response.childShards() + ". Will retry GetRecords with the same nextIterator."); + } + return response; + } + + @Override + public GetRecordsRequest getGetRecordsRequest(String nextIterator) { + return KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator) .limit(maxRecords).build(); + } + + @Override + public String getNextIterator(GetShardIteratorRequest request) throws ExecutionException, InterruptedException, TimeoutException { + final GetShardIteratorResponse result = FutureUtils + .resolveOrCancelFuture(kinesisClient.getShardIterator(request), maxFutureWait); + return result.shardIterator(); + } + + @Override + public GetRecordsResponse getRecords(@NonNull final String nextIterator) { + final AWSExceptionManager exceptionManager = createExceptionManager(); + GetRecordsRequest request = getGetRecordsRequest(nextIterator); final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION); MetricsUtil.addShardId(metricsScope, shardId); - boolean success = false; + boolean success = false ; long startTime = System.currentTimeMillis(); try { - final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request), - maxFutureWait); - if (!isValidResponse(response)) { - throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId - + ". nextShardIterator: " + response.nextShardIterator() - + ". childShards: " + response.childShards() + ". Will retry GetRecords with the same nextIterator."); - } + final GetRecordsResponse response = getGetRecordsResponse(request); success = true; return response; } catch (ExecutionException e) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java index cc574506..d8c2405a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java @@ -17,31 +17,47 @@ package software.amazon.kinesis.retrieval.polling; import java.time.Duration; import java.util.Optional; - +import java.util.function.Function; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NonNull; +import lombok.Setter; +import lombok.ToString; import lombok.experimental.Accessors; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.kinesis.retrieval.DataFetcherProviderConfig; import software.amazon.kinesis.retrieval.RecordsFetcherFactory; import software.amazon.kinesis.retrieval.RetrievalFactory; import software.amazon.kinesis.retrieval.RetrievalSpecificConfig; @Accessors(fluent = true) -@Data @Getter +@Setter +@ToString +@EqualsAndHashCode public class PollingConfig implements RetrievalSpecificConfig { public static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(30); + /** + * Configurable functional interface to override the existing DataFetcher. + */ + Function dataFetcherProvider; /** * Name of the Kinesis stream. * * @return String */ - @NonNull - private final String streamName; + private String streamName; + + /** + * @param kinesisClient Client used to access Kinesis services. + */ + public PollingConfig(KinesisAsyncClient kinesisClient) { + this.kinesisClient = kinesisClient; + } /** * Client used to access to Kinesis service. @@ -60,6 +76,15 @@ public class PollingConfig implements RetrievalSpecificConfig { */ private int maxRecords = 10000; + /** + * @param streamName Name of Kinesis stream. + * @param kinesisClient Client used to access Kinesis serivces. + */ + public PollingConfig(String streamName, KinesisAsyncClient kinesisClient) { + this.kinesisClient = kinesisClient; + this.streamName = streamName; + } + /** * The value for how long the ShardConsumer should sleep if no records are returned from the call to * {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}. @@ -105,6 +130,6 @@ public class PollingConfig implements RetrievalSpecificConfig { @Override public RetrievalFactory retrievalFactory() { return new SynchronousBlockingRetrievalFactory(streamName(), kinesisClient(), recordsFetcherFactory, - maxRecords(), kinesisRequestTimeout); + maxRecords(), kinesisRequestTimeout, dataFetcherProvider); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 6e172f08..d9e00669 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -15,6 +15,8 @@ package software.amazon.kinesis.retrieval.polling; +import com.google.common.annotations.VisibleForTesting; + import java.time.Duration; import java.time.Instant; import java.util.List; @@ -25,21 +27,17 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; - import lombok.AccessLevel; +import lombok.Data; import lombok.Getter; +import lombok.NonNull; import lombok.Setter; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; - -import com.google.common.annotations.VisibleForTesting; - -import lombok.Data; -import lombok.NonNull; -import lombok.experimental.Accessors; -import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; @@ -61,7 +59,6 @@ import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; - import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired; /** @@ -108,7 +105,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { @VisibleForTesting @Getter private final LinkedBlockingQueue prefetchRecordsQueue; private final PrefetchCounters prefetchCounters; - private final KinesisDataFetcher dataFetcher; + private final DataFetcher dataFetcher; private InitialPositionInStreamExtended initialPositionInStreamExtended; private String highestSequenceNumber; @@ -215,7 +212,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { this.maxByteSize = maxByteSize; this.maxRecordsCount = maxRecordsCount; this.publisherSession = new PublisherSession(new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput), - new PrefetchCounters(), this.getRecordsRetrievalStrategy.getDataFetcher()); + new PrefetchCounters(), this.getRecordsRetrievalStrategy.dataFetcher()); this.executorService = executorService; this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(metricsFactory); this.idleMillisBetweenCalls = idleMillisBetweenCalls; @@ -223,7 +220,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { Validate.notEmpty(operation, "Operation cannot be empty"); this.operation = operation; this.streamAndShardId = - this.getRecordsRetrievalStrategy.getDataFetcher().getStreamIdentifier().serialize() + ":" + shardId; + this.getRecordsRetrievalStrategy.dataFetcher().getStreamIdentifier().serialize() + ":" + shardId; } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java index 73273c34..071763fc 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java @@ -15,6 +15,8 @@ package software.amazon.kinesis.retrieval.polling; +import java.time.Duration; +import java.util.function.Function; import lombok.Data; import lombok.NonNull; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @@ -22,13 +24,13 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.metrics.MetricsFactory; +import software.amazon.kinesis.retrieval.DataFetcherProviderConfig; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; +import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig; import software.amazon.kinesis.retrieval.RecordsFetcherFactory; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalFactory; -import java.time.Duration; - /** * */ @@ -42,32 +44,71 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { private final KinesisAsyncClient kinesisClient; @NonNull private final RecordsFetcherFactory recordsFetcherFactory; - // private final long listShardsBackoffTimeInMillis; - // private final int maxListShardsRetryAttempts; + private final int maxRecords; private final Duration kinesisRequestTimeout; - public SynchronousBlockingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, RecordsFetcherFactory recordsFetcherFactory, int maxRecords, Duration kinesisRequestTimeout) { + private final Function dataFetcherProvider; + + @Deprecated + public SynchronousBlockingRetrievalFactory(String streamName, + KinesisAsyncClient kinesisClient, + RecordsFetcherFactory recordsFetcherFactory, + int maxRecords, + Duration kinesisRequestTimeout) { + this(streamName, + kinesisClient, + recordsFetcherFactory, + maxRecords, + kinesisRequestTimeout, + defaultDataFetcherProvider(kinesisClient)); + } + + public SynchronousBlockingRetrievalFactory(String streamName, + KinesisAsyncClient kinesisClient, + RecordsFetcherFactory recordsFetcherFactory, + int maxRecords, + Duration kinesisRequestTimeout, + Function dataFetcherProvider) { this.streamName = streamName; this.kinesisClient = kinesisClient; this.recordsFetcherFactory = recordsFetcherFactory; this.maxRecords = maxRecords; this.kinesisRequestTimeout = kinesisRequestTimeout; + this.dataFetcherProvider = dataFetcherProvider == null ? + defaultDataFetcherProvider(kinesisClient) : dataFetcherProvider; } @Deprecated - public SynchronousBlockingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, RecordsFetcherFactory recordsFetcherFactory, int maxRecords) { + public SynchronousBlockingRetrievalFactory(String streamName, + KinesisAsyncClient kinesisClient, + RecordsFetcherFactory recordsFetcherFactory, + int maxRecords) { this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, PollingConfig.DEFAULT_REQUEST_TIMEOUT); } + private static Function defaultDataFetcherProvider( + KinesisAsyncClient kinesisClient) { + return dataFetcherProviderConfig -> new KinesisDataFetcher(kinesisClient, dataFetcherProviderConfig); + } + @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, - @NonNull final MetricsFactory metricsFactory) { + @NonNull final MetricsFactory metricsFactory) { final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ? StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) : StreamIdentifier.singleStreamInstance(streamName); - return new SynchronousGetRecordsRetrievalStrategy( - new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout)); + + final DataFetcherProviderConfig kinesisDataFetcherProviderConfig = new KinesisDataFetcherProviderConfig( + streamIdentifier, + shardInfo.shardId(), + metricsFactory, + maxRecords, + kinesisRequestTimeout); + + final DataFetcher dataFetcher = this.dataFetcherProvider.apply(kinesisDataFetcherProviderConfig); + + return new SynchronousGetRecordsRetrievalStrategy(dataFetcher); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java index c6fa619b..7f3b54d5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java @@ -14,6 +14,7 @@ */ package software.amazon.kinesis.retrieval.polling; +import java.util.Optional; import lombok.Data; import lombok.NonNull; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; @@ -26,8 +27,9 @@ import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; @Data @KinesisClientInternalApi public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrievalStrategy { + @NonNull - private final KinesisDataFetcher dataFetcher; + private final DataFetcher dataFetcher; @Override public GetRecordsResponse getRecords(final int maxRecords) { @@ -45,9 +47,14 @@ public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetriev public boolean isShutdown() { return false; } - + @Override public KinesisDataFetcher getDataFetcher() { + throw new UnsupportedOperationException("Deprecated. Use dataFetcher() to retrieve a dataFetcher"); + } + + @Override + public DataFetcher dataFetcher() { return dataFetcher; } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java index 4a8c5250..efa11e70 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java @@ -17,7 +17,6 @@ package software.amazon.kinesis.retrieval.polling; import java.time.Duration; import java.util.concurrent.ExecutorService; - import lombok.NonNull; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.annotations.KinesisClientInternalApi; @@ -25,6 +24,7 @@ import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; +import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig; import software.amazon.kinesis.retrieval.RecordsFetcherFactory; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalFactory; @@ -71,9 +71,15 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ? StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) : StreamIdentifier.singleStreamInstance(streamName); + return new SynchronousGetRecordsRetrievalStrategy( - new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(), - maxRecords, metricsFactory, maxFutureWait)); + new KinesisDataFetcher(kinesisClient, new KinesisDataFetcherProviderConfig( + streamIdentifier, + shardInfo.shardId(), + metricsFactory, + maxRecords, + maxFutureWait + ))); } @Override diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/CheckpointerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/CheckpointerTest.java index 1cf77a3d..b823c8e3 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/CheckpointerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/CheckpointerTest.java @@ -90,6 +90,26 @@ public class CheckpointerTest { Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); } + + @Test + public final void testInitialPrepareCheckpointWithApplicationState() throws Exception { + String sequenceNumber = "1"; + String pendingCheckpointValue = "99999"; + String shardId = "myShardId"; + byte[] applicationState = "applicationState".getBytes(); + ExtendedSequenceNumber extendedCheckpointNumber = new ExtendedSequenceNumber(sequenceNumber); + checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(sequenceNumber), testConcurrencyToken); + + ExtendedSequenceNumber extendedPendingCheckpointNumber = new ExtendedSequenceNumber(pendingCheckpointValue); + checkpoint.prepareCheckpoint(shardId, new ExtendedSequenceNumber(pendingCheckpointValue), testConcurrencyToken, + applicationState); + + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); + Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); + Assert.assertEquals(applicationState, checkpoint.getCheckpointObject(shardId).pendingCheckpointState()); + } + @Test public final void testAdvancingPrepareCheckpoint() throws Exception { String shardId = "myShardId"; @@ -107,6 +127,26 @@ public class CheckpointerTest { } } + @Test + public final void testAdvancingPrepareCheckpointWithApplicationState() throws Exception { + String shardId = "myShardId"; + String checkpointValue = "12345"; + byte[] applicationState = "applicationState".getBytes(); + ExtendedSequenceNumber extendedCheckpointNumber = new ExtendedSequenceNumber(checkpointValue); + checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(checkpointValue), testConcurrencyToken); + + for (Integer i = 0; i < 10; i++) { + String sequenceNumber = i.toString(); + ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(sequenceNumber); + checkpoint.prepareCheckpoint(shardId, new ExtendedSequenceNumber(sequenceNumber), testConcurrencyToken, + applicationState); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); + Assert.assertEquals(applicationState, checkpoint.getCheckpointObject(shardId).pendingCheckpointState()); + } + } + @Test public final void testPrepareAndSetCheckpoint() throws Exception { String checkpointValue = "12345"; @@ -134,4 +174,35 @@ public class CheckpointerTest { Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); } + + @Test + public final void testPrepareAndSetCheckpointWithApplicationState() throws Exception { + String checkpointValue = "12345"; + String shardId = "testShardId-1"; + String concurrencyToken = "token-1"; + String pendingCheckpointValue = "99999"; + byte[] applicationState = "applicationState".getBytes(); + + // set initial checkpoint + ExtendedSequenceNumber extendedCheckpointNumber = new ExtendedSequenceNumber(checkpointValue); + checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(checkpointValue), concurrencyToken); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); + + // prepare checkpoint + ExtendedSequenceNumber extendedPendingCheckpointNumber = new ExtendedSequenceNumber(pendingCheckpointValue); + checkpoint.prepareCheckpoint(shardId, new ExtendedSequenceNumber(pendingCheckpointValue), concurrencyToken, applicationState); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); + Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); + Assert.assertEquals(applicationState, checkpoint.getCheckpointObject(shardId).pendingCheckpointState()); + + // do checkpoint + checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(pendingCheckpointValue), concurrencyToken); + Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpointState()); + } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java index ebe933b9..8f6e165d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java @@ -18,7 +18,6 @@ import java.util.HashMap; import java.util.Map; import software.amazon.kinesis.exceptions.KinesisClientLibException; -import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @@ -32,6 +31,7 @@ public class InMemoryCheckpointer implements Checkpointer { private Map checkpoints = new HashMap<>(); private Map flushpoints = new HashMap<>(); private Map pendingCheckpoints = new HashMap<>(); + private Map pendingCheckpointStates = new HashMap<>(); private String operation; @@ -44,6 +44,7 @@ public class InMemoryCheckpointer implements Checkpointer { checkpoints.put(leaseKey, checkpointValue); flushpoints.put(leaseKey, checkpointValue); pendingCheckpoints.remove(leaseKey); + pendingCheckpointStates.remove(leaseKey); if (log.isDebugEnabled()) { log.debug("shardId: {} checkpoint: {}", leaseKey, checkpointValue); @@ -64,15 +65,22 @@ public class InMemoryCheckpointer implements Checkpointer { @Override public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) throws KinesisClientLibException { + prepareCheckpoint(leaseKey, pendingCheckpoint, concurrencyToken, null); + } + + @Override + public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException { pendingCheckpoints.put(leaseKey, pendingCheckpoint); + pendingCheckpointStates.put(leaseKey, pendingCheckpointState); } @Override public Checkpoint getCheckpointObject(String leaseKey) throws KinesisClientLibException { ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey); ExtendedSequenceNumber pendingCheckpoint = pendingCheckpoints.get(leaseKey); + byte[] pendingCheckpointState = pendingCheckpointStates.get(leaseKey); - Checkpoint checkpointObj = new Checkpoint(checkpoint, pendingCheckpoint); + Checkpoint checkpointObj = new Checkpoint(checkpoint, pendingCheckpoint, pendingCheckpointState); log.debug("getCheckpointObject shardId: {}, {}", leaseKey, checkpointObj); return checkpointObj; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index fd6b531b..1de7b101 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -242,7 +242,7 @@ public class SchedulerTest { final List secondShardInfo = Collections.singletonList( new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber)); - final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null); + final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null, null); when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo); when(checkpoint.getCheckpointObject(eq(shardId))).thenReturn(firstCheckpoint); @@ -368,7 +368,7 @@ public class SchedulerTest { .map(sc -> new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber, sc.streamIdentifier().serialize())).collect(Collectors.toList()); - final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null); + final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null, null); when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo); when(checkpoint.getCheckpointObject(anyString())).thenReturn(firstCheckpoint); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java index 233ce724..d9d7d01e 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java @@ -19,6 +19,7 @@ import java.util.List; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; @@ -54,6 +55,7 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher { DELETELEASE(9), DELETEALL(10), UPDATELEASE(11), + LISTLEASESFORSTREAM(12), NONE(Integer.MIN_VALUE); private Integer index; @@ -129,6 +131,13 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher { return leaseRefresher.waitUntilLeaseTableExists(secondsBetweenPolls, timeoutSeconds); } + @Override + public List listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException, ProvisionedThroughputException { + throwExceptions("listLeasesForStream", ExceptionThrowingLeaseRefresherMethods.LISTLEASESFORSTREAM); + + return leaseRefresher.listLeasesForStream(streamIdentifier); + } + @Override public List listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index e1dfc52a..44f6acf4 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -969,7 +969,7 @@ public class HierarchicalShardSyncerTest { parentShardIds.add(shard.adjacentParentShardId()); } return new Lease(shard.shardId(), leaseOwner, 0L, UUID.randomUUID(), 0L, checkpoint, null, 0L, - parentShardIds); + parentShardIds, null); }).collect(Collectors.toList()); } @@ -1039,7 +1039,7 @@ public class HierarchicalShardSyncerTest { final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); when(shardDetector.listShards()).thenReturn(shards); - when(dynamoDBLeaseRefresher.listLeases()).thenReturn(leases); + when(dynamoDBLeaseRefresher.listLeasesForStream(any(StreamIdentifier.class))).thenReturn(leases); doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture()); setupMultiStream(); hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, @@ -1049,7 +1049,7 @@ public class HierarchicalShardSyncerTest { assertThat(leaseCaptor.getValue(), equalTo(garbageLease)); verify(shardDetector, times(2)).listShards(); - verify(dynamoDBLeaseRefresher).listLeases(); + verify(dynamoDBLeaseRefresher).listLeasesForStream(any(StreamIdentifier.class)); verify(dynamoDBLeaseRefresher).deleteLease(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java index 8ab99a18..cf06f586 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java @@ -35,9 +35,10 @@ public class LeaseBuilder { private Long ownerSwitchesSinceCheckpoint = 0L; private Set parentShardIds = new HashSet<>(); private Set childShardIds = new HashSet<>(); + private byte[] pendingCheckpointState; public Lease build() { - return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, - checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds); + return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, + pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds, pendingCheckpointState); } } \ No newline at end of file diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java index 3af33c69..d89c010e 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java @@ -127,16 +127,37 @@ public class DynamoDBLeaseCoordinatorIntegrationTest { } assertNotNull(lease); - ExtendedSequenceNumber newCheckpoint = new ExtendedSequenceNumber("newCheckpoint"); + final ExtendedSequenceNumber initialCheckpoint = new ExtendedSequenceNumber("initialCheckpoint"); + final ExtendedSequenceNumber pendingCheckpoint = new ExtendedSequenceNumber("pendingCheckpoint"); + final ExtendedSequenceNumber newCheckpoint = new ExtendedSequenceNumber("newCheckpoint"); + final byte[] checkpointState = "checkpointState".getBytes(); + // lease's leaseCounter is wrong at this point, but it shouldn't matter. + assertTrue(dynamoDBCheckpointer.setCheckpoint(lease.leaseKey(), initialCheckpoint, lease.concurrencyToken())); + + final Lease leaseFromDDBAtInitialCheckpoint = leaseRefresher.getLease(lease.leaseKey()); + lease.leaseCounter(lease.leaseCounter() + 1); + lease.checkpoint(initialCheckpoint); + lease.leaseOwner(coordinator.workerIdentifier()); + assertEquals(lease, leaseFromDDBAtInitialCheckpoint); + + dynamoDBCheckpointer.prepareCheckpoint(lease.leaseKey(), pendingCheckpoint, lease.concurrencyToken().toString(), checkpointState); + + final Lease leaseFromDDBAtPendingCheckpoint = leaseRefresher.getLease(lease.leaseKey()); + lease.leaseCounter(lease.leaseCounter() + 1); + lease.checkpoint(initialCheckpoint); + lease.pendingCheckpoint(pendingCheckpoint); + lease.pendingCheckpointState(checkpointState); + assertEquals(lease, leaseFromDDBAtPendingCheckpoint); + assertTrue(dynamoDBCheckpointer.setCheckpoint(lease.leaseKey(), newCheckpoint, lease.concurrencyToken())); - Lease fromDynamo = leaseRefresher.getLease(lease.leaseKey()); - + final Lease leaseFromDDBAtNewCheckpoint = leaseRefresher.getLease(lease.leaseKey()); lease.leaseCounter(lease.leaseCounter() + 1); lease.checkpoint(newCheckpoint); - lease.leaseOwner(coordinator.workerIdentifier()); - assertEquals(lease, fromDynamo); + lease.pendingCheckpoint(null); + lease.pendingCheckpointState(null); + assertEquals(lease, leaseFromDDBAtNewCheckpoint); } /** diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java index 99dcd64d..75431866 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java @@ -72,7 +72,7 @@ public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest Collection expected = builder.build().values(); // The / 3 here ensures that we will test Dynamo's paging mechanics. - List actual = leaseRefresher.list(numRecordsToPut / 3); + List actual = leaseRefresher.list(numRecordsToPut / 3, null); for (Lease lease : actual) { assertNotNull(expected.remove(lease)); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java index 61cba722..f22e6e4d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java @@ -55,7 +55,7 @@ public class DynamoDBLeaseRenewerTest { private LeaseRefresher leaseRefresher; private static Lease newLease(String leaseKey) { - return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, new HashSet<>()); + return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, new HashSet<>(), null); } @Before diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index f7051ec4..281d738c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -107,7 +107,7 @@ public class PrefetchRecordsPublisherTest { @Mock private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; @Mock - private KinesisDataFetcher dataFetcher; + private DataFetcher dataFetcher; @Mock private InitialPositionInStreamExtended initialPosition; @Mock @@ -124,7 +124,7 @@ public class PrefetchRecordsPublisherTest { @Before public void setup() { - when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(dataFetcher); + when(getRecordsRetrievalStrategy.dataFetcher()).thenReturn(dataFetcher); when(dataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("testStream")); executorService = spy(Executors.newFixedThreadPool(1)); getRecordsCache = new PrefetchRecordsPublisher( diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java index d6d8b6d5..ddc25e21 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java @@ -40,14 +40,14 @@ public class RecordsFetcherFactoryTest { @Mock private MetricsFactory metricsFactory; @Mock - private KinesisDataFetcher kinesisDataFetcher; + private DataFetcher dataFetcher; @Before public void setUp() { MockitoAnnotations.initMocks(this); recordsFetcherFactory = new SimpleRecordsFetcherFactory(); - when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(kinesisDataFetcher); - when(kinesisDataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("stream")); + when(getRecordsRetrievalStrategy.dataFetcher()).thenReturn(dataFetcher); + when(dataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("stream")); } @Test @@ -66,5 +66,4 @@ public class RecordsFetcherFactoryTest { metricsFactory, 1); assertThat(recordsCache, instanceOf(PrefetchRecordsPublisher.class)); } - }