From fedd02c2a5b2dbc27d156e565c15792fc5d5db22 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 20 Feb 2020 15:48:56 -0800 Subject: [PATCH] Fixing fanout cross consumer issue --- .../kinesis/retrieval/fanout/FanOutRetrievalFactory.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java index e712c6db..5c0f5e9a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java @@ -25,6 +25,8 @@ import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalFactory; +import java.util.HashMap; +import java.util.Map; import java.util.function.Function; @RequiredArgsConstructor @@ -34,6 +36,7 @@ public class FanOutRetrievalFactory implements RetrievalFactory { private final KinesisAsyncClient kinesisClient; private final String defaultStreamName; private final Function consumerArnProvider; + private Map streamToconsumerArnMap = new HashMap<>(); @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo, @@ -45,6 +48,7 @@ public class FanOutRetrievalFactory implements RetrievalFactory { public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo, final MetricsFactory metricsFactory) { final String streamName = shardInfo.streamName().orElse(defaultStreamName); - return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArnProvider.apply(streamName)); + return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), + streamToconsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply)); } }