From 6267e66c71d3cefdfe1ccefd54e602535b4de456 Mon Sep 17 00:00:00 2001 From: Yu Zeng Date: Mon, 5 Dec 2022 16:21:53 -0800 Subject: [PATCH] StreamARN API updates --- amazon-kinesis-client-multilang/pom.xml | 10 ++- amazon-kinesis-client/pom.xml | 4 +- .../amazon/kinesis/common/ConfigsBuilder.java | 17 ++-- .../amazon/kinesis/common/StreamARNUtil.java | 82 +++++++++++++++++++ .../kinesis/common/StreamIdentifier.java | 35 ++++++-- .../kinesis/leases/KinesisShardDetector.java | 10 +-- .../kinesis/retrieval/RetrievalConfig.java | 24 +++++- .../fanout/FanOutRecordsPublisher.java | 13 ++- .../retrieval/polling/KinesisDataFetcher.java | 10 ++- .../kinesis/coordinator/SchedulerTest.java | 9 ++ .../retrieval/fanout/FanOutConfigTest.java | 3 +- .../fanout/FanOutRecordsPublisherTest.java | 2 +- pom.xml | 4 +- 13 files changed, 184 insertions(+), 39 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamARNUtil.java diff --git a/amazon-kinesis-client-multilang/pom.xml b/amazon-kinesis-client-multilang/pom.xml index a91a9a6b..8c9d4f49 100644 --- a/amazon-kinesis-client-multilang/pom.xml +++ b/amazon-kinesis-client-multilang/pom.xml @@ -21,14 +21,14 @@ amazon-kinesis-client-pom software.amazon.kinesis - 2.4.3 + 2.5.0-SNAPSHOT 4.0.0 amazon-kinesis-client-multilang - 1.12.296 + 1.12.356 @@ -122,6 +122,12 @@ 1.3 test + + software.amazon.awssdk + aws-cbor-protocol + ${awssdk.version} + test + diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index 512daef0..1bd351e1 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -22,7 +22,7 @@ software.amazon.kinesis amazon-kinesis-client-pom - 2.4.3 + 2.5.0-SNAPSHOT amazon-kinesis-client @@ -57,7 +57,7 @@ software.amazon.awssdk - kinesis + kinesis-private ${awssdk.version} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java index 09d28495..d9c1ec6d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java @@ -17,12 +17,11 @@ package software.amazon.kinesis.common; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.NonNull; import lombok.Setter; import lombok.ToString; -import org.apache.commons.lang3.StringUtils; - -import lombok.NonNull; import lombok.experimental.Accessors; +import org.apache.commons.lang3.StringUtils; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @@ -32,9 +31,9 @@ import software.amazon.kinesis.coordinator.CoordinatorConfig; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.metrics.MetricsConfig; +import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; -import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.retrieval.RetrievalConfig; /** @@ -115,8 +114,8 @@ public class ConfigsBuilder { } /** - * Constructor to initialize ConfigsBuilder with StreamName - * @param streamName + * Constructor to initialize ConfigsBuilder with StreamName (or StreamARN) + * @param streamNameOrARN * @param applicationName * @param kinesisClient * @param dynamoDBClient @@ -124,11 +123,11 @@ public class ConfigsBuilder { * @param workerIdentifier * @param shardRecordProcessorFactory */ - public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, + public ConfigsBuilder(@NonNull String streamNameOrARN, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { - this.appStreamTracker = Either.right(streamName); + this.appStreamTracker = Either.right(streamNameOrARN); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; @@ -224,7 +223,7 @@ public class ConfigsBuilder { final RetrievalConfig retrievalConfig = appStreamTracker.map( multiStreamTracker -> new RetrievalConfig(kinesisClient(), multiStreamTracker, applicationName()), - streamName -> new RetrievalConfig(kinesisClient(), streamName, applicationName())); + streamNameOrARN -> new RetrievalConfig(kinesisClient(), streamNameOrARN, applicationName())); return retrievalConfig; } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamARNUtil.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamARNUtil.java new file mode 100644 index 00000000..cb864cee --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamARNUtil.java @@ -0,0 +1,82 @@ +package software.amazon.kinesis.common; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; + +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@Slf4j +public class StreamARNUtil { + public static Pattern STREAM_ARN_PATTERN = Pattern.compile( + "^arn:aws.*:kinesis:.*:\\d{12}:stream\\/(\\S+)$"); + public static Pattern CONSUMER_ARN_PATTERN = Pattern.compile( + "^(arn:aws.*:kinesis:.*:\\d{12}:.*stream\\/[a-zA-Z0-9_.-]+)\\/consumer\\/[a-zA-Z0-9_.-]+:[0-9]+"); + + public static String getStreamName(String streamNameOrARN) { + final Matcher matcher = STREAM_ARN_PATTERN.matcher(streamNameOrARN); + if (matcher.find()) { + return matcher.group(1); + } else { + // Assume that the user entered the stream name because the argument doesn't match the StreamARN regex + return streamNameOrARN; + } + } + + public static Optional getOptionalStreamARN(String streamNameOrARN) { + final Matcher matcher = STREAM_ARN_PATTERN.matcher(streamNameOrARN); + if (matcher.find()) { + return Optional.of(streamNameOrARN); + } else { + // Retrieve + return Optional.empty(); + } + } + + public static Optional getOptionalStreamARNFromDescribeStreamSummary( + String streamName, KinesisAsyncClient kinesis) { + final DescribeStreamSummaryRequest request = + KinesisRequestsBuilder.describeStreamSummaryRequestBuilder().streamName(streamName).build(); + try { + DescribeStreamSummaryResponse response = kinesis.describeStreamSummary(request).get(); + return Optional.ofNullable(response.streamDescriptionSummary().streamARN()); + } catch (ExecutionException | InterruptedException e) { + log.warn("Not able to get StreamARN from the DescribeStreamSummary call", e); + } + return Optional.empty(); + } + + public static Optional getOptionalStreamARNFromConsumerARN(String consumerARN) { + if (StringUtils.isEmpty(consumerARN)) { + return Optional.empty(); + } + + final Matcher matcher = CONSUMER_ARN_PATTERN.matcher(consumerARN); + if (matcher.find()) { + return Optional.ofNullable(matcher.group(1)); + } else { + log.warn("Can't extract the streamARN from consumerARN \"" + consumerARN + + "\", because it doesn't match the regex \"" + CONSUMER_ARN_PATTERN.pattern() + "\""); + return Optional.empty(); + } + } + + public static void trySetEmptyStreamARN(StreamIdentifier streamIdentifier, KinesisAsyncClient kinesisAsyncClient) { + Optional optionalStreamARN = streamIdentifier.streamARN(); + if (!optionalStreamARN.isPresent()) { + streamIdentifier.streamARN( + getOptionalStreamARNFromDescribeStreamSummary(streamIdentifier.streamName(), kinesisAsyncClient)); + + log.debug(streamIdentifier.streamARN().isPresent() ? + "Successfully set streamARN to " + streamIdentifier.streamARN().get() : + "Not able to set streamARN via DescribeStreamSummary call"); + } else { + log.debug("StreamARN " + optionalStreamARN.get() + " is passed during initialization."); + } + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java index 1259a609..06d97145 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java @@ -19,30 +19,44 @@ import com.google.common.base.Joiner; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NonNull; +import lombok.Setter; import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.utils.Validate; import java.util.Optional; import java.util.regex.Pattern; -@EqualsAndHashCode @Getter @Accessors(fluent = true) +import static software.amazon.kinesis.common.StreamARNUtil.getOptionalStreamARN; +import static software.amazon.kinesis.common.StreamARNUtil.getOptionalStreamARNFromDescribeStreamSummary; +import static software.amazon.kinesis.common.StreamARNUtil.getStreamName; + +@Slf4j +@EqualsAndHashCode +@Getter +@Accessors(fluent = true) public class StreamIdentifier { private final Optional accountIdOptional; private final String streamName; + @Setter + private Optional streamARN; private final Optional streamCreationEpochOptional; private static final String DELIMITER = ":"; private static final Pattern PATTERN = Pattern.compile(".*" + ":" + ".*" + ":" + "[0-9]*"); - private StreamIdentifier(@NonNull String accountId, @NonNull String streamName, @NonNull Long streamCreationEpoch) { + private StreamIdentifier(@NonNull String accountId, @NonNull String streamNameOrARN, @NonNull Long streamCreationEpoch) { this.accountIdOptional = Optional.of(accountId); - this.streamName = streamName; + this.streamName = getStreamName(streamNameOrARN); + this.streamARN = getOptionalStreamARN(streamNameOrARN); this.streamCreationEpochOptional = Optional.of(streamCreationEpoch); } - private StreamIdentifier(@NonNull String streamName) { + private StreamIdentifier(@NonNull String streamNameOrARN) { this.accountIdOptional = Optional.empty(); - this.streamName = streamName; + this.streamName = getStreamName(streamNameOrARN); + this.streamARN = getOptionalStreamARN(streamNameOrARN); this.streamCreationEpochOptional = Optional.empty(); } @@ -76,6 +90,17 @@ public class StreamIdentifier { } } + /** + * Create a multi stream instance for StreamIdentifier from accountId, streamNameOrARN and creationEpoch + * The serialized stream identifier should be of the format account:stream:creationepoch + * @param accountId The account's 12-digit ID, which is hosting the stream + * @param streamNameOrARN Although streamName and streamARN can both be supplied, streamARN is preferred + * @return creationEpoch The stream's creation time stamp + */ + public static StreamIdentifier multiStreamInstance(String accountId, String streamNameOrARN, Long creationEpoch) { + return new StreamIdentifier(accountId, streamNameOrARN, creationEpoch); + } + /** * Create a single stream instance for StreamIdentifier from stream name. * @param streamName diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java index 189ba18b..a46d29c3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java @@ -198,7 +198,7 @@ public class KinesisShardDetector implements ShardDetector { } else { builder = builder.nextToken(nextToken); } - + streamIdentifier.streamARN().ifPresent(builder::streamARN); final ListShardsRequest request = builder.build(); log.info("Stream {}: listing shards with list shards request {}", streamIdentifier, request); @@ -275,14 +275,14 @@ public class KinesisShardDetector implements ShardDetector { @Override public List getChildShards(final String shardId) throws InterruptedException, ExecutionException, TimeoutException { - final GetShardIteratorRequest getShardIteratorRequest = KinesisRequestsBuilder.getShardIteratorRequestBuilder() + final GetShardIteratorRequest.Builder requestBuilder = KinesisRequestsBuilder.getShardIteratorRequestBuilder() .streamName(streamIdentifier.streamName()) .shardIteratorType(ShardIteratorType.LATEST) - .shardId(shardId) - .build(); + .shardId(shardId); + streamIdentifier.streamARN().ifPresent(requestBuilder::streamARN); final GetShardIteratorResponse getShardIteratorResponse = - FutureUtils.resolveOrCancelFuture(kinesisClient.getShardIterator(getShardIteratorRequest), kinesisRequestTimeout); + FutureUtils.resolveOrCancelFuture(kinesisClient.getShardIterator(requestBuilder.build()), kinesisRequestTimeout); final GetRecordsRequest getRecordsRequest = KinesisRequestsBuilder.getRecordsRequestBuilder() .shardIterator(getShardIteratorResponse.shardIterator()) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java index abbee4da..97e59b88 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java @@ -32,6 +32,8 @@ import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.retrieval.fanout.FanOutConfig; import software.amazon.kinesis.retrieval.polling.PollingConfig; +import static software.amazon.kinesis.common.StreamARNUtil.trySetEmptyStreamARN; + /** * Used by the KCL to configure the retrieval of records from Kinesis. */ @@ -102,18 +104,17 @@ public class RetrievalConfig { private RetrievalFactory retrievalFactory; - public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull String streamName, + public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull String streamNameOrARN, @NonNull String applicationName) { this.kinesisClient = kinesisAsyncClient; - this.appStreamTracker = Either - .right(new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStreamExtended)); + this.appStreamTracker = Either.right(getInitialStreamConfig(streamNameOrARN, kinesisAsyncClient)); this.applicationName = applicationName; } public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName) { this.kinesisClient = kinesisAsyncClient; - this.appStreamTracker = Either.left(multiStreamTracker); + this.appStreamTracker = Either.left(getInitialMultiStreamTracker(multiStreamTracker, kinesisAsyncClient)); this.applicationName = applicationName; } @@ -157,6 +158,7 @@ public class RetrievalConfig { streamConfig -> streamConfig.streamIdentifier() == null || streamConfig.streamIdentifier().streamName() == null); if(isInvalidFanoutConfig) { + // TODO: fix the error message throw new IllegalArgumentException( "Invalid config: Either in multi-stream mode with streamName/consumerArn configured or in single-stream mode with no streamName configured"); } @@ -171,8 +173,22 @@ public class RetrievalConfig { streamConfig.streamIdentifier() == null || streamConfig.streamIdentifier().streamName() == null); if (isInvalidPollingConfig) { + // TODO: fix the error message throw new IllegalArgumentException( "Invalid config: Either in multi-stream mode with streamName configured or in single-stream mode with no streamName configured"); } } + + private StreamConfig getInitialStreamConfig(String streamNameOrARN, KinesisAsyncClient kinesisAsyncClient) { + StreamIdentifier streamIdentifier = StreamIdentifier.singleStreamInstance(streamNameOrARN); + trySetEmptyStreamARN(streamIdentifier, kinesisAsyncClient); + return new StreamConfig(streamIdentifier, initialPositionInStreamExtended); + } + + private MultiStreamTracker getInitialMultiStreamTracker(MultiStreamTracker multiStreamTracker, KinesisAsyncClient kinesisAsyncClient) { + multiStreamTracker.streamConfigList() + .forEach(streamConfig -> trySetEmptyStreamARN(streamConfig.streamIdentifier(), kinesisAsyncClient)); + return multiStreamTracker; + } + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 7e8932cf..8d79de00 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -27,14 +27,12 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -import software.amazon.awssdk.services.kinesis.model.ChildShard; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; -import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.Either; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -54,6 +52,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.time.Instant; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -61,6 +60,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired; +import static software.amazon.kinesis.common.StreamARNUtil.getOptionalStreamARNFromConsumerARN; import static software.amazon.kinesis.retrieval.DataRetrievalUtil.isValidResult; @Slf4j @@ -75,6 +75,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private final KinesisAsyncClient kinesis; private final String shardId; private final String consumerArn; + private final Optional streamARN; private final String streamAndShardId; private final Object lockObject = new Object(); @@ -97,6 +98,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { this.kinesis = kinesis; this.shardId = shardId; this.consumerArn = consumerArn; + this.streamARN = getOptionalStreamARNFromConsumerARN(consumerArn); this.streamAndShardId = shardId; } @@ -104,6 +106,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { this.kinesis = kinesis; this.shardId = shardId; this.consumerArn = consumerArn; + this.streamARN = getOptionalStreamARNFromConsumerARN(consumerArn); this.streamAndShardId = streamIdentifierSer + ":" + shardId; } @@ -292,6 +295,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { resetRecordsDeliveryStateOnSubscriptionOnInit(); SubscribeToShardRequest.Builder builder = KinesisRequestsBuilder.subscribeToShardRequestBuilder() .shardId(shardId).consumerARN(consumerArn); + streamARN.ifPresent(builder::streamARN); + SubscribeToShardRequest request; if (isFirstConnection) { request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStreamExtended).build(); @@ -304,8 +309,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { int subscribeInvocationId = subscribeToShardId.incrementAndGet(); String instanceId = shardId + "-" + subscribeInvocationId; log.debug( - "{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ {} id: {} -- Starting subscribe to shard", - streamAndShardId, connectionStart, instanceId); + "{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ {} id: {} -- Starting subscribe to shard with request {}", + streamAndShardId, connectionStart, instanceId, request); flow = new RecordFlow(this, connectionStart, instanceId); kinesis.subscribeToShard(request, flow); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index 223ab367..bc72de8e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -231,8 +231,11 @@ public class KinesisDataFetcher implements DataFetcher { GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder() .streamName(streamIdentifier.streamName()).shardId(shardId); + streamIdentifier.streamARN().ifPresent(builder::streamARN); + GetShardIteratorRequest request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStream) .build(); + log.debug("GetShardIterator request has parameters: " + request); // TODO: Check if this metric is fine to be added final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION); @@ -302,9 +305,10 @@ public class KinesisDataFetcher implements DataFetcher { } @Override - public GetRecordsRequest getGetRecordsRequest(String nextIterator) { - return KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator) - .limit(maxRecords).build(); + public GetRecordsRequest getGetRecordsRequest(String nextIterator) { + GetRecordsRequest.Builder builder = KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator); + streamIdentifier.streamARN().ifPresent(builder::streamARN); + return builder.build(); } @Override diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index aa9f8412..cfbc9af9 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -47,6 +47,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -67,6 +68,10 @@ import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; +import software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary; import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointFactory; @@ -170,6 +175,10 @@ public class SchedulerTest { shardSyncTaskManagerMap = new HashMap<>(); shardDetectorMap = new HashMap<>(); shardRecordProcessorFactory = new TestShardRecordProcessorFactory(); + when(kinesisClient.describeStreamSummary(any(DescribeStreamSummaryRequest.class))) + .thenReturn(CompletableFuture.completedFuture(DescribeStreamSummaryResponse.builder() + .streamDescriptionSummary(StreamDescriptionSummary.builder().streamARN("testStreamARN").build()) + .build())); checkpointConfig = new CheckpointConfig().checkpointFactory(new TestKinesisCheckpointFactory()); coordinatorConfig = new CoordinatorConfig(applicationName).parentShardPollIntervalMillis(100L).workerStateChangeListener(workerStateChangeListener); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java index 4fee3d08..1972ce2e 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java @@ -43,7 +43,7 @@ import java.util.Optional; @RunWith(MockitoJUnitRunner.class) public class FanOutConfigTest { - private static final String TEST_CONSUMER_ARN = "TestConsumerArn"; + private static final String TEST_CONSUMER_ARN = "arn:aws:kinesis:us-east-2:123456789012:stream/mystream/consumer/TestConsumerArn:000"; private static final String TEST_APPLICATION_NAME = "TestApplication"; private static final String TEST_STREAM_NAME = "TestStream"; private static final String TEST_CONSUMER_NAME = "TestConsumerName"; @@ -75,7 +75,6 @@ public class FanOutConfigTest { .streamName(TEST_STREAM_NAME); RetrievalFactory retrievalFactory = config.retrievalFactory(); ShardInfo shardInfo = mock(ShardInfo.class); -// doReturn(Optional.of(StreamIdentifier.singleStreamInstance(TEST_STREAM_NAME).serialize())).when(shardInfo).streamIdentifier(); doReturn(Optional.empty()).when(shardInfo).streamIdentifierSerOpt(); retrievalFactory.createGetRecordsCache(shardInfo, streamConfig, mock(MetricsFactory.class)); assertThat(retrievalFactory, not(nullValue())); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index 40d86c49..da5db9a6 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -91,7 +91,7 @@ import static org.mockito.Mockito.verify; public class FanOutRecordsPublisherTest { private static final String SHARD_ID = "Shard-001"; - private static final String CONSUMER_ARN = "arn:consumer"; + private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-2:123456789012:stream/mystream/consumer/TestConsumerArn:000"; private static final String CONTINUATION_SEQUENCE_NUMBER = "continuationSequenceNumber"; @Mock diff --git a/pom.xml b/pom.xml index 0e08d6c6..3c858d0a 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ amazon-kinesis-client-pom pom Amazon Kinesis Client Library - 2.4.3 + 2.5.0-SNAPSHOT The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. @@ -33,7 +33,7 @@ - 2.17.268 + 2.18.29