Fixing fanout cross consumer issue

This commit is contained in:
Ashwin Giridharan 2020-02-20 15:48:56 -08:00
parent 097559eca2
commit fedd02c2a5

View file

@ -25,6 +25,8 @@ import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
@RequiredArgsConstructor
@ -34,6 +36,7 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
private final KinesisAsyncClient kinesisClient;
private final String defaultStreamName;
private final Function<String, String> consumerArnProvider;
private Map<String,String> streamToconsumerArnMap = new HashMap<>();
@Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo,
@ -45,6 +48,7 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
final MetricsFactory metricsFactory) {
final String streamName = shardInfo.streamName().orElse(defaultStreamName);
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArnProvider.apply(streamName));
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
streamToconsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply));
}
}