From e638c1730a9cdcca01620821ccadb22ee9483209 Mon Sep 17 00:00:00 2001 From: Roberto Tyley Date: Mon, 4 Oct 2021 11:21:11 +0100 Subject: [PATCH] Make use of Java 8 to simplify computeLeaseCounts() (#847) It's possible to remove quite a few lines of code from the method using Java 8 features (a `groupingBy` Collector, etc). Java 8 is the minimum supported version of the KCL since https://github.com/awslabs/amazon-kinesis-client/pull/176 was merged in July 2017. See also: https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#package.description http://cr.openjdk.java.net/~briangoetz/lambda/lambda-translation.html --- .../leases/dynamodb/DynamoDBLeaseTaker.java | 27 ++++++------------- 1 file changed, 8 insertions(+), 19 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index d7c72e32..7aff9513 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -40,6 +40,9 @@ import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.summingInt; + /** * An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}. */ @@ -539,30 +542,16 @@ public class DynamoDBLeaseTaker implements LeaseTaker { * @return map of workerIdentifier to lease count */ private Map computeLeaseCounts(List expiredLeases) { - Map leaseCounts = new HashMap<>(); // The set will give much faster lookup than the original list, an // important optimization when the list is large Set expiredLeasesSet = new HashSet<>(expiredLeases); - // Compute the number of leases per worker by looking through allLeases and ignoring leases that have expired. - for (Lease lease : allLeases.values()) { - if (!expiredLeasesSet.contains(lease)) { - String leaseOwner = lease.leaseOwner(); - Integer oldCount = leaseCounts.get(leaseOwner); - if (oldCount == null) { - leaseCounts.put(leaseOwner, 1); - } else { - leaseCounts.put(leaseOwner, oldCount + 1); - } - } - } + Map leaseCounts = allLeases.values().stream() + .filter(lease -> !expiredLeasesSet.contains(lease)) + .collect(groupingBy(Lease::leaseOwner, summingInt(lease -> 1))); - // If I have no leases, I wasn't represented in leaseCounts. Let's fix that. - Integer myCount = leaseCounts.get(workerIdentifier); - if (myCount == null) { - myCount = 0; - leaseCounts.put(workerIdentifier, myCount); - } + // If I have no leases, I won't be represented in leaseCounts. Let's fix that. + leaseCounts.putIfAbsent(workerIdentifier, 0); return leaseCounts; }