From f94b1b801aca637387a1e22ac5b679d3cd897a5b Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 4 Mar 2020 02:52:20 -0800 Subject: [PATCH] Adding account and stream epoch support. Checkpoint 2 --- .../kinesis/common/StreamIdentifier.java | 2 ++ .../leases/HierarchicalShardSyncer.java | 23 +++++++++--------- .../kinesis/leases/KinesisShardDetector.java | 15 ++++++------ .../kinesis/leases/MultiStreamLease.java | 6 ++--- .../amazon/kinesis/leases/ShardDetector.java | 3 ++- .../DynamoDBLeaseManagementFactory.java | 13 +++++++--- .../DynamoDBMultiStreamLeaseSerializer.java | 9 +++---- .../fanout/FanOutRetrievalFactory.java | 14 ++++++++--- .../retrieval/polling/KinesisDataFetcher.java | 12 ++++++---- .../SynchronousBlockingRetrievalFactory.java | 6 ++++- ...ynchronousPrefetchingRetrievalFactory.java | 6 ++++- .../kinesis/coordinator/SchedulerTest.java | 4 +++- .../kinesis/lifecycle/ConsumerStatesTest.java | 3 ++- .../retrieval/fanout/FanOutConfigTest.java | 24 +++++++++++++++---- 14 files changed, 94 insertions(+), 46 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java index e54c97e6..5b1f9977 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java @@ -3,11 +3,13 @@ package software.amazon.kinesis.common; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.experimental.Accessors; import software.amazon.awssdk.utils.Validate; @RequiredArgsConstructor @EqualsAndHashCode @Getter +@Accessors(fluent = true) public class StreamIdentifier { private final String accountName; private final String streamName; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 459d730d..1102574f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -41,6 +41,7 @@ import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; @@ -120,9 +121,9 @@ public class HierarchicalShardSyncer { assertAllParentShardsAreClosed(inconsistentShardIds); } final List currentLeases = isMultiStreamMode ? - getLeasesForStream(shardDetector.streamName(), leaseRefresher) : + getLeasesForStream(shardDetector.streamIdentifier(), leaseRefresher) : leaseRefresher.listLeases(); - final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamName()); + final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamIdentifier()); final List newLeasesToCreate = determineNewLeasesToCreate(latestShards, currentLeases, initialPosition, inconsistentShardIds, multiStreamArgs); log.debug("Num new leases to create: {}", newLeasesToCreate.size()); @@ -149,19 +150,19 @@ public class HierarchicalShardSyncer { /** Note: This method has package level access solely for testing purposes. * - * @param streamName We'll use this stream name to filter leases + * @param streamIdentifier We'll use this stream identifier to filter leases * @param leaseRefresher Used to fetch leases * @return Return list of leases (corresponding to shards) of the specified stream. * @throws DependencyException * @throws InvalidStateException * @throws ProvisionedThroughputException */ - static List getLeasesForStream(String streamName, + static List getLeasesForStream(StreamIdentifier streamIdentifier, LeaseRefresher leaseRefresher) throws DependencyException, ProvisionedThroughputException, InvalidStateException { List streamLeases = new ArrayList<>(); for (Lease lease : leaseRefresher.listLeases()) { - if (streamName.equals(((MultiStreamLease)lease).streamIdentifier())) { + if (streamIdentifier.toString().equals(((MultiStreamLease)lease).streamIdentifier())) { streamLeases.add(lease); } } @@ -378,7 +379,7 @@ public class HierarchicalShardSyncer { } else { log.debug("Need to create a lease for shardId {}", shardId); final Lease newLease = multiStreamArgs.isMultiStreamMode() ? - newKCLMultiStreamLease(shard, multiStreamArgs.streamName()) : + newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) : newKCLLease(shard); final boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition, shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap, @@ -502,7 +503,7 @@ public class HierarchicalShardSyncer { if (lease == null) { lease = multiStreamArgs.isMultiStreamMode() ? newKCLMultiStreamLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId), - multiStreamArgs.streamName()) : + multiStreamArgs.streamIdentifier()) : newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId)); shardIdToLeaseMapOfNewShards.put(parentShardId, lease); } @@ -758,9 +759,9 @@ public class HierarchicalShardSyncer { return newLease; } - private static Lease newKCLMultiStreamLease(final Shard shard, final String streamName) { + private static Lease newKCLMultiStreamLease(final Shard shard, final StreamIdentifier streamIdentifier) { MultiStreamLease newLease = new MultiStreamLease(); - newLease.leaseKey(MultiStreamLease.getLeaseKey(streamName, shard.shardId())); + newLease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.toString(), shard.shardId())); List parentShardIds = new ArrayList<>(2); if (shard.parentShardId() != null) { parentShardIds.add(shard.parentShardId()); @@ -770,7 +771,7 @@ public class HierarchicalShardSyncer { } newLease.parentShardIds(parentShardIds); newLease.ownerSwitchesSinceCheckpoint(0L); - newLease.streamIdentifier(streamName); + newLease.streamIdentifier(streamIdentifier.toString()); newLease.shardId(shard.shardId()); return newLease; } @@ -858,7 +859,7 @@ public class HierarchicalShardSyncer { @Accessors(fluent = true) private static class MultiStreamArgs { private final Boolean isMultiStreamMode; - private final String streamName; + private final StreamIdentifier streamIdentifier; } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java index f009a0e7..53751375 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java @@ -47,6 +47,7 @@ import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.FutureUtils; import software.amazon.kinesis.common.KinesisRequestsBuilder; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.retrieval.AWSExceptionManager; /** @@ -60,7 +61,7 @@ public class KinesisShardDetector implements ShardDetector { @NonNull private final KinesisAsyncClient kinesisClient; @NonNull @Getter - private final String streamName; + private final StreamIdentifier streamIdentifier; private final long listShardsBackoffTimeInMillis; private final int maxListShardsRetryAttempts; private final long listShardsCacheAllowedAgeInSeconds; @@ -77,16 +78,16 @@ public class KinesisShardDetector implements ShardDetector { public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis, int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload, int cacheMissWarningModulus) { - this(kinesisClient, streamName, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, + this(kinesisClient, StreamIdentifier.fromStreamName(streamName), listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, cacheMissWarningModulus, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT); } - public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis, + public KinesisShardDetector(KinesisAsyncClient kinesisClient, StreamIdentifier streamIdentifier, long listShardsBackoffTimeInMillis, int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload, int cacheMissWarningModulus, Duration kinesisRequestTimeout) { this.kinesisClient = kinesisClient; - this.streamName = streamName; + this.streamIdentifier = streamIdentifier; this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis; this.maxListShardsRetryAttempts = maxListShardsRetryAttempts; this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds; @@ -180,7 +181,7 @@ public class KinesisShardDetector implements ShardDetector { ListShardsRequest.Builder request = KinesisRequestsBuilder.listShardsRequestBuilder(); if (StringUtils.isEmpty(nextToken)) { - request = request.streamName(streamName); + request = request.streamName(streamIdentifier.streamName()); } else { request = request.nextToken(nextToken); } @@ -205,12 +206,12 @@ public class KinesisShardDetector implements ShardDetector { + " Active or Updating)"); return null; } catch (LimitExceededException e) { - log.info("Got LimitExceededException when listing shards {}. Backing off for {} millis.", streamName, + log.info("Got LimitExceededException when listing shards {}. Backing off for {} millis.", streamIdentifier, listShardsBackoffTimeInMillis); try { Thread.sleep(listShardsBackoffTimeInMillis); } catch (InterruptedException ie) { - log.debug("Stream {} : Sleep was interrupted ", streamName, ie); + log.debug("Stream {} : Sleep was interrupted ", streamIdentifier, ie); } lastException = e; } catch (TimeoutException te) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/MultiStreamLease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/MultiStreamLease.java index 8b29168d..aa850f2f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/MultiStreamLease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/MultiStreamLease.java @@ -35,10 +35,10 @@ public class MultiStreamLease extends Lease { shardId(casted.shardId); } - public static String getLeaseKey(String streamName, String shardId) { - verifyNotNull(streamName, "streamName should not be null"); + public static String getLeaseKey(String streamIdentifier, String shardId) { + verifyNotNull(streamIdentifier, "streamIdentifier should not be null"); verifyNotNull(shardId, "shardId should not be null"); - return streamName + ":" + shardId; + return streamIdentifier + ":" + shardId; } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java index 43e1e1b3..6ae012e6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.leases; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.kinesis.common.StreamIdentifier; import java.util.List; @@ -27,7 +28,7 @@ public interface ShardDetector { List listShards(); - default String streamName() { + default StreamIdentifier streamIdentifier() { throw new UnsupportedOperationException("StreamName not available"); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 124d28ca..e383a41b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -31,6 +31,7 @@ import software.amazon.awssdk.utils.Validate; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.KinesisShardDetector; import software.amazon.kinesis.leases.LeaseCoordinator; @@ -330,7 +331,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout, BillingMode billingMode) { - this(kinesisClient, new StreamConfig(streamName, initialPositionInStream), dynamoDBClient, tableName, + this(kinesisClient, new StreamConfig(StreamIdentifier.fromStreamName(streamName), initialPositionInStream), dynamoDBClient, tableName, workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, @@ -477,14 +478,20 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { @Override @Deprecated public ShardDetector createShardDetector() { - return new KinesisShardDetector(kinesisClient, streamConfig.streamName(), + return new KinesisShardDetector(kinesisClient, streamConfig.streamIdentifier(), listShardsBackoffTimeMillis, maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, cacheMissWarningModulus, dynamoDbRequestTimeout); } + /** + * KinesisShardDetector supports reading from service only using streamName. Support for accountId and + * stream creation epoch is yet to be provided. + * @param streamConfig + * @return + */ @Override public ShardDetector createShardDetector(StreamConfig streamConfig) { - return new KinesisShardDetector(kinesisClient, streamConfig.streamName(), listShardsBackoffTimeMillis, + return new KinesisShardDetector(kinesisClient, streamConfig.streamIdentifier(), listShardsBackoffTimeMillis, maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, cacheMissWarningModulus, dynamoDbRequestTimeout); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBMultiStreamLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBMultiStreamLeaseSerializer.java index b8637bdb..d703a970 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBMultiStreamLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBMultiStreamLeaseSerializer.java @@ -14,14 +14,15 @@ import static software.amazon.kinesis.leases.MultiStreamLease.validateAndCast; @NoArgsConstructor public class DynamoDBMultiStreamLeaseSerializer extends DynamoDBLeaseSerializer { - private static final String STREAM_NAME_KEY = "streamName"; + // Keeping the stream id as "streamName" for legacy reasons. + private static final String STREAM_ID_KEY = "streamName"; private static final String SHARD_ID_KEY = "shardId"; @Override public Map toDynamoRecord(Lease lease) { final MultiStreamLease multiStreamLease = validateAndCast(lease); final Map result = super.toDynamoRecord(multiStreamLease); - result.put(STREAM_NAME_KEY, DynamoUtils.createAttributeValue(multiStreamLease.streamIdentifier())); + result.put(STREAM_ID_KEY, DynamoUtils.createAttributeValue(multiStreamLease.streamIdentifier())); result.put(SHARD_ID_KEY, DynamoUtils.createAttributeValue(multiStreamLease.shardId())); return result; } @@ -30,7 +31,7 @@ public class DynamoDBMultiStreamLeaseSerializer extends DynamoDBLeaseSerializer public MultiStreamLease fromDynamoRecord(Map dynamoRecord) { final MultiStreamLease multiStreamLease = (MultiStreamLease) super .fromDynamoRecord(dynamoRecord, new MultiStreamLease()); - multiStreamLease.streamIdentifier(DynamoUtils.safeGetString(dynamoRecord, STREAM_NAME_KEY)); + multiStreamLease.streamIdentifier(DynamoUtils.safeGetString(dynamoRecord, STREAM_ID_KEY)); multiStreamLease.shardId(DynamoUtils.safeGetString(dynamoRecord, SHARD_ID_KEY)); return multiStreamLease; } @@ -40,7 +41,7 @@ public class DynamoDBMultiStreamLeaseSerializer extends DynamoDBLeaseSerializer public Map getDynamoUpdateLeaseUpdate(Lease lease) { final MultiStreamLease multiStreamLease = validateAndCast(lease); final Map result = super.getDynamoUpdateLeaseUpdate(multiStreamLease); - result.put(STREAM_NAME_KEY, putUpdate(DynamoUtils.createAttributeValue(multiStreamLease.streamIdentifier()))); + result.put(STREAM_ID_KEY, putUpdate(DynamoUtils.createAttributeValue(multiStreamLease.streamIdentifier()))); result.put(SHARD_ID_KEY, putUpdate(DynamoUtils.createAttributeValue(multiStreamLease.shardId()))); return result; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java index d4654323..f655d8fd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java @@ -19,6 +19,7 @@ import lombok.NonNull; import lombok.RequiredArgsConstructor; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; @@ -27,6 +28,7 @@ import software.amazon.kinesis.retrieval.RetrievalFactory; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.function.Function; @RequiredArgsConstructor @@ -36,7 +38,7 @@ public class FanOutRetrievalFactory implements RetrievalFactory { private final KinesisAsyncClient kinesisClient; private final String defaultStreamName; private final Function consumerArnProvider; - private Map streamToconsumerArnMap = new HashMap<>(); + private Map streamToConsumerArnMap = new HashMap<>(); @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo, @@ -47,8 +49,14 @@ public class FanOutRetrievalFactory implements RetrievalFactory { @Override public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo, final MetricsFactory metricsFactory) { - final String streamName = shardInfo.streamIdentifier().orElse(defaultStreamName); + final Optional streamIdentifierStr = shardInfo.streamIdentifier(); + final String streamName; + if(streamIdentifierStr.isPresent()) { + streamName = StreamIdentifier.fromString(streamIdentifierStr.get()).streamName(); + } else { + streamName = defaultStreamName; + } return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), - streamToconsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply)); + streamToConsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply)); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index 1b769cb9..db6d2aae 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -40,6 +40,7 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.FutureUtils; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.KinesisRequestsBuilder; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsScope; @@ -63,7 +64,7 @@ public class KinesisDataFetcher { @NonNull private final KinesisAsyncClient kinesisClient; @NonNull - private final String streamName; + private final StreamIdentifier streamIdentifier; @NonNull private final String shardId; private final int maxRecords; @@ -73,12 +74,13 @@ public class KinesisDataFetcher { @Deprecated public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory) { - this(kinesisClient, streamName, shardId, maxRecords, metricsFactory, PollingConfig.DEFAULT_REQUEST_TIMEOUT); + this(kinesisClient, StreamIdentifier.fromStreamName(streamName), shardId, maxRecords, metricsFactory, PollingConfig.DEFAULT_REQUEST_TIMEOUT); } - public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory, Duration maxFutureWait) { + // Changing the constructor directly as this is an internal API + public KinesisDataFetcher(KinesisAsyncClient kinesisClient, StreamIdentifier streamIdentifier, String shardId, int maxRecords, MetricsFactory metricsFactory, Duration maxFutureWait) { this.kinesisClient = kinesisClient; - this.streamName = streamName; + this.streamIdentifier = streamIdentifier; this.shardId = shardId; this.maxRecords = maxRecords; this.metricsFactory = metricsFactory; @@ -199,7 +201,7 @@ public class KinesisDataFetcher { final AWSExceptionManager exceptionManager = createExceptionManager(); GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder() - .streamName(streamName).shardId(shardId); + .streamName(streamIdentifier.streamName()).shardId(shardId); GetShardIteratorRequest request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStream) .build(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java index 4a0dc9d5..93718f7c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java @@ -19,6 +19,7 @@ import lombok.Data; import lombok.NonNull; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; @@ -62,8 +63,11 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, @NonNull final MetricsFactory metricsFactory) { + final StreamIdentifier streamIdentifier = shardInfo.streamIdentifier().isPresent() ? + StreamIdentifier.fromString(shardInfo.streamIdentifier().get()) : + StreamIdentifier.fromStreamName(streamName); return new SynchronousGetRecordsRetrievalStrategy( - new KinesisDataFetcher(kinesisClient, shardInfo.streamIdentifier().orElse(streamName), shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout)); + new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout)); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java index 6e2e2b05..e73ad928 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java @@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService; import lombok.NonNull; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; @@ -67,8 +68,11 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, @NonNull final MetricsFactory metricsFactory) { + final StreamIdentifier streamIdentifier = shardInfo.streamIdentifier().isPresent() ? + StreamIdentifier.fromString(shardInfo.streamIdentifier().get()) : + StreamIdentifier.fromStreamName(streamName); return new SynchronousGetRecordsRetrievalStrategy( - new KinesisDataFetcher(kinesisClient, shardInfo.streamIdentifier().orElse(streamName), shardInfo.shardId(), + new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(), maxRecords, metricsFactory, maxFutureWait)); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index a17ad4cb..7cfc86a5 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -53,6 +53,7 @@ import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointFactory; import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.KinesisClientLibNonRetryableException; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; @@ -501,9 +502,10 @@ public class SchedulerTest { } @Override - public ShardRecordProcessor shardRecordProcessor(String streamName) { + public ShardRecordProcessor shardRecordProcessor(StreamIdentifier streamIdentifier) { return shardRecordProcessor(); } + } private class TestKinesisLeaseManagementFactory implements LeaseManagementFactory { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index 16f5e9a4..810f0159 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -43,6 +43,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; @@ -114,7 +115,7 @@ public class ConsumerStatesTest { @Before public void setup() { - argument = new ShardConsumerArgument(shardInfo, STREAM_NAME, leaseCoordinator, executorService, recordsPublisher, + argument = new ShardConsumerArgument(shardInfo, StreamIdentifier.fromStreamName(STREAM_NAME), leaseCoordinator, executorService, recordsPublisher, shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java index a0d18d56..cd3e8af8 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java @@ -19,6 +19,8 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -29,9 +31,14 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.retrieval.RetrievalFactory; +import java.util.Optional; + @RunWith(MockitoJUnitRunner.class) public class FanOutConfigTest { @@ -59,7 +66,9 @@ public class FanOutConfigTest { FanOutConfig config = new TestingConfig(kinesisClient).applicationName(TEST_APPLICATION_NAME) .streamName(TEST_STREAM_NAME); RetrievalFactory retrievalFactory = config.retrievalFactory(); - + ShardInfo shardInfo = mock(ShardInfo.class); + doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier(); + retrievalFactory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class)); assertThat(retrievalFactory, not(nullValue())); verify(consumerRegistration).getOrCreateStreamConsumerArn(); } @@ -83,7 +92,9 @@ public class FanOutConfigTest { FanOutConfig config = new TestingConfig(kinesisClient).applicationName(TEST_APPLICATION_NAME) .streamName(TEST_STREAM_NAME); RetrievalFactory factory = config.retrievalFactory(); - + ShardInfo shardInfo = mock(ShardInfo.class); + doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier(); + factory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class)); assertThat(factory, not(nullValue())); TestingConfig testingConfig = (TestingConfig) config; @@ -96,9 +107,10 @@ public class FanOutConfigTest { FanOutConfig config = new TestingConfig(kinesisClient).consumerName(TEST_CONSUMER_NAME) .streamName(TEST_STREAM_NAME); RetrievalFactory factory = config.retrievalFactory(); - + ShardInfo shardInfo = mock(ShardInfo.class); + doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier(); + factory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class)); assertThat(factory, not(nullValue())); - TestingConfig testingConfig = (TestingConfig) config; assertThat(testingConfig.stream, equalTo(TEST_STREAM_NAME)); assertThat(testingConfig.consumerToCreate, equalTo(TEST_CONSUMER_NAME)); @@ -109,7 +121,9 @@ public class FanOutConfigTest { FanOutConfig config = new TestingConfig(kinesisClient).applicationName(TEST_APPLICATION_NAME) .consumerName(TEST_CONSUMER_NAME).streamName(TEST_STREAM_NAME); RetrievalFactory factory = config.retrievalFactory(); - + ShardInfo shardInfo = mock(ShardInfo.class); + doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier(); + factory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class)); assertThat(factory, not(nullValue())); TestingConfig testingConfig = (TestingConfig) config;