diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java index e712c6db..5c0f5e9a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java @@ -25,6 +25,8 @@ import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalFactory; +import java.util.HashMap; +import java.util.Map; import java.util.function.Function; @RequiredArgsConstructor @@ -34,6 +36,7 @@ public class FanOutRetrievalFactory implements RetrievalFactory { private final KinesisAsyncClient kinesisClient; private final String defaultStreamName; private final Function consumerArnProvider; + private Map streamToconsumerArnMap = new HashMap<>(); @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo, @@ -45,6 +48,7 @@ public class FanOutRetrievalFactory implements RetrievalFactory { public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo, final MetricsFactory metricsFactory) { final String streamName = shardInfo.streamName().orElse(defaultStreamName); - return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArnProvider.apply(streamName)); + return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), + streamToconsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply)); } }