StreamARN API updates

This commit is contained in:
Yu Zeng 2022-12-05 16:21:53 -08:00
parent 05ed537572
commit 6267e66c71
13 changed files with 184 additions and 39 deletions

View file

@ -21,14 +21,14 @@
<parent> <parent>
<artifactId>amazon-kinesis-client-pom</artifactId> <artifactId>amazon-kinesis-client-pom</artifactId>
<groupId>software.amazon.kinesis</groupId> <groupId>software.amazon.kinesis</groupId>
<version>2.4.3</version> <version>2.5.0-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>amazon-kinesis-client-multilang</artifactId> <artifactId>amazon-kinesis-client-multilang</artifactId>
<properties> <properties>
<aws-java-sdk.version>1.12.296</aws-java-sdk.version> <aws-java-sdk.version>1.12.356</aws-java-sdk.version>
</properties> </properties>
<dependencies> <dependencies>
@ -122,6 +122,12 @@
<version>1.3</version> <version>1.3</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-cbor-protocol</artifactId>
<version>${awssdk.version}</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View file

@ -22,7 +22,7 @@
<parent> <parent>
<groupId>software.amazon.kinesis</groupId> <groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client-pom</artifactId> <artifactId>amazon-kinesis-client-pom</artifactId>
<version>2.4.3</version> <version>2.5.0-SNAPSHOT</version>
</parent> </parent>
<artifactId>amazon-kinesis-client</artifactId> <artifactId>amazon-kinesis-client</artifactId>
@ -57,7 +57,7 @@
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>software.amazon.awssdk</groupId> <groupId>software.amazon.awssdk</groupId>
<artifactId>kinesis</artifactId> <artifactId>kinesis-private</artifactId>
<version>${awssdk.version}</version> <version>${awssdk.version}</version>
</dependency> </dependency>
<dependency> <dependency>

View file

@ -17,12 +17,11 @@ package software.amazon.kinesis.common;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.Getter; import lombok.Getter;
import lombok.NonNull;
import lombok.Setter; import lombok.Setter;
import lombok.ToString; import lombok.ToString;
import org.apache.commons.lang3.StringUtils;
import lombok.NonNull;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
@ -32,9 +31,9 @@ import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.retrieval.RetrievalConfig;
/** /**
@ -115,8 +114,8 @@ public class ConfigsBuilder {
} }
/** /**
* Constructor to initialize ConfigsBuilder with StreamName * Constructor to initialize ConfigsBuilder with StreamName (or StreamARN)
* @param streamName * @param streamNameOrARN
* @param applicationName * @param applicationName
* @param kinesisClient * @param kinesisClient
* @param dynamoDBClient * @param dynamoDBClient
@ -124,11 +123,11 @@ public class ConfigsBuilder {
* @param workerIdentifier * @param workerIdentifier
* @param shardRecordProcessorFactory * @param shardRecordProcessorFactory
*/ */
public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, public ConfigsBuilder(@NonNull String streamNameOrARN, @NonNull String applicationName,
@NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
@NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
@NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
this.appStreamTracker = Either.right(streamName); this.appStreamTracker = Either.right(streamNameOrARN);
this.applicationName = applicationName; this.applicationName = applicationName;
this.kinesisClient = kinesisClient; this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient; this.dynamoDBClient = dynamoDBClient;
@ -224,7 +223,7 @@ public class ConfigsBuilder {
final RetrievalConfig retrievalConfig = final RetrievalConfig retrievalConfig =
appStreamTracker.map( appStreamTracker.map(
multiStreamTracker -> new RetrievalConfig(kinesisClient(), multiStreamTracker, applicationName()), multiStreamTracker -> new RetrievalConfig(kinesisClient(), multiStreamTracker, applicationName()),
streamName -> new RetrievalConfig(kinesisClient(), streamName, applicationName())); streamNameOrARN -> new RetrievalConfig(kinesisClient(), streamNameOrARN, applicationName()));
return retrievalConfig; return retrievalConfig;
} }
} }

View file

@ -0,0 +1,82 @@
package software.amazon.kinesis.common;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Slf4j
public class StreamARNUtil {
public static Pattern STREAM_ARN_PATTERN = Pattern.compile(
"^arn:aws.*:kinesis:.*:\\d{12}:stream\\/(\\S+)$");
public static Pattern CONSUMER_ARN_PATTERN = Pattern.compile(
"^(arn:aws.*:kinesis:.*:\\d{12}:.*stream\\/[a-zA-Z0-9_.-]+)\\/consumer\\/[a-zA-Z0-9_.-]+:[0-9]+");
public static String getStreamName(String streamNameOrARN) {
final Matcher matcher = STREAM_ARN_PATTERN.matcher(streamNameOrARN);
if (matcher.find()) {
return matcher.group(1);
} else {
// Assume that the user entered the stream name because the argument doesn't match the StreamARN regex
return streamNameOrARN;
}
}
public static Optional<String> getOptionalStreamARN(String streamNameOrARN) {
final Matcher matcher = STREAM_ARN_PATTERN.matcher(streamNameOrARN);
if (matcher.find()) {
return Optional.of(streamNameOrARN);
} else {
// Retrieve
return Optional.empty();
}
}
public static Optional<String> getOptionalStreamARNFromDescribeStreamSummary(
String streamName, KinesisAsyncClient kinesis) {
final DescribeStreamSummaryRequest request =
KinesisRequestsBuilder.describeStreamSummaryRequestBuilder().streamName(streamName).build();
try {
DescribeStreamSummaryResponse response = kinesis.describeStreamSummary(request).get();
return Optional.ofNullable(response.streamDescriptionSummary().streamARN());
} catch (ExecutionException | InterruptedException e) {
log.warn("Not able to get StreamARN from the DescribeStreamSummary call", e);
}
return Optional.empty();
}
public static Optional<String> getOptionalStreamARNFromConsumerARN(String consumerARN) {
if (StringUtils.isEmpty(consumerARN)) {
return Optional.empty();
}
final Matcher matcher = CONSUMER_ARN_PATTERN.matcher(consumerARN);
if (matcher.find()) {
return Optional.ofNullable(matcher.group(1));
} else {
log.warn("Can't extract the streamARN from consumerARN \"" + consumerARN +
"\", because it doesn't match the regex \"" + CONSUMER_ARN_PATTERN.pattern() + "\"");
return Optional.empty();
}
}
public static void trySetEmptyStreamARN(StreamIdentifier streamIdentifier, KinesisAsyncClient kinesisAsyncClient) {
Optional<String> optionalStreamARN = streamIdentifier.streamARN();
if (!optionalStreamARN.isPresent()) {
streamIdentifier.streamARN(
getOptionalStreamARNFromDescribeStreamSummary(streamIdentifier.streamName(), kinesisAsyncClient));
log.debug(streamIdentifier.streamARN().isPresent() ?
"Successfully set streamARN to " + streamIdentifier.streamARN().get() :
"Not able to set streamARN via DescribeStreamSummary call");
} else {
log.debug("StreamARN " + optionalStreamARN.get() + " is passed during initialization.");
}
}
}

View file

@ -19,30 +19,44 @@ import com.google.common.base.Joiner;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.Getter; import lombok.Getter;
import lombok.NonNull; import lombok.NonNull;
import lombok.Setter;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.Validate;
import java.util.Optional; import java.util.Optional;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@EqualsAndHashCode @Getter @Accessors(fluent = true) import static software.amazon.kinesis.common.StreamARNUtil.getOptionalStreamARN;
import static software.amazon.kinesis.common.StreamARNUtil.getOptionalStreamARNFromDescribeStreamSummary;
import static software.amazon.kinesis.common.StreamARNUtil.getStreamName;
@Slf4j
@EqualsAndHashCode
@Getter
@Accessors(fluent = true)
public class StreamIdentifier { public class StreamIdentifier {
private final Optional<String> accountIdOptional; private final Optional<String> accountIdOptional;
private final String streamName; private final String streamName;
@Setter
private Optional<String> streamARN;
private final Optional<Long> streamCreationEpochOptional; private final Optional<Long> streamCreationEpochOptional;
private static final String DELIMITER = ":"; private static final String DELIMITER = ":";
private static final Pattern PATTERN = Pattern.compile(".*" + ":" + ".*" + ":" + "[0-9]*"); private static final Pattern PATTERN = Pattern.compile(".*" + ":" + ".*" + ":" + "[0-9]*");
private StreamIdentifier(@NonNull String accountId, @NonNull String streamName, @NonNull Long streamCreationEpoch) { private StreamIdentifier(@NonNull String accountId, @NonNull String streamNameOrARN, @NonNull Long streamCreationEpoch) {
this.accountIdOptional = Optional.of(accountId); this.accountIdOptional = Optional.of(accountId);
this.streamName = streamName; this.streamName = getStreamName(streamNameOrARN);
this.streamARN = getOptionalStreamARN(streamNameOrARN);
this.streamCreationEpochOptional = Optional.of(streamCreationEpoch); this.streamCreationEpochOptional = Optional.of(streamCreationEpoch);
} }
private StreamIdentifier(@NonNull String streamName) { private StreamIdentifier(@NonNull String streamNameOrARN) {
this.accountIdOptional = Optional.empty(); this.accountIdOptional = Optional.empty();
this.streamName = streamName; this.streamName = getStreamName(streamNameOrARN);
this.streamARN = getOptionalStreamARN(streamNameOrARN);
this.streamCreationEpochOptional = Optional.empty(); this.streamCreationEpochOptional = Optional.empty();
} }
@ -76,6 +90,17 @@ public class StreamIdentifier {
} }
} }
/**
* Create a multi stream instance for StreamIdentifier from accountId, streamNameOrARN and creationEpoch
* The serialized stream identifier should be of the format account:stream:creationepoch
* @param accountId The account's 12-digit ID, which is hosting the stream
* @param streamNameOrARN Although streamName and streamARN can both be supplied, streamARN is preferred
* @return creationEpoch The stream's creation time stamp
*/
public static StreamIdentifier multiStreamInstance(String accountId, String streamNameOrARN, Long creationEpoch) {
return new StreamIdentifier(accountId, streamNameOrARN, creationEpoch);
}
/** /**
* Create a single stream instance for StreamIdentifier from stream name. * Create a single stream instance for StreamIdentifier from stream name.
* @param streamName * @param streamName

View file

@ -198,7 +198,7 @@ public class KinesisShardDetector implements ShardDetector {
} else { } else {
builder = builder.nextToken(nextToken); builder = builder.nextToken(nextToken);
} }
streamIdentifier.streamARN().ifPresent(builder::streamARN);
final ListShardsRequest request = builder.build(); final ListShardsRequest request = builder.build();
log.info("Stream {}: listing shards with list shards request {}", streamIdentifier, request); log.info("Stream {}: listing shards with list shards request {}", streamIdentifier, request);
@ -275,14 +275,14 @@ public class KinesisShardDetector implements ShardDetector {
@Override @Override
public List<ChildShard> getChildShards(final String shardId) throws InterruptedException, ExecutionException, TimeoutException { public List<ChildShard> getChildShards(final String shardId) throws InterruptedException, ExecutionException, TimeoutException {
final GetShardIteratorRequest getShardIteratorRequest = KinesisRequestsBuilder.getShardIteratorRequestBuilder() final GetShardIteratorRequest.Builder requestBuilder = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
.streamName(streamIdentifier.streamName()) .streamName(streamIdentifier.streamName())
.shardIteratorType(ShardIteratorType.LATEST) .shardIteratorType(ShardIteratorType.LATEST)
.shardId(shardId) .shardId(shardId);
.build(); streamIdentifier.streamARN().ifPresent(requestBuilder::streamARN);
final GetShardIteratorResponse getShardIteratorResponse = final GetShardIteratorResponse getShardIteratorResponse =
FutureUtils.resolveOrCancelFuture(kinesisClient.getShardIterator(getShardIteratorRequest), kinesisRequestTimeout); FutureUtils.resolveOrCancelFuture(kinesisClient.getShardIterator(requestBuilder.build()), kinesisRequestTimeout);
final GetRecordsRequest getRecordsRequest = KinesisRequestsBuilder.getRecordsRequestBuilder() final GetRecordsRequest getRecordsRequest = KinesisRequestsBuilder.getRecordsRequestBuilder()
.shardIterator(getShardIteratorResponse.shardIterator()) .shardIterator(getShardIteratorResponse.shardIterator())

View file

@ -32,6 +32,8 @@ import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.retrieval.fanout.FanOutConfig; import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig; import software.amazon.kinesis.retrieval.polling.PollingConfig;
import static software.amazon.kinesis.common.StreamARNUtil.trySetEmptyStreamARN;
/** /**
* Used by the KCL to configure the retrieval of records from Kinesis. * Used by the KCL to configure the retrieval of records from Kinesis.
*/ */
@ -102,18 +104,17 @@ public class RetrievalConfig {
private RetrievalFactory retrievalFactory; private RetrievalFactory retrievalFactory;
public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull String streamName, public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull String streamNameOrARN,
@NonNull String applicationName) { @NonNull String applicationName) {
this.kinesisClient = kinesisAsyncClient; this.kinesisClient = kinesisAsyncClient;
this.appStreamTracker = Either this.appStreamTracker = Either.right(getInitialStreamConfig(streamNameOrARN, kinesisAsyncClient));
.right(new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStreamExtended));
this.applicationName = applicationName; this.applicationName = applicationName;
} }
public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull MultiStreamTracker multiStreamTracker, public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull MultiStreamTracker multiStreamTracker,
@NonNull String applicationName) { @NonNull String applicationName) {
this.kinesisClient = kinesisAsyncClient; this.kinesisClient = kinesisAsyncClient;
this.appStreamTracker = Either.left(multiStreamTracker); this.appStreamTracker = Either.left(getInitialMultiStreamTracker(multiStreamTracker, kinesisAsyncClient));
this.applicationName = applicationName; this.applicationName = applicationName;
} }
@ -157,6 +158,7 @@ public class RetrievalConfig {
streamConfig -> streamConfig.streamIdentifier() == null streamConfig -> streamConfig.streamIdentifier() == null
|| streamConfig.streamIdentifier().streamName() == null); || streamConfig.streamIdentifier().streamName() == null);
if(isInvalidFanoutConfig) { if(isInvalidFanoutConfig) {
// TODO: fix the error message
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Invalid config: Either in multi-stream mode with streamName/consumerArn configured or in single-stream mode with no streamName configured"); "Invalid config: Either in multi-stream mode with streamName/consumerArn configured or in single-stream mode with no streamName configured");
} }
@ -171,8 +173,22 @@ public class RetrievalConfig {
streamConfig.streamIdentifier() == null || streamConfig.streamIdentifier().streamName() == null); streamConfig.streamIdentifier() == null || streamConfig.streamIdentifier().streamName() == null);
if (isInvalidPollingConfig) { if (isInvalidPollingConfig) {
// TODO: fix the error message
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Invalid config: Either in multi-stream mode with streamName configured or in single-stream mode with no streamName configured"); "Invalid config: Either in multi-stream mode with streamName configured or in single-stream mode with no streamName configured");
} }
} }
private StreamConfig getInitialStreamConfig(String streamNameOrARN, KinesisAsyncClient kinesisAsyncClient) {
StreamIdentifier streamIdentifier = StreamIdentifier.singleStreamInstance(streamNameOrARN);
trySetEmptyStreamARN(streamIdentifier, kinesisAsyncClient);
return new StreamConfig(streamIdentifier, initialPositionInStreamExtended);
}
private MultiStreamTracker getInitialMultiStreamTracker(MultiStreamTracker multiStreamTracker, KinesisAsyncClient kinesisAsyncClient) {
multiStreamTracker.streamConfigList()
.forEach(streamConfig -> trySetEmptyStreamARN(streamConfig.streamIdentifier(), kinesisAsyncClient));
return multiStreamTracker;
}
} }

View file

@ -27,14 +27,12 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.Either; import software.amazon.awssdk.utils.Either;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -54,6 +52,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.time.Instant; import java.time.Instant;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -61,6 +60,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired; import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired;
import static software.amazon.kinesis.common.StreamARNUtil.getOptionalStreamARNFromConsumerARN;
import static software.amazon.kinesis.retrieval.DataRetrievalUtil.isValidResult; import static software.amazon.kinesis.retrieval.DataRetrievalUtil.isValidResult;
@Slf4j @Slf4j
@ -75,6 +75,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private final KinesisAsyncClient kinesis; private final KinesisAsyncClient kinesis;
private final String shardId; private final String shardId;
private final String consumerArn; private final String consumerArn;
private final Optional<String> streamARN;
private final String streamAndShardId; private final String streamAndShardId;
private final Object lockObject = new Object(); private final Object lockObject = new Object();
@ -97,6 +98,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
this.kinesis = kinesis; this.kinesis = kinesis;
this.shardId = shardId; this.shardId = shardId;
this.consumerArn = consumerArn; this.consumerArn = consumerArn;
this.streamARN = getOptionalStreamARNFromConsumerARN(consumerArn);
this.streamAndShardId = shardId; this.streamAndShardId = shardId;
} }
@ -104,6 +106,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
this.kinesis = kinesis; this.kinesis = kinesis;
this.shardId = shardId; this.shardId = shardId;
this.consumerArn = consumerArn; this.consumerArn = consumerArn;
this.streamARN = getOptionalStreamARNFromConsumerARN(consumerArn);
this.streamAndShardId = streamIdentifierSer + ":" + shardId; this.streamAndShardId = streamIdentifierSer + ":" + shardId;
} }
@ -292,6 +295,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
resetRecordsDeliveryStateOnSubscriptionOnInit(); resetRecordsDeliveryStateOnSubscriptionOnInit();
SubscribeToShardRequest.Builder builder = KinesisRequestsBuilder.subscribeToShardRequestBuilder() SubscribeToShardRequest.Builder builder = KinesisRequestsBuilder.subscribeToShardRequestBuilder()
.shardId(shardId).consumerARN(consumerArn); .shardId(shardId).consumerARN(consumerArn);
streamARN.ifPresent(builder::streamARN);
SubscribeToShardRequest request; SubscribeToShardRequest request;
if (isFirstConnection) { if (isFirstConnection) {
request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStreamExtended).build(); request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStreamExtended).build();
@ -304,8 +309,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
int subscribeInvocationId = subscribeToShardId.incrementAndGet(); int subscribeInvocationId = subscribeToShardId.incrementAndGet();
String instanceId = shardId + "-" + subscribeInvocationId; String instanceId = shardId + "-" + subscribeInvocationId;
log.debug( log.debug(
"{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ {} id: {} -- Starting subscribe to shard", "{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ {} id: {} -- Starting subscribe to shard with request {}",
streamAndShardId, connectionStart, instanceId); streamAndShardId, connectionStart, instanceId, request);
flow = new RecordFlow(this, connectionStart, instanceId); flow = new RecordFlow(this, connectionStart, instanceId);
kinesis.subscribeToShard(request, flow); kinesis.subscribeToShard(request, flow);
} }

View file

@ -231,8 +231,11 @@ public class KinesisDataFetcher implements DataFetcher {
GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder() GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
.streamName(streamIdentifier.streamName()).shardId(shardId); .streamName(streamIdentifier.streamName()).shardId(shardId);
streamIdentifier.streamARN().ifPresent(builder::streamARN);
GetShardIteratorRequest request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStream) GetShardIteratorRequest request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStream)
.build(); .build();
log.debug("GetShardIterator request has parameters: " + request);
// TODO: Check if this metric is fine to be added // TODO: Check if this metric is fine to be added
final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION); final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION);
@ -302,9 +305,10 @@ public class KinesisDataFetcher implements DataFetcher {
} }
@Override @Override
public GetRecordsRequest getGetRecordsRequest(String nextIterator) { public GetRecordsRequest getGetRecordsRequest(String nextIterator) {
return KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator) GetRecordsRequest.Builder builder = KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator);
.limit(maxRecords).build(); streamIdentifier.streamARN().ifPresent(builder::streamARN);
return builder.build();
} }
@Override @Override

View file

@ -47,6 +47,7 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
@ -67,6 +68,10 @@ import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary;
import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.CheckpointFactory; import software.amazon.kinesis.checkpoint.CheckpointFactory;
@ -170,6 +175,10 @@ public class SchedulerTest {
shardSyncTaskManagerMap = new HashMap<>(); shardSyncTaskManagerMap = new HashMap<>();
shardDetectorMap = new HashMap<>(); shardDetectorMap = new HashMap<>();
shardRecordProcessorFactory = new TestShardRecordProcessorFactory(); shardRecordProcessorFactory = new TestShardRecordProcessorFactory();
when(kinesisClient.describeStreamSummary(any(DescribeStreamSummaryRequest.class)))
.thenReturn(CompletableFuture.completedFuture(DescribeStreamSummaryResponse.builder()
.streamDescriptionSummary(StreamDescriptionSummary.builder().streamARN("testStreamARN").build())
.build()));
checkpointConfig = new CheckpointConfig().checkpointFactory(new TestKinesisCheckpointFactory()); checkpointConfig = new CheckpointConfig().checkpointFactory(new TestKinesisCheckpointFactory());
coordinatorConfig = new CoordinatorConfig(applicationName).parentShardPollIntervalMillis(100L).workerStateChangeListener(workerStateChangeListener); coordinatorConfig = new CoordinatorConfig(applicationName).parentShardPollIntervalMillis(100L).workerStateChangeListener(workerStateChangeListener);

View file

@ -43,7 +43,7 @@ import java.util.Optional;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class FanOutConfigTest { public class FanOutConfigTest {
private static final String TEST_CONSUMER_ARN = "TestConsumerArn"; private static final String TEST_CONSUMER_ARN = "arn:aws:kinesis:us-east-2:123456789012:stream/mystream/consumer/TestConsumerArn:000";
private static final String TEST_APPLICATION_NAME = "TestApplication"; private static final String TEST_APPLICATION_NAME = "TestApplication";
private static final String TEST_STREAM_NAME = "TestStream"; private static final String TEST_STREAM_NAME = "TestStream";
private static final String TEST_CONSUMER_NAME = "TestConsumerName"; private static final String TEST_CONSUMER_NAME = "TestConsumerName";
@ -75,7 +75,6 @@ public class FanOutConfigTest {
.streamName(TEST_STREAM_NAME); .streamName(TEST_STREAM_NAME);
RetrievalFactory retrievalFactory = config.retrievalFactory(); RetrievalFactory retrievalFactory = config.retrievalFactory();
ShardInfo shardInfo = mock(ShardInfo.class); ShardInfo shardInfo = mock(ShardInfo.class);
// doReturn(Optional.of(StreamIdentifier.singleStreamInstance(TEST_STREAM_NAME).serialize())).when(shardInfo).streamIdentifier();
doReturn(Optional.empty()).when(shardInfo).streamIdentifierSerOpt(); doReturn(Optional.empty()).when(shardInfo).streamIdentifierSerOpt();
retrievalFactory.createGetRecordsCache(shardInfo, streamConfig, mock(MetricsFactory.class)); retrievalFactory.createGetRecordsCache(shardInfo, streamConfig, mock(MetricsFactory.class));
assertThat(retrievalFactory, not(nullValue())); assertThat(retrievalFactory, not(nullValue()));

View file

@ -91,7 +91,7 @@ import static org.mockito.Mockito.verify;
public class FanOutRecordsPublisherTest { public class FanOutRecordsPublisherTest {
private static final String SHARD_ID = "Shard-001"; private static final String SHARD_ID = "Shard-001";
private static final String CONSUMER_ARN = "arn:consumer"; private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-2:123456789012:stream/mystream/consumer/TestConsumerArn:000";
private static final String CONTINUATION_SEQUENCE_NUMBER = "continuationSequenceNumber"; private static final String CONTINUATION_SEQUENCE_NUMBER = "continuationSequenceNumber";
@Mock @Mock

View file

@ -22,7 +22,7 @@
<artifactId>amazon-kinesis-client-pom</artifactId> <artifactId>amazon-kinesis-client-pom</artifactId>
<packaging>pom</packaging> <packaging>pom</packaging>
<name>Amazon Kinesis Client Library</name> <name>Amazon Kinesis Client Library</name>
<version>2.4.3</version> <version>2.5.0-SNAPSHOT</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data <description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis. from Amazon Kinesis.
</description> </description>
@ -33,7 +33,7 @@
</scm> </scm>
<properties> <properties>
<awssdk.version>2.17.268</awssdk.version> <awssdk.version>2.18.29</awssdk.version>
</properties> </properties>
<licenses> <licenses>