From 2e113dbd6ca0573d80d01579c33d5c71ba876816 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Fri, 13 Mar 2020 01:36:33 -0700 Subject: [PATCH] Added Unit Test cases, code comments and other code refactoring --- .../kinesis/common/StreamIdentifier.java | 62 ++-- .../amazon/kinesis/coordinator/Scheduler.java | 14 +- .../leases/HierarchicalShardSyncer.java | 10 +- .../kinesis/leases/KinesisShardDetector.java | 3 +- .../kinesis/leases/LeaseManagementConfig.java | 25 +- .../amazon/kinesis/leases/ShardInfo.java | 27 +- .../dynamodb/DynamoDBLeaseCoordinator.java | 6 +- .../DynamoDBLeaseManagementFactory.java | 47 ++- .../kinesis/processor/MultiStreamTracker.java | 4 +- .../ShardRecordProcessorFactory.java | 2 +- .../kinesis/retrieval/RetrievalConfig.java | 2 +- .../retrieval/fanout/FanOutConfig.java | 1 + .../fanout/FanOutRetrievalFactory.java | 4 +- .../retrieval/polling/KinesisDataFetcher.java | 12 +- .../SynchronousBlockingRetrievalFactory.java | 6 +- ...ynchronousPrefetchingRetrievalFactory.java | 6 +- .../kinesis/coordinator/SchedulerTest.java | 130 ++++++++- .../leases/HierarchicalShardSyncerTest.java | 269 ++++++++++++++++++ .../kinesis/lifecycle/ConsumerStatesTest.java | 4 +- .../retrieval/fanout/FanOutConfigTest.java | 10 +- 20 files changed, 562 insertions(+), 82 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java index 2435b5bf..0bdce7fb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java @@ -3,41 +3,67 @@ package software.amazon.kinesis.common; import com.google.common.base.Joiner; import lombok.EqualsAndHashCode; import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.experimental.Accessors; import software.amazon.awssdk.utils.Validate; +import java.util.Optional; import java.util.regex.Pattern; -@RequiredArgsConstructor -@EqualsAndHashCode -@Getter -@Accessors(fluent = true) +@EqualsAndHashCode @Getter @Accessors(fluent = true) public class StreamIdentifier { - private final String accountName; + private final Optional accountIdOptional; private final String streamName; - private final Long streamCreationEpoch; + private final Optional streamCreationEpochOptional; - private static final String DEFAULT_ACCOUNT = "default"; private static final String DELIMITER = ":"; private static final Pattern PATTERN = Pattern.compile(".*" + ":" + ".*" + ":" + "[0-9]*"); - @Override - public String toString(){ - return Joiner.on(DELIMITER).join(accountName, streamName, streamCreationEpoch); + private StreamIdentifier(Optional accountIdOptional, String streamName, + Optional streamCreationEpochOptional) { + Validate.isTrue((accountIdOptional.isPresent() && streamCreationEpochOptional.isPresent()) || + (!accountIdOptional.isPresent() && !streamCreationEpochOptional.isPresent()), + "AccountId and StreamCreationEpoch must either be present together or not"); + this.accountIdOptional = accountIdOptional; + this.streamName = streamName; + this.streamCreationEpochOptional = streamCreationEpochOptional; } - public static StreamIdentifier fromString(String streamIdentifier) { - if (PATTERN.matcher(streamIdentifier).matches()) { - final String[] split = streamIdentifier.split(DELIMITER); - return new StreamIdentifier(split[0], split[1], Long.parseLong(split[2])); + /** + * Serialize the current StreamIdentifier instance. + * @return + */ + public String serialize() { + return accountIdOptional.isPresent() ? + Joiner.on(DELIMITER).join(accountIdOptional.get(), streamName, streamCreationEpochOptional.get()) : + streamName; + } + + @Override + public String toString() { + return serialize(); + } + + /** + * Create a multi stream instance for StreamIdentifier from serialized stream identifier. + * @param streamIdentifierSer + * @return StreamIdentifier + */ + public static StreamIdentifier multiStreamInstance(String streamIdentifierSer) { + if (PATTERN.matcher(streamIdentifierSer).matches()) { + final String[] split = streamIdentifierSer.split(DELIMITER); + return new StreamIdentifier(Optional.of(split[0]), split[1], Optional.of(Long.parseLong(split[2]))); } else { - throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifier); + throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifierSer); } } - public static StreamIdentifier fromStreamName(String streamName) { + /** + * Create a single stream instance for StreamIdentifier from stream name. + * @param streamName + * @return StreamIdentifier + */ + public static StreamIdentifier singleStreamInstance(String streamName) { Validate.notEmpty(streamName, "StreamName should not be empty"); - return new StreamIdentifier(DEFAULT_ACCOUNT, streamName, 0L); + return new StreamIdentifier(Optional.empty(), streamName, Optional.empty()); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 1e95f2e6..48467dfb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -42,7 +42,6 @@ import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; -import software.amazon.awssdk.utils.Either; import software.amazon.awssdk.utils.Validate; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; @@ -295,9 +294,10 @@ public class Scheduler implements Runnable { if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { // TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing // TODO: for already synced streams - for(StreamIdentifier streamIdentifier : currentStreamConfigMap.keySet().stream().collect(Collectors.toList())) { - log.info("Syncing Kinesis shard info"); - final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier); + for(Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { + final StreamIdentifier streamIdentifier = streamConfigEntry.getKey(); + log.info("Syncing Kinesis shard info for " + streamIdentifier); + final StreamConfig streamConfig = streamConfigEntry.getValue(); ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamIdentifier), leaseRefresher, streamConfig.initialPositionInStreamExtended(), cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, @@ -363,7 +363,7 @@ public class Scheduler implements Runnable { } for (ShardInfo completedShard : completedShards) { - final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifier()); + final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt()); if (createOrGetShardSyncTaskManager(streamIdentifier).syncShardAndLeaseInfo()) { log.info("Found completed shard, initiated new ShardSyncTak for " + completedShard.toString()); } @@ -635,7 +635,7 @@ public class Scheduler implements Runnable { checkpoint); // The only case where streamName is not available will be when multistreamtracker not set. In this case, // get the default stream name for the single stream application. - final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifier()); + final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt()); // Irrespective of single stream app or multi stream app, streamConfig should always be available. final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier); Validate.notNull(streamConfig, "StreamConfig should not be empty"); @@ -712,7 +712,7 @@ public class Scheduler implements Runnable { private StreamIdentifier getStreamIdentifier(Optional streamIdentifierString) { final StreamIdentifier streamIdentifier; if(streamIdentifierString.isPresent()) { - streamIdentifier = StreamIdentifier.fromString(streamIdentifierString.get()); + streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierString.get()); } else { Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode"); streamIdentifier = this.currentStreamConfigMap.values().iterator().next().streamIdentifier(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 1102574f..f4143581 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -29,6 +29,7 @@ import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; import lombok.Data; import lombok.experimental.Accessors; import org.apache.commons.lang3.StringUtils; @@ -162,7 +163,7 @@ public class HierarchicalShardSyncer { throws DependencyException, ProvisionedThroughputException, InvalidStateException { List streamLeases = new ArrayList<>(); for (Lease lease : leaseRefresher.listLeases()) { - if (streamIdentifier.toString().equals(((MultiStreamLease)lease).streamIdentifier())) { + if (streamIdentifier.serialize().equals(((MultiStreamLease)lease).streamIdentifier())) { streamLeases.add(lease); } } @@ -761,7 +762,7 @@ public class HierarchicalShardSyncer { private static Lease newKCLMultiStreamLease(final Shard shard, final StreamIdentifier streamIdentifier) { MultiStreamLease newLease = new MultiStreamLease(); - newLease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.toString(), shard.shardId())); + newLease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), shard.shardId())); List parentShardIds = new ArrayList<>(2); if (shard.parentShardId() != null) { parentShardIds.add(shard.parentShardId()); @@ -771,7 +772,7 @@ public class HierarchicalShardSyncer { } newLease.parentShardIds(parentShardIds); newLease.ownerSwitchesSinceCheckpoint(0L); - newLease.streamIdentifier(streamIdentifier.toString()); + newLease.streamIdentifier(streamIdentifier.serialize()); newLease.shardId(shard.shardId()); return newLease; } @@ -857,7 +858,8 @@ public class HierarchicalShardSyncer { @Data @Accessors(fluent = true) - private static class MultiStreamArgs { + @VisibleForTesting + static class MultiStreamArgs { private final Boolean isMultiStreamMode; private final StreamIdentifier streamIdentifier; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java index 53751375..0c495558 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -78,7 +77,7 @@ public class KinesisShardDetector implements ShardDetector { public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis, int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload, int cacheMissWarningModulus) { - this(kinesisClient, StreamIdentifier.fromStreamName(streamName), listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, + this(kinesisClient, StreamIdentifier.singleStreamInstance(streamName), listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, cacheMissWarningModulus, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 3361e3fb..2a5a0b1e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -273,6 +273,12 @@ public class LeaseManagementConfig { return hierarchicalShardSyncer; } + /** + * Vends HierarchicalShardSyncer based on MultiStreamingMode. With MultiStreamMode shard syncer creates + * leases to accommodate more than one stream. + * @param isMultiStreamingMode + * @return HierarchicalShardSyncer + */ public HierarchicalShardSyncer hierarchicalShardSyncer(boolean isMultiStreamingMode) { if(hierarchicalShardSyncer == null) { hierarchicalShardSyncer = new HierarchicalShardSyncer(isMultiStreamingMode); @@ -313,6 +319,12 @@ public class LeaseManagementConfig { return leaseManagementFactory; } + /** + * Vends LeaseManagementFactory that performs serde based on leaseSerializer and shard sync based on isMultiStreamingMode + * @param leaseSerializer + * @param isMultiStreamingMode + * @return LeaseManagementFactory + */ public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer, boolean isMultiStreamingMode) { if(leaseManagementFactory == null) { leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(), @@ -345,17 +357,14 @@ public class LeaseManagementConfig { return leaseManagementFactory; } + /** + * Set leaseManagementFactory and return the current LeaseManagementConfig instance. + * @param leaseManagementFactory + * @return LeaseManagementConfig + */ public LeaseManagementConfig leaseManagementFactory(final LeaseManagementFactory leaseManagementFactory) { this.leaseManagementFactory = leaseManagementFactory; return this; } -// private InitialPositionInStreamExtended getInitialPositionExtendedForStream(String streamName) { -// return multiStreamTracker() == null ? -// initialPositionInStream() : -// multiStreamTracker().initialPositionInStreamExtended(streamName) == null ? -// initialPositionInStream() : -// multiStreamTracker().initialPositionInStreamExtended(streamName); -// } - } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java index 36bc5dd1..c4b2968c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java @@ -37,7 +37,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @ToString public class ShardInfo { - private final Optional streamIdentifier; + private final Optional streamIdentifierSerOpt; private final String shardId; private final String concurrencyToken; // Sorted list of parent shardIds. @@ -63,11 +63,20 @@ public class ShardInfo { this(shardId, concurrencyToken, parentShardIds, checkpoint, null); } + /** + * Creates a new ShardInfo object that has an option to pass a serialized streamIdentifier. + * The checkpoint is not part of the equality, but is used for debugging output. + * @param shardId + * @param concurrencyToken + * @param parentShardIds + * @param checkpoint + * @param streamIdentifierSer + */ public ShardInfo(@NonNull final String shardId, final String concurrencyToken, final Collection parentShardIds, final ExtendedSequenceNumber checkpoint, - final String streamIdentifier) { + final String streamIdentifierSer) { this.shardId = shardId; this.concurrencyToken = concurrencyToken; this.parentShardIds = new LinkedList<>(); @@ -78,7 +87,7 @@ public class ShardInfo { // This makes it easy to check for equality in ShardInfo.equals method. Collections.sort(this.parentShardIds); this.checkpoint = checkpoint; - this.streamIdentifier = Optional.ofNullable(streamIdentifier); + this.streamIdentifierSerOpt = Optional.ofNullable(streamIdentifierSer); } /** @@ -105,7 +114,7 @@ public class ShardInfo { @Override public int hashCode() { return new HashCodeBuilder() - .append(concurrencyToken).append(parentShardIds).append(shardId).append(streamIdentifier.orElse("")).toHashCode(); + .append(concurrencyToken).append(parentShardIds).append(shardId).append(streamIdentifierSerOpt.orElse("")).toHashCode(); } /** @@ -130,18 +139,18 @@ public class ShardInfo { ShardInfo other = (ShardInfo) obj; return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken) .append(parentShardIds, other.parentShardIds).append(shardId, other.shardId) - .append(streamIdentifier.orElse(""), other.streamIdentifier.orElse("")).isEquals(); + .append(streamIdentifierSerOpt.orElse(""), other.streamIdentifierSerOpt.orElse("")).isEquals(); } /** - * + * Utility method to derive lease key from ShardInfo * @param shardInfo - * @return + * @return lease key */ public static String getLeaseKey(ShardInfo shardInfo) { - return shardInfo.streamIdentifier().isPresent() ? - MultiStreamLease.getLeaseKey(shardInfo.streamIdentifier().get(), shardInfo.shardId()) : + return shardInfo.streamIdentifierSerOpt().isPresent() ? + MultiStreamLease.getLeaseKey(shardInfo.streamIdentifierSerOpt().get(), shardInfo.shardId()) : shardInfo.shardId(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index f8a11c0a..3b7057de 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -378,7 +378,11 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { return leases.stream().map(DynamoDBLeaseCoordinator::convertLeaseToAssignment).collect(Collectors.toList()); } - // TODO : Halo : Check for better way + /** + * Utility method to convert the basic lease or multistream lease to ShardInfo + * @param lease + * @return ShardInfo + */ public static ShardInfo convertLeaseToAssignment(final Lease lease) { if (lease instanceof MultiStreamLease) { return new ShardInfo(((MultiStreamLease) lease).shardId(), lease.concurrencyToken().toString(), lease.parentShardIds(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index e383a41b..8c09af52 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -16,9 +16,6 @@ package software.amazon.kinesis.leases.dynamodb; import java.time.Duration; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.ExecutorService; import lombok.Data; @@ -26,8 +23,6 @@ import lombok.NonNull; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -import software.amazon.awssdk.utils.CollectionUtils; -import software.amazon.awssdk.utils.Validate; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamConfig; @@ -331,7 +326,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout, BillingMode billingMode) { - this(kinesisClient, new StreamConfig(StreamIdentifier.fromStreamName(streamName), initialPositionInStream), dynamoDBClient, tableName, + this(kinesisClient, new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream), dynamoDBClient, tableName, workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, @@ -391,6 +386,35 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { this.streamConfig = streamConfig; } + /** + * Constructor. + * @param kinesisClient + * @param dynamoDBClient + * @param tableName + * @param workerIdentifier + * @param executorService + * @param failoverTimeMillis + * @param epsilonMillis + * @param maxLeasesForWorker + * @param maxLeasesToStealAtOneTime + * @param maxLeaseRenewalThreads + * @param cleanupLeasesUponShardCompletion + * @param ignoreUnexpectedChildShards + * @param shardSyncIntervalMillis + * @param consistentReads + * @param listShardsBackoffTimeMillis + * @param maxListShardsRetryAttempts + * @param maxCacheMissesBeforeReload + * @param listShardsCacheAllowedAgeInSeconds + * @param cacheMissWarningModulus + * @param initialLeaseTableReadCapacity + * @param initialLeaseTableWriteCapacity + * @param hierarchicalShardSyncer + * @param tableCreatorCallback + * @param dynamoDbRequestTimeout + * @param billingMode + * @param leaseSerializer + */ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis, @@ -457,6 +481,12 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { metricsFactory); } + /** + * Create ShardSyncTaskManager from the streamConfig passed + * @param metricsFactory + * @param streamConfig + * @return ShardSyncTaskManager + */ @Override public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) { return new ShardSyncTaskManager(this.createShardDetector(streamConfig), @@ -476,7 +506,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { tableCreatorCallback, dynamoDbRequestTimeout, billingMode); } - @Override @Deprecated + @Override + @Deprecated public ShardDetector createShardDetector() { return new KinesisShardDetector(kinesisClient, streamConfig.streamIdentifier(), listShardsBackoffTimeMillis, maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, @@ -487,7 +518,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * KinesisShardDetector supports reading from service only using streamName. Support for accountId and * stream creation epoch is yet to be provided. * @param streamConfig - * @return + * @return ShardDetector */ @Override public ShardDetector createShardDetector(StreamConfig streamConfig) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java index e41f7e08..171687bc 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java @@ -12,9 +12,9 @@ import java.util.Map; public interface MultiStreamTracker { /** - * Returns the map of streams and its associated stream specific config. + * Returns the list of stream config, to be processed by the current application. * - * @return List of stream names + * @return List of StreamConfig */ List streamConfigList(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShardRecordProcessorFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShardRecordProcessorFactory.java index e16695b2..72a6d66f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShardRecordProcessorFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShardRecordProcessorFactory.java @@ -29,7 +29,7 @@ public interface ShardRecordProcessorFactory { ShardRecordProcessor shardRecordProcessor(); /** - * Returns a new instance of the ShardRecordProcessor for a stream + * Returns a new instance of the ShardRecordProcessor for a stream identifier * @param streamIdentifier * @return ShardRecordProcessor */ diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java index f26c0e13..98046b6b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java @@ -94,7 +94,7 @@ public class RetrievalConfig { @NonNull String applicationName) { this.kinesisClient = kinesisAsyncClient; this.appStreamTracker = Either - .right(new StreamConfig(StreamIdentifier.fromStreamName(streamName), initialPositionInStreamExtended)); + .right(new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStreamExtended)); this.applicationName = applicationName; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java index 45679089..cbbcb483 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java @@ -85,6 +85,7 @@ public class FanOutConfig implements RetrievalSpecificConfig { return new FanOutRetrievalFactory(kinesisClient, streamName, this::getOrCreateConsumerArn); } + // TODO : Halo. Need Stream Specific ConsumerArn to be passed from Customer private String getOrCreateConsumerArn(String streamName) { if (consumerArn != null) { return consumerArn; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java index f655d8fd..f609c1d9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java @@ -49,10 +49,10 @@ public class FanOutRetrievalFactory implements RetrievalFactory { @Override public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo, final MetricsFactory metricsFactory) { - final Optional streamIdentifierStr = shardInfo.streamIdentifier(); + final Optional streamIdentifierStr = shardInfo.streamIdentifierSerOpt(); final String streamName; if(streamIdentifierStr.isPresent()) { - streamName = StreamIdentifier.fromString(streamIdentifierStr.get()).streamName(); + streamName = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get()).streamName(); } else { streamName = defaultStreamName; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index db6d2aae..1ea833a3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -74,10 +74,18 @@ public class KinesisDataFetcher { @Deprecated public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory) { - this(kinesisClient, StreamIdentifier.fromStreamName(streamName), shardId, maxRecords, metricsFactory, PollingConfig.DEFAULT_REQUEST_TIMEOUT); + this(kinesisClient, StreamIdentifier.singleStreamInstance(streamName), shardId, maxRecords, metricsFactory, PollingConfig.DEFAULT_REQUEST_TIMEOUT); } - // Changing the constructor directly as this is an internal API + /** + * Constructs KinesisDataFetcher. + * @param kinesisClient + * @param streamIdentifier + * @param shardId + * @param maxRecords + * @param metricsFactory + * @param maxFutureWait + */ public KinesisDataFetcher(KinesisAsyncClient kinesisClient, StreamIdentifier streamIdentifier, String shardId, int maxRecords, MetricsFactory metricsFactory, Duration maxFutureWait) { this.kinesisClient = kinesisClient; this.streamIdentifier = streamIdentifier; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java index 93718f7c..73273c34 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java @@ -63,9 +63,9 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, @NonNull final MetricsFactory metricsFactory) { - final StreamIdentifier streamIdentifier = shardInfo.streamIdentifier().isPresent() ? - StreamIdentifier.fromString(shardInfo.streamIdentifier().get()) : - StreamIdentifier.fromStreamName(streamName); + final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ? + StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) : + StreamIdentifier.singleStreamInstance(streamName); return new SynchronousGetRecordsRetrievalStrategy( new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout)); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java index e73ad928..4a8c5250 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java @@ -68,9 +68,9 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, @NonNull final MetricsFactory metricsFactory) { - final StreamIdentifier streamIdentifier = shardInfo.streamIdentifier().isPresent() ? - StreamIdentifier.fromString(shardInfo.streamIdentifier().get()) : - StreamIdentifier.fromStreamName(streamName); + final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ? + StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) : + StreamIdentifier.singleStreamInstance(streamName); return new SynchronousGetRecordsRetrievalStrategy( new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(), maxRecords, metricsFactory, maxFutureWait)); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 7cfc86a5..bd8c28e8 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -22,8 +22,10 @@ import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -32,14 +34,20 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.atMost; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.RejectedExecutionException; +import java.util.stream.Collectors; import io.reactivex.plugins.RxJavaPlugins; +import lombok.RequiredArgsConstructor; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -52,8 +60,11 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointFactory; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.exceptions.KinesisClientLibException; import software.amazon.kinesis.exceptions.KinesisClientLibNonRetryableException; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; @@ -63,6 +74,8 @@ import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; +import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.lifecycle.ShardConsumer; import software.amazon.kinesis.lifecycle.events.InitializationInput; @@ -73,6 +86,7 @@ import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.processor.Checkpointer; +import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessor; @@ -124,6 +138,11 @@ public class SchedulerTest { private Checkpointer checkpoint; @Mock private WorkerStateChangeListener workerStateChangeListener; + @Mock + private MultiStreamTracker multiStreamTracker; + + private Map shardSyncTaskManagerMap = new HashMap<>(); + private Map shardDetectorMap = new HashMap<>(); @Before public void setup() { @@ -132,13 +151,25 @@ public class SchedulerTest { checkpointConfig = new CheckpointConfig().checkpointFactory(new TestKinesisCheckpointFactory()); coordinatorConfig = new CoordinatorConfig(applicationName).parentShardPollIntervalMillis(100L).workerStateChangeListener(workerStateChangeListener); leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName, - workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory()); + workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(false, false)); lifecycleConfig = new LifecycleConfig(); metricsConfig = new MetricsConfig(cloudWatchClient, namespace); processorConfig = new ProcessorConfig(shardRecordProcessorFactory); retrievalConfig = new RetrievalConfig(kinesisClient, streamName, applicationName) .retrievalFactory(retrievalFactory); + final List streamConfigList = new ArrayList() {{ + add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc1:stream1:1"), InitialPositionInStreamExtended.newInitialPosition( + InitialPositionInStream.LATEST))); + add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc1:stream2:2"), InitialPositionInStreamExtended.newInitialPosition( + InitialPositionInStream.LATEST))); + add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc2:stream1:1"), InitialPositionInStreamExtended.newInitialPosition( + InitialPositionInStream.LATEST))); + add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc2:stream2:3"), InitialPositionInStreamExtended.newInitialPosition( + InitialPositionInStream.LATEST))); + }}; + + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList); when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher); @@ -245,7 +276,10 @@ public class SchedulerTest { public final void testInitializationFailureWithRetries() throws Exception { doNothing().when(leaseCoordinator).initialize(); when(shardDetector.listShards()).thenThrow(new RuntimeException()); - + leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName, + workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(false, true)); + scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig); scheduler.run(); verify(shardDetector, times(coordinatorConfig.maxInitializationAttempts())).listShards(); @@ -255,6 +289,8 @@ public class SchedulerTest { public final void testInitializationFailureWithRetriesWithConfiguredMaxInitializationAttempts() throws Exception { final int maxInitializationAttempts = 5; coordinatorConfig.maxInitializationAttempts(maxInitializationAttempts); + leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName, + workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(false, true)); scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig); @@ -267,6 +303,76 @@ public class SchedulerTest { verify(shardDetector, times(maxInitializationAttempts)).listShards(); } + @Test + public final void testMultiStreamInitialization() throws ProvisionedThroughputException, DependencyException { + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig); + scheduler.initialize(); + shardDetectorMap.values().stream() + .forEach(shardDetector -> verify(shardDetector, times(1)).listShards()); + } + + @Test + public final void testMultiStreamInitializationWithFailures() { + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, + workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(true, false)); + scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig); + scheduler.initialize(); + // Note : As of today we retry for all streams in the next attempt. Hence the retry for each stream will vary. + // At the least we expect 2 retries for each stream. Since there are 4 streams, we expect at most + // the number of calls to be 5. + shardDetectorMap.values().stream() + .forEach(shardDetector -> verify(shardDetector, atLeast(2)).listShards()); + shardDetectorMap.values().stream() + .forEach(shardDetector -> verify(shardDetector, atMost(5)).listShards()); + } + + + @Test + public final void testMultiStreamConsumersAreBuiltOncePerAccountStreamShard() throws KinesisClientLibException { + final String shardId = "shardId-000000000000"; + final String concurrencyToken = "concurrencyToken"; + final ExtendedSequenceNumber firstSequenceNumber = ExtendedSequenceNumber.TRIM_HORIZON; + final ExtendedSequenceNumber secondSequenceNumber = new ExtendedSequenceNumber("1000"); + final ExtendedSequenceNumber finalSequenceNumber = new ExtendedSequenceNumber("2000"); + + final List initialShardInfo = multiStreamTracker.streamConfigList().stream() + .map(sc -> new ShardInfo(shardId, concurrencyToken, null, firstSequenceNumber, + sc.streamIdentifier().serialize())).collect(Collectors.toList()); + final List firstShardInfo = multiStreamTracker.streamConfigList().stream() + .map(sc -> new ShardInfo(shardId, concurrencyToken, null, secondSequenceNumber, + sc.streamIdentifier().serialize())).collect(Collectors.toList()); + final List secondShardInfo = multiStreamTracker.streamConfigList().stream() + .map(sc -> new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber, + sc.streamIdentifier().serialize())).collect(Collectors.toList()); + + final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null); + + when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo); + when(checkpoint.getCheckpointObject(anyString())).thenReturn(firstCheckpoint); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig); + Scheduler schedulerSpy = spy(scheduler); + schedulerSpy.runProcessLoop(); + schedulerSpy.runProcessLoop(); + schedulerSpy.runProcessLoop(); + + initialShardInfo.stream().forEach( + shardInfo -> verify(schedulerSpy).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory))); + firstShardInfo.stream().forEach( + shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory))); + secondShardInfo.stream().forEach( + shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory))); + + } + @Test public final void testSchedulerShutdown() { scheduler.shutdown(); @@ -508,7 +614,12 @@ public class SchedulerTest { } + @RequiredArgsConstructor private class TestKinesisLeaseManagementFactory implements LeaseManagementFactory { + + private final boolean shardSyncFirstAttemptFailure; + private final boolean shouldReturnDefaultShardSyncTaskmanager; + @Override public LeaseCoordinator createLeaseCoordinator(MetricsFactory metricsFactory) { return leaseCoordinator; @@ -522,6 +633,19 @@ public class SchedulerTest { @Override public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) { + if(shouldReturnDefaultShardSyncTaskmanager) { + return shardSyncTaskManager; + } + final ShardSyncTaskManager shardSyncTaskManager = mock(ShardSyncTaskManager.class); + final ShardDetector shardDetector = mock(ShardDetector.class); + shardSyncTaskManagerMap.put(streamConfig.streamIdentifier(), shardSyncTaskManager); + shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector); + when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); + if(shardSyncFirstAttemptFailure) { + when(shardDetector.listShards()) + .thenThrow(new RuntimeException("Service Exception")) + .thenReturn(Collections.EMPTY_LIST); + } return shardSyncTaskManager; } @@ -537,7 +661,7 @@ public class SchedulerTest { @Override public ShardDetector createShardDetector(StreamConfig streamConfig) { - return shardDetector; + return shardDetectorMap.get(streamConfig.streamIdentifier()); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 4dc3fdd3..0cc50c2a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -55,6 +55,7 @@ import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; import software.amazon.kinesis.leases.exceptions.DependencyException; @@ -74,6 +75,10 @@ public class HierarchicalShardSyncerTest { private static final int EXPONENT = 128; private static final String LEASE_OWNER = "TestOwnere"; private static final MetricsScope SCOPE = new NullMetricsScope(); + private static final boolean MULTISTREAM_MODE_ON = true; + private static final String STREAM_IDENTIFIER = "acc:stream:1"; + private static final HierarchicalShardSyncer.MultiStreamArgs MULTI_STREAM_ARGS = new HierarchicalShardSyncer.MultiStreamArgs( + MULTISTREAM_MODE_ON, StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER)); private final boolean cleanupLeasesOfCompletedShards = true; private final boolean ignoreUnexpectedChildShards = false; @@ -95,6 +100,11 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer = new HierarchicalShardSyncer(); } + private void setupMultiStream() { + hierarchicalShardSyncer = new HierarchicalShardSyncer(true); + when(shardDetector.streamIdentifier()).thenReturn(StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER)); + } + /** * Test determineNewLeasesToCreate() where there are no shards */ @@ -107,6 +117,18 @@ public class HierarchicalShardSyncerTest { equalTo(true)); } + /** + * Test determineNewLeasesToCreate() where there are no shards for MultiStream + */ + @Test public void testDetermineNewLeasesToCreateNoShardsForMultiStream() { + final List shards = Collections.emptyList(); + final List leases = Collections.emptyList(); + + assertThat(HierarchicalShardSyncer + .determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS) + .isEmpty(), equalTo(true)); + } + /** * Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed */ @@ -129,6 +151,29 @@ public class HierarchicalShardSyncerTest { assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds)); } + /** + * Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed + */ + @Test + public void testDetermineNewLeasesToCreate0Leases0ReshardsForMultiStream() { + final String shardId0 = "shardId-0"; + final String shardId1 = "shardId-1"; + final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); + + final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), + ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); + final List currentLeases = Collections.emptyList(); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS); + final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); + final Set expectedLeaseIds = new HashSet<>( + toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1))); + + assertThat(newLeases.size(), equalTo(expectedLeaseIds.size())); + assertThat(newLeaseKeys, equalTo(expectedLeaseIds)); + } + /** * Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but * one of the shards was marked as inconsistent. @@ -155,6 +200,33 @@ public class HierarchicalShardSyncerTest { assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds)); } + /** + * Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but + * one of the shards was marked as inconsistent. + */ + @Test + public void testDetermineNewLeasesToCreate0Leases0Reshards1InconsistentMultiStream() { + final String shardId0 = "shardId-0"; + final String shardId1 = "shardId-1"; + final String shardId2 = "shardId-2"; + final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); + + final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), + ShardObjectHelper.newShard(shardId1, null, null, sequenceRange), + ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange)); + final List currentLeases = Collections.emptyList(); + + final Set inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2)); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS); + final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); + final Set expectedLeaseShardIds = new HashSet<>( + toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1))); + assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size())); + assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds)); + } + /** * Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream) */ @@ -208,6 +280,45 @@ public class HierarchicalShardSyncerTest { } + @Test + public void testCheckAndCreateLeasesForShardsIfMissingAtLatestMultiStream() throws Exception { + final List shards = constructShardListForGraphA(); + + final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); + + when(shardDetector.listShards()).thenReturn(shards); + when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); + when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); + setupMultiStream(); + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + cleanupLeasesOfCompletedShards, false, SCOPE); + + final Set expectedShardIds = new HashSet<>( + toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"))); + + final List requestLeases = leaseCaptor.getAllValues(); + final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); + final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) + .collect(Collectors.toSet()); + + assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); + assertThat(requestLeaseKeys, equalTo(expectedShardIds)); + assertThat(extendedSequenceNumbers.size(), equalTo(1)); + + extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); + + verify(shardDetector).listShards(); + verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); + verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); + + } + + private List toMultiStreamLeaseList(List shardIdBasedLeases) { + return shardIdBasedLeases.stream().map(s -> STREAM_IDENTIFIER + ":" + s) + .collect(Collectors.toList()); + } + /** * Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards() * should never be called. @@ -244,6 +355,42 @@ public class HierarchicalShardSyncerTest { verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } + /** + * Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards() + * should never be called. + */ + @Test + public void testCheckAndCreateLeasesForShardsWithShardListMultiStream() throws Exception { + final List latestShards = constructShardListForGraphA(); + + final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); + when(shardDetector.listShards()).thenReturn(latestShards); + when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); + when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); + setupMultiStream(); + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + cleanupLeasesOfCompletedShards, false, SCOPE, latestShards); + + final Set expectedShardIds = new HashSet<>( + toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"))); + + final List requestLeases = leaseCaptor.getAllValues(); + final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); + final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) + .collect(Collectors.toSet()); + + assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); + assertThat(requestLeaseKeys, equalTo(expectedShardIds)); + assertThat(extendedSequenceNumbers.size(), equalTo(1)); + + extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); + + verify(shardDetector, never()).listShards(); + verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); + verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); + } + /** * Test checkAndCreateLeaseForNewShards with an empty list of shards. In this scenario, shardDetector.listShards() * should never be called. @@ -306,6 +453,26 @@ public class HierarchicalShardSyncerTest { } } + @Test(expected = KinesisClientLibIOException.class) + public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenForMultiStream() throws Exception { + final List shards = new ArrayList<>(constructShardListForGraphA()); + final SequenceNumberRange range = shards.get(0).sequenceNumberRange().toBuilder().endingSequenceNumber(null) + .build(); + final Shard shard = shards.get(3).toBuilder().sequenceNumberRange(range).build(); + shards.remove(3); + shards.add(3, shard); + + when(shardDetector.listShards()).thenReturn(shards); + setupMultiStream(); + try { + hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, + INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, false, SCOPE); + } finally { + verify(shardDetector).listShards(); + verify(dynamoDBLeaseRefresher, never()).listLeases(); + } + } + /** * Test checkAndCreateLeasesForNewShards() when a parent is open and children of open parents are being ignored. */ @@ -354,6 +521,51 @@ public class HierarchicalShardSyncerTest { verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } + @Test + public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringInconsistentChildrenMultiStream() throws Exception { + final List shards = new ArrayList<>(constructShardListForGraphA()); + final Shard shard = shards.get(5); + assertThat(shard.shardId(), equalTo("shardId-5")); + + shards.remove(5); + + // shardId-5 in graph A has two children (shardId-9 and shardId-10). if shardId-5 + // is not closed, those children should be ignored when syncing shards, no leases + // should be obtained for them, and we should obtain a lease on the still-open + // parent. + shards.add(5, + shard.toBuilder() + .sequenceNumberRange(shard.sequenceNumberRange().toBuilder().endingSequenceNumber(null).build()) + .build()); + + final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); + + when(shardDetector.listShards()).thenReturn(shards); + when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); + when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); + setupMultiStream(); + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + cleanupLeasesOfCompletedShards, true, SCOPE); + + final List leases = leaseCaptor.getAllValues(); + final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); + final Set leaseSequenceNumbers = leases.stream().map(Lease::checkpoint) + .collect(Collectors.toSet()); + + final Set expectedShardIds = new HashSet<>(toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-5", "shardId-8"))); + + assertThat(leaseKeys.size(), equalTo(expectedShardIds.size())); + assertThat(leaseKeys, equalTo(expectedShardIds)); + assertThat(leaseSequenceNumbers.size(), equalTo(1)); + + leaseSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); + + verify(shardDetector).listShards(); + verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); + verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); + } + @Test public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShard() throws Exception { testCheckAndCreateLeasesForNewShardsAndClosedShard(ExtendedSequenceNumber.TRIM_HORIZON, @@ -711,6 +923,11 @@ public class HierarchicalShardSyncerTest { return createLeasesFromShards(Collections.singletonList(shard), checkpoint, leaseOwner).get(0); } + private MultiStreamLease createMultiStreamLeaseFromShard(final Shard shard, final ExtendedSequenceNumber checkpoint, + final String leaseOwner) { + return createMultiStreamLeasesFromShards(Collections.singletonList(shard), checkpoint, leaseOwner).get(0); + } + private List createLeasesFromShards(final List shards, final ExtendedSequenceNumber checkpoint, final String leaseOwner) { return shards.stream().map(shard -> { @@ -726,6 +943,29 @@ public class HierarchicalShardSyncerTest { }).collect(Collectors.toList()); } + private List createMultiStreamLeasesFromShards(final List shards, final ExtendedSequenceNumber checkpoint, + final String leaseOwner) { + return shards.stream().map(shard -> { + final Set parentShardIds = new HashSet<>(); + if (StringUtils.isNotEmpty(shard.parentShardId())) { + parentShardIds.add(shard.parentShardId()); + } + if (StringUtils.isNotEmpty(shard.adjacentParentShardId())) { + parentShardIds.add(shard.adjacentParentShardId()); + } + final MultiStreamLease msLease = new MultiStreamLease(); + msLease.shardId(shard.shardId()); + msLease.leaseOwner(leaseOwner); + msLease.leaseCounter(0L); + msLease.concurrencyToken(UUID.randomUUID()); + msLease.lastCounterIncrementNanos(0L); + msLease.checkpoint(checkpoint); + msLease.parentShardIds(parentShardIds); + msLease.streamIdentifier(STREAM_IDENTIFIER); + return msLease; + }).collect(Collectors.toList()); + } + @Test public void testCleanUpGarbageLeaseForNonExistentShard() throws Exception { final List shards = constructShardListForGraphA(); @@ -755,6 +995,35 @@ public class HierarchicalShardSyncerTest { verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); } + @Test + public void testCleanUpGarbageLeaseForNonExistentShardForMultiStream() throws Exception { + final List shards = constructShardListForGraphA(); + final String garbageShardId = "shardId-garbage-001"; + final Shard garbageShard = ShardObjectHelper.newShard(garbageShardId, null, null, + ShardObjectHelper.newSequenceNumberRange("101", null)); + final Lease garbageLease = createMultiStreamLeaseFromShard(garbageShard, new ExtendedSequenceNumber("99"), LEASE_OWNER); + final List leases = new ArrayList<>( + createMultiStreamLeasesFromShards(shards, ExtendedSequenceNumber.TRIM_HORIZON, LEASE_OWNER)); + leases.add(garbageLease); + + final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); + + when(shardDetector.listShards()).thenReturn(shards); + when(dynamoDBLeaseRefresher.listLeases()).thenReturn(leases); + doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture()); + setupMultiStream(); + hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, + INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + + assertThat(leaseCaptor.getAllValues().size(), equalTo(1)); + assertThat(leaseCaptor.getValue(), equalTo(garbageLease)); + + verify(shardDetector, times(2)).listShards(); + verify(dynamoDBLeaseRefresher).listLeases(); + verify(dynamoDBLeaseRefresher).deleteLease(any(Lease.class)); + verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); + } + private void testCheckAndCreateLeasesForShardsIfMissing(InitialPositionInStreamExtended initialPosition) throws Exception { final String shardId0 = "shardId-0"; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index 810f0159..23fb5dad 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -57,8 +57,6 @@ import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.RecordsPublisher; -import javax.swing.*; - @RunWith(MockitoJUnitRunner.class) public class ConsumerStatesTest { private static final String STREAM_NAME = "TestStream"; @@ -115,7 +113,7 @@ public class ConsumerStatesTest { @Before public void setup() { - argument = new ShardConsumerArgument(shardInfo, StreamIdentifier.fromStreamName(STREAM_NAME), leaseCoordinator, executorService, recordsPublisher, + argument = new ShardConsumerArgument(shardInfo, StreamIdentifier.singleStreamInstance(STREAM_NAME), leaseCoordinator, executorService, recordsPublisher, shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java index cd3e8af8..21228c75 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java @@ -31,7 +31,6 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.metrics.MetricsFactory; @@ -67,7 +66,8 @@ public class FanOutConfigTest { .streamName(TEST_STREAM_NAME); RetrievalFactory retrievalFactory = config.retrievalFactory(); ShardInfo shardInfo = mock(ShardInfo.class); - doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier(); +// doReturn(Optional.of(StreamIdentifier.singleStreamInstance(TEST_STREAM_NAME).serialize())).when(shardInfo).streamIdentifier(); + doReturn(Optional.empty()).when(shardInfo).streamIdentifierSerOpt(); retrievalFactory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class)); assertThat(retrievalFactory, not(nullValue())); verify(consumerRegistration).getOrCreateStreamConsumerArn(); @@ -93,7 +93,7 @@ public class FanOutConfigTest { .streamName(TEST_STREAM_NAME); RetrievalFactory factory = config.retrievalFactory(); ShardInfo shardInfo = mock(ShardInfo.class); - doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier(); + doReturn(Optional.empty()).when(shardInfo).streamIdentifierSerOpt(); factory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class)); assertThat(factory, not(nullValue())); @@ -108,7 +108,7 @@ public class FanOutConfigTest { .streamName(TEST_STREAM_NAME); RetrievalFactory factory = config.retrievalFactory(); ShardInfo shardInfo = mock(ShardInfo.class); - doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier(); + doReturn(Optional.empty()).when(shardInfo).streamIdentifierSerOpt(); factory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class)); assertThat(factory, not(nullValue())); TestingConfig testingConfig = (TestingConfig) config; @@ -122,7 +122,7 @@ public class FanOutConfigTest { .consumerName(TEST_CONSUMER_NAME).streamName(TEST_STREAM_NAME); RetrievalFactory factory = config.retrievalFactory(); ShardInfo shardInfo = mock(ShardInfo.class); - doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier(); + doReturn(Optional.empty()).when(shardInfo).streamIdentifierSerOpt(); factory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class)); assertThat(factory, not(nullValue()));