diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 11014505..6ddef3cf 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; @@ -99,11 +98,11 @@ public class HierarchicalShardSyncer { this.deletedStreamListProvider = deletedStreamListProvider; } - private static final BiFunction SHARD_ID_FROM_LEASE_DEDUCER = - (lease, multiStreamArgs) -> - multiStreamArgs.isMultiStreamMode() ? - ((MultiStreamLease) lease).shardId() : - lease.leaseKey(); + private static String getShardIdFromLease(Lease lease, MultiStreamArgs multiStreamArgs) { + return multiStreamArgs.isMultiStreamMode() + ? ((MultiStreamLease) lease).shardId() + : lease.leaseKey(); + } /** * Check and create leases for any new shards (e.g. following a reshard operation). Sync leases with Kinesis shards @@ -739,8 +738,8 @@ public class HierarchicalShardSyncer { @Override public int compare(final Lease lease1, final Lease lease2) { int result = 0; - final String shardId1 = SHARD_ID_FROM_LEASE_DEDUCER.apply(lease1, multiStreamArgs); - final String shardId2 = SHARD_ID_FROM_LEASE_DEDUCER.apply(lease2, multiStreamArgs); + final String shardId1 = getShardIdFromLease(lease1, multiStreamArgs); + final String shardId2 = getShardIdFromLease(lease2, multiStreamArgs); final Shard shard1 = shardIdToShardMap.get(shardId1); final Shard shard2 = shardIdToShardMap.get(shardId2); @@ -813,7 +812,7 @@ public class HierarchicalShardSyncer { final Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards); currentLeases.stream().peek(lease -> log.debug("{} : Existing lease: {}", streamIdentifier, lease)) - .map(lease -> SHARD_ID_FROM_LEASE_DEDUCER.apply(lease, multiStreamArgs)) + .map(lease -> getShardIdFromLease(lease, multiStreamArgs)) .collect(Collectors.toSet()); final List newLeasesToCreate = getLeasesToCreateForOpenAndClosedShards(initialPosition, shards, multiStreamArgs, streamIdentifier); @@ -919,7 +918,7 @@ public class HierarchicalShardSyncer { .map(streamId -> streamId.serialize()).orElse(""); final Set shardIdsOfCurrentLeases = currentLeases.stream() .peek(lease -> log.debug("{} : Existing lease: {}", streamIdentifier, lease)) - .map(lease -> SHARD_ID_FROM_LEASE_DEDUCER.apply(lease, multiStreamArgs)) + .map(lease -> getShardIdFromLease(lease, multiStreamArgs)) .collect(Collectors.toSet()); final List openShards = getOpenShards(shards, streamIdentifier);