Provide streamArn in getRecords request (#1219)

This commit is contained in:
furq-aws 2023-10-26 16:49:07 -07:00 committed by GitHub
parent 118783b18b
commit f90b1b1c05
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -307,23 +307,26 @@ public class KinesisShardDetector implements ShardDetector {
} }
@Override @Override
public List<ChildShard> getChildShards(final String shardId) throws InterruptedException, ExecutionException, TimeoutException { public List<ChildShard> getChildShards(final String shardId)
final GetShardIteratorRequest.Builder requestBuilder = KinesisRequestsBuilder.getShardIteratorRequestBuilder() throws InterruptedException, ExecutionException, TimeoutException {
final GetShardIteratorRequest.Builder getShardIteratorRequestBuilder =
KinesisRequestsBuilder.getShardIteratorRequestBuilder()
.streamName(streamIdentifier.streamName()) .streamName(streamIdentifier.streamName())
.shardIteratorType(ShardIteratorType.LATEST) .shardIteratorType(ShardIteratorType.LATEST)
.shardId(shardId); .shardId(shardId);
streamIdentifier.streamArnOptional().ifPresent(arn -> requestBuilder.streamARN(arn.toString())); streamIdentifier.streamArnOptional().ifPresent(arn -> getShardIteratorRequestBuilder.streamARN(arn.toString()));
final GetShardIteratorRequest getShardIteratorRequest = requestBuilder.build();
final GetShardIteratorResponse getShardIteratorResponse = final GetShardIteratorResponse getShardIteratorResponse = FutureUtils.resolveOrCancelFuture(
FutureUtils.resolveOrCancelFuture(kinesisClient.getShardIterator(getShardIteratorRequest), kinesisRequestTimeout); kinesisClient.getShardIterator(getShardIteratorRequestBuilder.build()),
kinesisRequestTimeout);
final GetRecordsRequest getRecordsRequest = KinesisRequestsBuilder.getRecordsRequestBuilder() final GetRecordsRequest.Builder getRecordsRequestBuilder = KinesisRequestsBuilder.getRecordsRequestBuilder()
.shardIterator(getShardIteratorResponse.shardIterator()) .shardIterator(getShardIteratorResponse.shardIterator());
.build(); streamIdentifier.streamArnOptional().ifPresent(arn -> getRecordsRequestBuilder.streamARN(arn.toString()));
final GetRecordsResponse getRecordsResponse = final GetRecordsResponse getRecordsResponse = FutureUtils.resolveOrCancelFuture(
FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(getRecordsRequest), kinesisRequestTimeout); kinesisClient.getRecords(getRecordsRequestBuilder.build()),
kinesisRequestTimeout);
return getRecordsResponse.childShards(); return getRecordsResponse.childShards();
} }