Provide streamArn in getRecords request

This commit is contained in:
Furqaan Ali 2023-10-26 16:08:02 -07:00
parent 118783b18b
commit b404117d7d

View file

@ -307,23 +307,26 @@ public class KinesisShardDetector implements ShardDetector {
} }
@Override @Override
public List<ChildShard> getChildShards(final String shardId) throws InterruptedException, ExecutionException, TimeoutException { public List<ChildShard> getChildShards(final String shardId)
final GetShardIteratorRequest.Builder requestBuilder = KinesisRequestsBuilder.getShardIteratorRequestBuilder() throws InterruptedException, ExecutionException, TimeoutException {
.streamName(streamIdentifier.streamName()) final GetShardIteratorRequest.Builder getShardIteratorRequestBuilder =
.shardIteratorType(ShardIteratorType.LATEST) KinesisRequestsBuilder.getShardIteratorRequestBuilder()
.shardId(shardId); .streamName(streamIdentifier.streamName())
streamIdentifier.streamArnOptional().ifPresent(arn -> requestBuilder.streamARN(arn.toString())); .shardIteratorType(ShardIteratorType.LATEST)
final GetShardIteratorRequest getShardIteratorRequest = requestBuilder.build(); .shardId(shardId);
streamIdentifier.streamArnOptional().ifPresent(arn -> getShardIteratorRequestBuilder.streamARN(arn.toString()));
final GetShardIteratorResponse getShardIteratorResponse = final GetShardIteratorResponse getShardIteratorResponse = FutureUtils.resolveOrCancelFuture(
FutureUtils.resolveOrCancelFuture(kinesisClient.getShardIterator(getShardIteratorRequest), kinesisRequestTimeout); kinesisClient.getShardIterator(getShardIteratorRequestBuilder.build()),
kinesisRequestTimeout);
final GetRecordsRequest getRecordsRequest = KinesisRequestsBuilder.getRecordsRequestBuilder() final GetRecordsRequest.Builder getRecordsRequestBuilder = KinesisRequestsBuilder.getRecordsRequestBuilder()
.shardIterator(getShardIteratorResponse.shardIterator()) .shardIterator(getShardIteratorResponse.shardIterator());
.build(); streamIdentifier.streamArnOptional().ifPresent(arn -> getRecordsRequestBuilder.streamARN(arn.toString()));
final GetRecordsResponse getRecordsResponse = final GetRecordsResponse getRecordsResponse = FutureUtils.resolveOrCancelFuture(
FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(getRecordsRequest), kinesisRequestTimeout); kinesisClient.getRecords(getRecordsRequestBuilder.build()),
kinesisRequestTimeout);
return getRecordsResponse.childShards(); return getRecordsResponse.childShards();
} }