Caching consumerArn for StreamIdentifier in FanOutRetrievalFactory

This commit is contained in:
Ashwin Giridharan 2020-06-12 22:20:38 -07:00
parent 2530481dba
commit ce38178399

View file

@ -38,9 +38,11 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
private final KinesisAsyncClient kinesisClient; private final KinesisAsyncClient kinesisClient;
private final String defaultStreamName; private final String defaultStreamName;
private final String defaultConsumerName; private final String defaultConsumerArn;
private final Function<String, String> consumerArnCreator; private final Function<String, String> consumerArnCreator;
private Map<StreamIdentifier, String> implicitConsumerArnTracker = new HashMap<>();
@Override @Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo, public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo,
final MetricsFactory metricsFactory) { final MetricsFactory metricsFactory) {
@ -52,15 +54,15 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
final StreamConfig streamConfig, final StreamConfig streamConfig,
final MetricsFactory metricsFactory) { final MetricsFactory metricsFactory) {
final Optional<String> streamIdentifierStr = shardInfo.streamIdentifierSerOpt(); final Optional<String> streamIdentifierStr = shardInfo.streamIdentifierSerOpt();
final String streamName;
if(streamIdentifierStr.isPresent()) { if(streamIdentifierStr.isPresent()) {
streamName = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get()).streamName(); final StreamIdentifier streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get());
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
getOrCreateConsumerArn(streamName, streamConfig.consumerArn()), getOrCreateConsumerArn(streamIdentifier, streamConfig.consumerArn()),
streamIdentifierStr.get()); streamIdentifierStr.get());
} else { } else {
final StreamIdentifier streamIdentifier = StreamIdentifier.singleStreamInstance(defaultStreamName);
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
getOrCreateConsumerArn(defaultStreamName, defaultConsumerName)); getOrCreateConsumerArn(streamIdentifier, defaultConsumerArn));
} }
} }
@ -69,7 +71,8 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
throw new UnsupportedOperationException("FanoutRetrievalFactory needs StreamConfig Info"); throw new UnsupportedOperationException("FanoutRetrievalFactory needs StreamConfig Info");
} }
private String getOrCreateConsumerArn(String streamName, String consumerArn) { private String getOrCreateConsumerArn(StreamIdentifier streamIdentifier, String consumerArn) {
return consumerArn != null ? consumerArn : consumerArnCreator.apply(streamName); return consumerArn != null ? consumerArn : implicitConsumerArnTracker
.computeIfAbsent(streamIdentifier, sId -> consumerArnCreator.apply(sId.streamName()));
} }
} }