From ce381783996ff1307b96b31bbaad0b1606a8b33e Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Fri, 12 Jun 2020 22:20:38 -0700 Subject: [PATCH] Caching consumerArn for StreamIdentifier in FanOutRetrievalFactory --- .../fanout/FanOutRetrievalFactory.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java index 5796862b..35301624 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java @@ -38,9 +38,11 @@ public class FanOutRetrievalFactory implements RetrievalFactory { private final KinesisAsyncClient kinesisClient; private final String defaultStreamName; - private final String defaultConsumerName; + private final String defaultConsumerArn; private final Function consumerArnCreator; + private Map implicitConsumerArnTracker = new HashMap<>(); + @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo, final MetricsFactory metricsFactory) { @@ -52,15 +54,15 @@ public class FanOutRetrievalFactory implements RetrievalFactory { final StreamConfig streamConfig, final MetricsFactory metricsFactory) { final Optional streamIdentifierStr = shardInfo.streamIdentifierSerOpt(); - final String streamName; if(streamIdentifierStr.isPresent()) { - streamName = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get()).streamName(); + final StreamIdentifier streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get()); return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), - getOrCreateConsumerArn(streamName, streamConfig.consumerArn()), + getOrCreateConsumerArn(streamIdentifier, streamConfig.consumerArn()), streamIdentifierStr.get()); } else { + final StreamIdentifier streamIdentifier = StreamIdentifier.singleStreamInstance(defaultStreamName); return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), - getOrCreateConsumerArn(defaultStreamName, defaultConsumerName)); + getOrCreateConsumerArn(streamIdentifier, defaultConsumerArn)); } } @@ -69,7 +71,8 @@ public class FanOutRetrievalFactory implements RetrievalFactory { throw new UnsupportedOperationException("FanoutRetrievalFactory needs StreamConfig Info"); } - private String getOrCreateConsumerArn(String streamName, String consumerArn) { - return consumerArn != null ? consumerArn : consumerArnCreator.apply(streamName); + private String getOrCreateConsumerArn(StreamIdentifier streamIdentifier, String consumerArn) { + return consumerArn != null ? consumerArn : implicitConsumerArnTracker + .computeIfAbsent(streamIdentifier, sId -> consumerArnCreator.apply(sId.streamName())); } }