diff --git a/amazon-kinesis-client-multilang/pom.xml b/amazon-kinesis-client-multilang/pom.xml
index a91a9a6b..8c9d4f49 100644
--- a/amazon-kinesis-client-multilang/pom.xml
+++ b/amazon-kinesis-client-multilang/pom.xml
@@ -21,14 +21,14 @@
amazon-kinesis-client-pom
software.amazon.kinesis
- 2.4.3
+ 2.5.0-SNAPSHOT
4.0.0
amazon-kinesis-client-multilang
- 1.12.296
+ 1.12.356
@@ -122,6 +122,12 @@
1.3
test
+
+ software.amazon.awssdk
+ aws-cbor-protocol
+ ${awssdk.version}
+ test
+
diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml
index 512daef0..1bd351e1 100644
--- a/amazon-kinesis-client/pom.xml
+++ b/amazon-kinesis-client/pom.xml
@@ -22,7 +22,7 @@
software.amazon.kinesis
amazon-kinesis-client-pom
- 2.4.3
+ 2.5.0-SNAPSHOT
amazon-kinesis-client
@@ -57,7 +57,7 @@
software.amazon.awssdk
- kinesis
+ kinesis-private
${awssdk.version}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java
index 09d28495..d9c1ec6d 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java
@@ -17,12 +17,11 @@ package software.amazon.kinesis.common;
import lombok.EqualsAndHashCode;
import lombok.Getter;
+import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;
-import org.apache.commons.lang3.StringUtils;
-
-import lombok.NonNull;
import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
@@ -32,9 +31,9 @@ import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.metrics.MetricsConfig;
+import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
-import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;
/**
@@ -115,8 +114,8 @@ public class ConfigsBuilder {
}
/**
- * Constructor to initialize ConfigsBuilder with StreamName
- * @param streamName
+ * Constructor to initialize ConfigsBuilder with StreamName (or StreamARN)
+ * @param streamNameOrARN
* @param applicationName
* @param kinesisClient
* @param dynamoDBClient
@@ -124,11 +123,11 @@ public class ConfigsBuilder {
* @param workerIdentifier
* @param shardRecordProcessorFactory
*/
- public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
+ public ConfigsBuilder(@NonNull String streamNameOrARN, @NonNull String applicationName,
@NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
@NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
@NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
- this.appStreamTracker = Either.right(streamName);
+ this.appStreamTracker = Either.right(streamNameOrARN);
this.applicationName = applicationName;
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
@@ -224,7 +223,7 @@ public class ConfigsBuilder {
final RetrievalConfig retrievalConfig =
appStreamTracker.map(
multiStreamTracker -> new RetrievalConfig(kinesisClient(), multiStreamTracker, applicationName()),
- streamName -> new RetrievalConfig(kinesisClient(), streamName, applicationName()));
+ streamNameOrARN -> new RetrievalConfig(kinesisClient(), streamNameOrARN, applicationName()));
return retrievalConfig;
}
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamARNUtil.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamARNUtil.java
new file mode 100644
index 00000000..cb864cee
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamARNUtil.java
@@ -0,0 +1,82 @@
+package software.amazon.kinesis.common;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@Slf4j
+public class StreamARNUtil {
+ public static Pattern STREAM_ARN_PATTERN = Pattern.compile(
+ "^arn:aws.*:kinesis:.*:\\d{12}:stream\\/(\\S+)$");
+ public static Pattern CONSUMER_ARN_PATTERN = Pattern.compile(
+ "^(arn:aws.*:kinesis:.*:\\d{12}:.*stream\\/[a-zA-Z0-9_.-]+)\\/consumer\\/[a-zA-Z0-9_.-]+:[0-9]+");
+
+ public static String getStreamName(String streamNameOrARN) {
+ final Matcher matcher = STREAM_ARN_PATTERN.matcher(streamNameOrARN);
+ if (matcher.find()) {
+ return matcher.group(1);
+ } else {
+ // Assume that the user entered the stream name because the argument doesn't match the StreamARN regex
+ return streamNameOrARN;
+ }
+ }
+
+ public static Optional getOptionalStreamARN(String streamNameOrARN) {
+ final Matcher matcher = STREAM_ARN_PATTERN.matcher(streamNameOrARN);
+ if (matcher.find()) {
+ return Optional.of(streamNameOrARN);
+ } else {
+ // Retrieve
+ return Optional.empty();
+ }
+ }
+
+ public static Optional getOptionalStreamARNFromDescribeStreamSummary(
+ String streamName, KinesisAsyncClient kinesis) {
+ final DescribeStreamSummaryRequest request =
+ KinesisRequestsBuilder.describeStreamSummaryRequestBuilder().streamName(streamName).build();
+ try {
+ DescribeStreamSummaryResponse response = kinesis.describeStreamSummary(request).get();
+ return Optional.ofNullable(response.streamDescriptionSummary().streamARN());
+ } catch (ExecutionException | InterruptedException e) {
+ log.warn("Not able to get StreamARN from the DescribeStreamSummary call", e);
+ }
+ return Optional.empty();
+ }
+
+ public static Optional getOptionalStreamARNFromConsumerARN(String consumerARN) {
+ if (StringUtils.isEmpty(consumerARN)) {
+ return Optional.empty();
+ }
+
+ final Matcher matcher = CONSUMER_ARN_PATTERN.matcher(consumerARN);
+ if (matcher.find()) {
+ return Optional.ofNullable(matcher.group(1));
+ } else {
+ log.warn("Can't extract the streamARN from consumerARN \"" + consumerARN +
+ "\", because it doesn't match the regex \"" + CONSUMER_ARN_PATTERN.pattern() + "\"");
+ return Optional.empty();
+ }
+ }
+
+ public static void trySetEmptyStreamARN(StreamIdentifier streamIdentifier, KinesisAsyncClient kinesisAsyncClient) {
+ Optional optionalStreamARN = streamIdentifier.streamARN();
+ if (!optionalStreamARN.isPresent()) {
+ streamIdentifier.streamARN(
+ getOptionalStreamARNFromDescribeStreamSummary(streamIdentifier.streamName(), kinesisAsyncClient));
+
+ log.debug(streamIdentifier.streamARN().isPresent() ?
+ "Successfully set streamARN to " + streamIdentifier.streamARN().get() :
+ "Not able to set streamARN via DescribeStreamSummary call");
+ } else {
+ log.debug("StreamARN " + optionalStreamARN.get() + " is passed during initialization.");
+ }
+ }
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java
index 1259a609..06d97145 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java
@@ -19,30 +19,44 @@ import com.google.common.base.Joiner;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
+import lombok.Setter;
import lombok.experimental.Accessors;
+import lombok.extern.slf4j.Slf4j;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.utils.Validate;
import java.util.Optional;
import java.util.regex.Pattern;
-@EqualsAndHashCode @Getter @Accessors(fluent = true)
+import static software.amazon.kinesis.common.StreamARNUtil.getOptionalStreamARN;
+import static software.amazon.kinesis.common.StreamARNUtil.getOptionalStreamARNFromDescribeStreamSummary;
+import static software.amazon.kinesis.common.StreamARNUtil.getStreamName;
+
+@Slf4j
+@EqualsAndHashCode
+@Getter
+@Accessors(fluent = true)
public class StreamIdentifier {
private final Optional accountIdOptional;
private final String streamName;
+ @Setter
+ private Optional streamARN;
private final Optional streamCreationEpochOptional;
private static final String DELIMITER = ":";
private static final Pattern PATTERN = Pattern.compile(".*" + ":" + ".*" + ":" + "[0-9]*");
- private StreamIdentifier(@NonNull String accountId, @NonNull String streamName, @NonNull Long streamCreationEpoch) {
+ private StreamIdentifier(@NonNull String accountId, @NonNull String streamNameOrARN, @NonNull Long streamCreationEpoch) {
this.accountIdOptional = Optional.of(accountId);
- this.streamName = streamName;
+ this.streamName = getStreamName(streamNameOrARN);
+ this.streamARN = getOptionalStreamARN(streamNameOrARN);
this.streamCreationEpochOptional = Optional.of(streamCreationEpoch);
}
- private StreamIdentifier(@NonNull String streamName) {
+ private StreamIdentifier(@NonNull String streamNameOrARN) {
this.accountIdOptional = Optional.empty();
- this.streamName = streamName;
+ this.streamName = getStreamName(streamNameOrARN);
+ this.streamARN = getOptionalStreamARN(streamNameOrARN);
this.streamCreationEpochOptional = Optional.empty();
}
@@ -76,6 +90,17 @@ public class StreamIdentifier {
}
}
+ /**
+ * Create a multi stream instance for StreamIdentifier from accountId, streamNameOrARN and creationEpoch
+ * The serialized stream identifier should be of the format account:stream:creationepoch
+ * @param accountId The account's 12-digit ID, which is hosting the stream
+ * @param streamNameOrARN Although streamName and streamARN can both be supplied, streamARN is preferred
+ * @return creationEpoch The stream's creation time stamp
+ */
+ public static StreamIdentifier multiStreamInstance(String accountId, String streamNameOrARN, Long creationEpoch) {
+ return new StreamIdentifier(accountId, streamNameOrARN, creationEpoch);
+ }
+
/**
* Create a single stream instance for StreamIdentifier from stream name.
* @param streamName
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java
index 189ba18b..a46d29c3 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java
@@ -198,7 +198,7 @@ public class KinesisShardDetector implements ShardDetector {
} else {
builder = builder.nextToken(nextToken);
}
-
+ streamIdentifier.streamARN().ifPresent(builder::streamARN);
final ListShardsRequest request = builder.build();
log.info("Stream {}: listing shards with list shards request {}", streamIdentifier, request);
@@ -275,14 +275,14 @@ public class KinesisShardDetector implements ShardDetector {
@Override
public List getChildShards(final String shardId) throws InterruptedException, ExecutionException, TimeoutException {
- final GetShardIteratorRequest getShardIteratorRequest = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
+ final GetShardIteratorRequest.Builder requestBuilder = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
.streamName(streamIdentifier.streamName())
.shardIteratorType(ShardIteratorType.LATEST)
- .shardId(shardId)
- .build();
+ .shardId(shardId);
+ streamIdentifier.streamARN().ifPresent(requestBuilder::streamARN);
final GetShardIteratorResponse getShardIteratorResponse =
- FutureUtils.resolveOrCancelFuture(kinesisClient.getShardIterator(getShardIteratorRequest), kinesisRequestTimeout);
+ FutureUtils.resolveOrCancelFuture(kinesisClient.getShardIterator(requestBuilder.build()), kinesisRequestTimeout);
final GetRecordsRequest getRecordsRequest = KinesisRequestsBuilder.getRecordsRequestBuilder()
.shardIterator(getShardIteratorResponse.shardIterator())
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java
index abbee4da..97e59b88 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java
@@ -32,6 +32,8 @@ import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;
+import static software.amazon.kinesis.common.StreamARNUtil.trySetEmptyStreamARN;
+
/**
* Used by the KCL to configure the retrieval of records from Kinesis.
*/
@@ -102,18 +104,17 @@ public class RetrievalConfig {
private RetrievalFactory retrievalFactory;
- public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull String streamName,
+ public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull String streamNameOrARN,
@NonNull String applicationName) {
this.kinesisClient = kinesisAsyncClient;
- this.appStreamTracker = Either
- .right(new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStreamExtended));
+ this.appStreamTracker = Either.right(getInitialStreamConfig(streamNameOrARN, kinesisAsyncClient));
this.applicationName = applicationName;
}
public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull MultiStreamTracker multiStreamTracker,
@NonNull String applicationName) {
this.kinesisClient = kinesisAsyncClient;
- this.appStreamTracker = Either.left(multiStreamTracker);
+ this.appStreamTracker = Either.left(getInitialMultiStreamTracker(multiStreamTracker, kinesisAsyncClient));
this.applicationName = applicationName;
}
@@ -157,6 +158,7 @@ public class RetrievalConfig {
streamConfig -> streamConfig.streamIdentifier() == null
|| streamConfig.streamIdentifier().streamName() == null);
if(isInvalidFanoutConfig) {
+ // TODO: fix the error message
throw new IllegalArgumentException(
"Invalid config: Either in multi-stream mode with streamName/consumerArn configured or in single-stream mode with no streamName configured");
}
@@ -171,8 +173,22 @@ public class RetrievalConfig {
streamConfig.streamIdentifier() == null || streamConfig.streamIdentifier().streamName() == null);
if (isInvalidPollingConfig) {
+ // TODO: fix the error message
throw new IllegalArgumentException(
"Invalid config: Either in multi-stream mode with streamName configured or in single-stream mode with no streamName configured");
}
}
+
+ private StreamConfig getInitialStreamConfig(String streamNameOrARN, KinesisAsyncClient kinesisAsyncClient) {
+ StreamIdentifier streamIdentifier = StreamIdentifier.singleStreamInstance(streamNameOrARN);
+ trySetEmptyStreamARN(streamIdentifier, kinesisAsyncClient);
+ return new StreamConfig(streamIdentifier, initialPositionInStreamExtended);
+ }
+
+ private MultiStreamTracker getInitialMultiStreamTracker(MultiStreamTracker multiStreamTracker, KinesisAsyncClient kinesisAsyncClient) {
+ multiStreamTracker.streamConfigList()
+ .forEach(streamConfig -> trySetEmptyStreamARN(streamConfig.streamIdentifier(), kinesisAsyncClient));
+ return multiStreamTracker;
+ }
+
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java
index 7e8932cf..8d79de00 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java
@@ -27,14 +27,12 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
-import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.Either;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@@ -54,6 +52,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -61,6 +60,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired;
+import static software.amazon.kinesis.common.StreamARNUtil.getOptionalStreamARNFromConsumerARN;
import static software.amazon.kinesis.retrieval.DataRetrievalUtil.isValidResult;
@Slf4j
@@ -75,6 +75,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private final KinesisAsyncClient kinesis;
private final String shardId;
private final String consumerArn;
+ private final Optional streamARN;
private final String streamAndShardId;
private final Object lockObject = new Object();
@@ -97,6 +98,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
this.kinesis = kinesis;
this.shardId = shardId;
this.consumerArn = consumerArn;
+ this.streamARN = getOptionalStreamARNFromConsumerARN(consumerArn);
this.streamAndShardId = shardId;
}
@@ -104,6 +106,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
this.kinesis = kinesis;
this.shardId = shardId;
this.consumerArn = consumerArn;
+ this.streamARN = getOptionalStreamARNFromConsumerARN(consumerArn);
this.streamAndShardId = streamIdentifierSer + ":" + shardId;
}
@@ -292,6 +295,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
resetRecordsDeliveryStateOnSubscriptionOnInit();
SubscribeToShardRequest.Builder builder = KinesisRequestsBuilder.subscribeToShardRequestBuilder()
.shardId(shardId).consumerARN(consumerArn);
+ streamARN.ifPresent(builder::streamARN);
+
SubscribeToShardRequest request;
if (isFirstConnection) {
request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStreamExtended).build();
@@ -304,8 +309,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
int subscribeInvocationId = subscribeToShardId.incrementAndGet();
String instanceId = shardId + "-" + subscribeInvocationId;
log.debug(
- "{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ {} id: {} -- Starting subscribe to shard",
- streamAndShardId, connectionStart, instanceId);
+ "{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ {} id: {} -- Starting subscribe to shard with request {}",
+ streamAndShardId, connectionStart, instanceId, request);
flow = new RecordFlow(this, connectionStart, instanceId);
kinesis.subscribeToShard(request, flow);
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java
index 223ab367..bc72de8e 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java
@@ -231,8 +231,11 @@ public class KinesisDataFetcher implements DataFetcher {
GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
.streamName(streamIdentifier.streamName()).shardId(shardId);
+ streamIdentifier.streamARN().ifPresent(builder::streamARN);
+
GetShardIteratorRequest request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStream)
.build();
+ log.debug("GetShardIterator request has parameters: " + request);
// TODO: Check if this metric is fine to be added
final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION);
@@ -302,9 +305,10 @@ public class KinesisDataFetcher implements DataFetcher {
}
@Override
- public GetRecordsRequest getGetRecordsRequest(String nextIterator) {
- return KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator)
- .limit(maxRecords).build();
+ public GetRecordsRequest getGetRecordsRequest(String nextIterator) {
+ GetRecordsRequest.Builder builder = KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator);
+ streamIdentifier.streamARN().ifPresent(builder::streamARN);
+ return builder.build();
}
@Override
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java
index aa9f8412..cfbc9af9 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java
@@ -47,6 +47,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -67,6 +68,10 @@ import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
+import software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.CheckpointFactory;
@@ -170,6 +175,10 @@ public class SchedulerTest {
shardSyncTaskManagerMap = new HashMap<>();
shardDetectorMap = new HashMap<>();
shardRecordProcessorFactory = new TestShardRecordProcessorFactory();
+ when(kinesisClient.describeStreamSummary(any(DescribeStreamSummaryRequest.class)))
+ .thenReturn(CompletableFuture.completedFuture(DescribeStreamSummaryResponse.builder()
+ .streamDescriptionSummary(StreamDescriptionSummary.builder().streamARN("testStreamARN").build())
+ .build()));
checkpointConfig = new CheckpointConfig().checkpointFactory(new TestKinesisCheckpointFactory());
coordinatorConfig = new CoordinatorConfig(applicationName).parentShardPollIntervalMillis(100L).workerStateChangeListener(workerStateChangeListener);
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java
index 4fee3d08..1972ce2e 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java
@@ -43,7 +43,7 @@ import java.util.Optional;
@RunWith(MockitoJUnitRunner.class)
public class FanOutConfigTest {
- private static final String TEST_CONSUMER_ARN = "TestConsumerArn";
+ private static final String TEST_CONSUMER_ARN = "arn:aws:kinesis:us-east-2:123456789012:stream/mystream/consumer/TestConsumerArn:000";
private static final String TEST_APPLICATION_NAME = "TestApplication";
private static final String TEST_STREAM_NAME = "TestStream";
private static final String TEST_CONSUMER_NAME = "TestConsumerName";
@@ -75,7 +75,6 @@ public class FanOutConfigTest {
.streamName(TEST_STREAM_NAME);
RetrievalFactory retrievalFactory = config.retrievalFactory();
ShardInfo shardInfo = mock(ShardInfo.class);
-// doReturn(Optional.of(StreamIdentifier.singleStreamInstance(TEST_STREAM_NAME).serialize())).when(shardInfo).streamIdentifier();
doReturn(Optional.empty()).when(shardInfo).streamIdentifierSerOpt();
retrievalFactory.createGetRecordsCache(shardInfo, streamConfig, mock(MetricsFactory.class));
assertThat(retrievalFactory, not(nullValue()));
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java
index 40d86c49..da5db9a6 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java
@@ -91,7 +91,7 @@ import static org.mockito.Mockito.verify;
public class FanOutRecordsPublisherTest {
private static final String SHARD_ID = "Shard-001";
- private static final String CONSUMER_ARN = "arn:consumer";
+ private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-2:123456789012:stream/mystream/consumer/TestConsumerArn:000";
private static final String CONTINUATION_SEQUENCE_NUMBER = "continuationSequenceNumber";
@Mock
diff --git a/pom.xml b/pom.xml
index 0e08d6c6..3c858d0a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
amazon-kinesis-client-pom
pom
Amazon Kinesis Client Library
- 2.4.3
+ 2.5.0-SNAPSHOT
The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.
@@ -33,7 +33,7 @@
- 2.17.268
+ 2.18.29