From a7ae4d3e24d936b7f408f6b10e68f5787e8e4076 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 23 Mar 2020 13:23:59 -0700 Subject: [PATCH] Minor refactoring --- .../fanout/FanOutRetrievalFactory.java | 8 +++- .../leases/HierarchicalShardSyncerTest.java | 39 ------------------- 2 files changed, 6 insertions(+), 41 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java index f609c1d9..719d2e54 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java @@ -53,10 +53,14 @@ public class FanOutRetrievalFactory implements RetrievalFactory { final String streamName; if(streamIdentifierStr.isPresent()) { streamName = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get()).streamName(); + return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), + streamToConsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply), + streamIdentifierStr.get()); } else { streamName = defaultStreamName; + return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), + streamToConsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply)); } - return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), - streamToConsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply)); + } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 5008b912..9ca59edc 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -1146,45 +1146,6 @@ public class HierarchicalShardSyncerTest { } } -// /** -// * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest) -// * Shard structure (each level depicts a stream segment): -// * 0 1 2 3 4 5- shards till epoch 102 -// * \ / \ / | | -// * 6 7 4 5- shards from epoch 103 - 205 -// * \ / | /\ -// * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) -// * Current leases: (4, 5, 7) -// */ -// @Test - public void understandLeaseBehavior() { - final List shards = constructShardListForGraphA(); -// final List currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), -// newLease("shardId-7")); - - final List currentLeases = Collections.emptyList(); - - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_LATEST); - - System.out.println("Leases : " + newLeases.stream().map(lease -> lease.leaseKey() + ":" + lease.checkpoint()).collect( - Collectors.joining())); - - final Map expectedShardIdCheckpointMap = new HashMap<>(); - expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.LATEST); - - assertThat(newLeases.size(), equalTo(expectedShardIdCheckpointMap.size())); - for (Lease lease : newLeases) { - assertThat("Unexpected lease: " + lease, expectedShardIdCheckpointMap.containsKey(lease.leaseKey()), - equalTo(true)); - assertThat(lease.checkpoint(), equalTo(expectedShardIdCheckpointMap.get(lease.leaseKey()))); - } - } - - /** * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest) * Shard structure (each level depicts a stream segment):