From eb00229602cbf1d988e13bd07be6ef51f5c7e733 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Fri, 1 May 2020 01:28:48 -0700 Subject: [PATCH] Hole detection logic --- .../coordinator/PeriodicShardSyncManager.java | 98 ++++++++++- .../software/amazon/kinesis/leases/Lease.java | 2 + .../PeriodicShardSyncManagerTest.java | 155 ++++++++++++++++++ 3 files changed, 248 insertions(+), 7 deletions(-) create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 8df18207..3edec0ec 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -14,14 +14,16 @@ */ package software.amazon.kinesis.coordinator; +import com.google.common.annotations.VisibleForTesting; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.Value; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; import software.amazon.awssdk.utils.CollectionUtils; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; -import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.MultiStreamLease; @@ -29,19 +31,21 @@ import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; -import software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.lifecycle.TaskResult; +import java.io.Serializable; +import java.math.BigInteger; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.stream.Collectors; @@ -55,6 +59,8 @@ import java.util.stream.Collectors; class PeriodicShardSyncManager { private static final long INITIAL_DELAY = 60 * 1000L; private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000L; + private static final BigInteger MIN_HASH_KEY = BigInteger.ZERO; + private static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE); private final String workerId; private final LeaderDecider leaderDecider; @@ -177,12 +183,90 @@ class PeriodicShardSyncManager { * Checks if the entire hash range is covered * @return true if covered, false otherwise */ - public boolean isHashRangeComplete(List leases) { - if(CollectionUtils.isNullOrEmpty(leases)) { + private boolean isHashRangeCompleteForLeases(List leases) { + if (CollectionUtils.isNullOrEmpty(leases)) { return false; } else { -// leases.stream().filter(lease -> lease.checkpoint().isShardEnd()) - return false; + List hashRangesForActiveLeases = leases.stream() + .filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()) + .map(lease -> lease.hashKeyRangeForLease()) + .collect(Collectors.toList()); + return !checkForHoleInHashKeyRanges(hashRangesForActiveLeases, MIN_HASH_KEY, MAX_HASH_KEY).isPresent(); + } + } + + @VisibleForTesting + static Optional checkForHoleInHashKeyRanges(List hashKeyRanges, + BigInteger minHashKey, BigInteger maxHashKey) { + List mergedHashKeyRanges = sortAndMergeOverlappingHashRanges(hashKeyRanges); + + if (!mergedHashKeyRanges.get(0).startingHashKey().equals(minHashKey) || !mergedHashKeyRanges + .get(mergedHashKeyRanges.size() - 1).endingHashKey().equals(maxHashKey)) { + log.error("Incomplete hash range found between {} and {}.", mergedHashKeyRanges.get(0), + mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1)); + return Optional.of(new HashRangeHole(mergedHashKeyRanges.get(0), + mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1))); + } + if (mergedHashKeyRanges.size() > 1) { + for (int i = 1; i < mergedHashKeyRanges.size(); i++) { + final HashKeyRangeForLease hashRangeAtStartOfPossibleHole = mergedHashKeyRanges.get(i - 1); + final HashKeyRangeForLease hashRangeAtEndOfPossibleHole = mergedHashKeyRanges.get(i); + final BigInteger startOfPossibleHole = hashRangeAtStartOfPossibleHole.endingHashKey(); + final BigInteger endOfPossibleHole = hashRangeAtEndOfPossibleHole.startingHashKey(); + + if (!endOfPossibleHole.subtract(startOfPossibleHole).equals(BigInteger.ONE)) { + log.error("Incomplete hash range found between {} and {}.", hashRangeAtStartOfPossibleHole, + hashRangeAtEndOfPossibleHole); + return Optional.of(new HashRangeHole(hashRangeAtStartOfPossibleHole, hashRangeAtEndOfPossibleHole)); + } + } + } + return Optional.empty(); + } + + @VisibleForTesting + static List sortAndMergeOverlappingHashRanges(List hashKeyRanges) { + if(hashKeyRanges.size() == 0 || hashKeyRanges.size() == 1) + return hashKeyRanges; + + Collections.sort(hashKeyRanges, new HashKeyRangeComparator()); + + final HashKeyRangeForLease first = hashKeyRanges.get(0); + BigInteger start = first.startingHashKey(); + BigInteger end = first.endingHashKey(); + + final List result = new ArrayList<>(); + + for (int i = 1; i < hashKeyRanges.size(); i++) { + HashKeyRangeForLease current = hashKeyRanges.get(i); + if (current.startingHashKey().compareTo(end) <= 0) { + end = current.endingHashKey().max(end); + } else { + result.add(new HashKeyRangeForLease(start, end)); + start = current.startingHashKey(); + end = current.endingHashKey(); + } + } + result.add(new HashKeyRangeForLease(start, end)); + return result; + } + + @Value + private static class HashRangeHole { + private final HashKeyRangeForLease hashRangeAtStartOfPossibleHole; + private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole; + } + + /** + * Helper class to compare leases based on their hash range. + */ + private static class HashKeyRangeComparator implements Comparator, Serializable { + + private static final long serialVersionUID = 1L; + + @Override + public int compare(HashKeyRangeForLease hashKeyRange, HashKeyRangeForLease otherHashKeyRange) { + return hashKeyRange.startingHashKey().compareTo(otherHashKeyRange.startingHashKey()); } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index 427b3509..359b7a44 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -307,4 +307,6 @@ public class Lease { public Lease copy() { return new Lease(this); } + + } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java new file mode 100644 index 00000000..2567a00a --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java @@ -0,0 +1,155 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.coordinator; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.kinesis.common.HashKeyRangeForLease; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; + +import static software.amazon.kinesis.common.HashKeyRangeForLease.deserialize; + +@RunWith(MockitoJUnitRunner.class) + +public class PeriodicShardSyncManagerTest { + + @Before + public void setup() { + + } + + @Test + public void testIfHashRangesAreNotMergedWhenNoOverlappingIntervalsGiven() { + List hashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("24", "30")); + }}; + List sortAndMergedHashRanges = PeriodicShardSyncManager + .sortAndMergeOverlappingHashRanges(hashRanges); + Assert.assertEquals(hashRanges, sortAndMergedHashRanges); + } + + @Test + public void testIfHashRangesAreSortedWhenNoOverlappingIntervalsGiven() { + List hashRanges = new ArrayList() {{ + add(deserialize("2", "3")); + add(deserialize("0", "1")); + add(deserialize("24", "30")); + add(deserialize("4", "23")); + }}; + List hashRangesCopy = new ArrayList<>(); + hashRangesCopy.addAll(hashRanges); + List sortAndMergedHashRanges = PeriodicShardSyncManager + .sortAndMergeOverlappingHashRanges(hashRangesCopy); + Assert.assertEquals(hashRangesCopy, sortAndMergedHashRanges); + Assert.assertNotEquals(hashRanges, sortAndMergedHashRanges); + } + + @Test + public void testIfHashRangesAreMergedWhenOverlappingIntervalsGivenCase1() { + List hashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); + add(deserialize("24", "30")); + }}; + List expectedHashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("24", "30")); + }}; + List sortAndMergedHashRanges = PeriodicShardSyncManager + .sortAndMergeOverlappingHashRanges(hashRanges); + Assert.assertEquals(expectedHashRanges, sortAndMergedHashRanges); + } + + @Test + public void testIfHashRangesAreMergedWhenOverlappingIntervalsGivenCase2() { + List hashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "5")); + add(deserialize("4", "23")); + add(deserialize("24", "30")); + }}; + List expectedHashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("24", "30")); + }}; + List sortAndMergedHashRanges = PeriodicShardSyncManager + .sortAndMergeOverlappingHashRanges(hashRanges); + Assert.assertEquals(expectedHashRanges, sortAndMergedHashRanges); + } + + @Test + public void testIfHashRangesAreMergedWhenOverlappingIntervalsGivenCase3() { + List hashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("4", "5")); + add(deserialize("24", "30")); + }}; + List expectedHashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("24", "30")); + }}; + List sortAndMergedHashRanges = PeriodicShardSyncManager + .sortAndMergeOverlappingHashRanges(hashRanges); + Assert.assertEquals(expectedHashRanges, sortAndMergedHashRanges); + } + + @Test + public void testForFailureWhenHashRangesAreIncomplete() { + List hashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); + add(deserialize("25", "30")); // Missing interval here + }}; + Assert.assertTrue(PeriodicShardSyncManager + .checkForHoleInHashKeyRanges(hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent()); + } + + @Test + public void testForSuccessWhenHashRangesAreComplete() { + List hashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); + add(deserialize("24", "30")); + }}; + Assert.assertFalse(PeriodicShardSyncManager + .checkForHoleInHashKeyRanges(hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent()); + } + +}