Minor refactoring

This commit is contained in:
Ashwin Giridharan 2020-03-23 13:23:59 -07:00
parent a6f767bf96
commit a7ae4d3e24
2 changed files with 6 additions and 41 deletions

View file

@ -53,10 +53,14 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
final String streamName;
if(streamIdentifierStr.isPresent()) {
streamName = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get()).streamName();
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
streamToConsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply),
streamIdentifierStr.get());
} else {
streamName = defaultStreamName;
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
streamToConsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply));
}
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
streamToConsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply));
}
}

View file

@ -1146,45 +1146,6 @@ public class HierarchicalShardSyncerTest {
}
}
// /**
// * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest)
// * Shard structure (each level depicts a stream segment):
// * 0 1 2 3 4 5- shards till epoch 102
// * \ / \ / | |
// * 6 7 4 5- shards from epoch 103 - 205
// * \ / | /\
// * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
// * Current leases: (4, 5, 7)
// */
// @Test
public void understandLeaseBehavior() {
final List<Shard> shards = constructShardListForGraphA();
// final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"),
// newLease("shardId-7"));
final List<Lease> currentLeases = Collections.emptyList();
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST);
System.out.println("Leases : " + newLeases.stream().map(lease -> lease.leaseKey() + ":" + lease.checkpoint()).collect(
Collectors.joining()));
final Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.TRIM_HORIZON);
expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.TRIM_HORIZON);
expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.LATEST);
assertThat(newLeases.size(), equalTo(expectedShardIdCheckpointMap.size()));
for (Lease lease : newLeases) {
assertThat("Unexpected lease: " + lease, expectedShardIdCheckpointMap.containsKey(lease.leaseKey()),
equalTo(true));
assertThat(lease.checkpoint(), equalTo(expectedShardIdCheckpointMap.get(lease.leaseKey())));
}
}
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest)
* Shard structure (each level depicts a stream segment):