Hole detection logic

This commit is contained in:
Ashwin Giridharan 2020-05-01 01:28:48 -07:00
parent 5cd40e4718
commit eb00229602
3 changed files with 248 additions and 7 deletions

View file

@ -14,14 +14,16 @@
*/
package software.amazon.kinesis.coordinator;
import com.google.common.annotations.VisibleForTesting;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.MultiStreamLease;
@ -29,19 +31,21 @@ import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -55,6 +59,8 @@ import java.util.stream.Collectors;
class PeriodicShardSyncManager {
private static final long INITIAL_DELAY = 60 * 1000L;
private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000L;
private static final BigInteger MIN_HASH_KEY = BigInteger.ZERO;
private static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE);
private final String workerId;
private final LeaderDecider leaderDecider;
@ -177,12 +183,90 @@ class PeriodicShardSyncManager {
* Checks if the entire hash range is covered
* @return true if covered, false otherwise
*/
public boolean isHashRangeComplete(List<Lease> leases) {
if(CollectionUtils.isNullOrEmpty(leases)) {
private boolean isHashRangeCompleteForLeases(List<Lease> leases) {
if (CollectionUtils.isNullOrEmpty(leases)) {
return false;
} else {
// leases.stream().filter(lease -> lease.checkpoint().isShardEnd())
return false;
List<HashKeyRangeForLease> hashRangesForActiveLeases = leases.stream()
.filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd())
.map(lease -> lease.hashKeyRangeForLease())
.collect(Collectors.toList());
return !checkForHoleInHashKeyRanges(hashRangesForActiveLeases, MIN_HASH_KEY, MAX_HASH_KEY).isPresent();
}
}
@VisibleForTesting
static Optional<HashRangeHole> checkForHoleInHashKeyRanges(List<HashKeyRangeForLease> hashKeyRanges,
BigInteger minHashKey, BigInteger maxHashKey) {
List<HashKeyRangeForLease> mergedHashKeyRanges = sortAndMergeOverlappingHashRanges(hashKeyRanges);
if (!mergedHashKeyRanges.get(0).startingHashKey().equals(minHashKey) || !mergedHashKeyRanges
.get(mergedHashKeyRanges.size() - 1).endingHashKey().equals(maxHashKey)) {
log.error("Incomplete hash range found between {} and {}.", mergedHashKeyRanges.get(0),
mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1));
return Optional.of(new HashRangeHole(mergedHashKeyRanges.get(0),
mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1)));
}
if (mergedHashKeyRanges.size() > 1) {
for (int i = 1; i < mergedHashKeyRanges.size(); i++) {
final HashKeyRangeForLease hashRangeAtStartOfPossibleHole = mergedHashKeyRanges.get(i - 1);
final HashKeyRangeForLease hashRangeAtEndOfPossibleHole = mergedHashKeyRanges.get(i);
final BigInteger startOfPossibleHole = hashRangeAtStartOfPossibleHole.endingHashKey();
final BigInteger endOfPossibleHole = hashRangeAtEndOfPossibleHole.startingHashKey();
if (!endOfPossibleHole.subtract(startOfPossibleHole).equals(BigInteger.ONE)) {
log.error("Incomplete hash range found between {} and {}.", hashRangeAtStartOfPossibleHole,
hashRangeAtEndOfPossibleHole);
return Optional.of(new HashRangeHole(hashRangeAtStartOfPossibleHole, hashRangeAtEndOfPossibleHole));
}
}
}
return Optional.empty();
}
@VisibleForTesting
static List<HashKeyRangeForLease> sortAndMergeOverlappingHashRanges(List<HashKeyRangeForLease> hashKeyRanges) {
if(hashKeyRanges.size() == 0 || hashKeyRanges.size() == 1)
return hashKeyRanges;
Collections.sort(hashKeyRanges, new HashKeyRangeComparator());
final HashKeyRangeForLease first = hashKeyRanges.get(0);
BigInteger start = first.startingHashKey();
BigInteger end = first.endingHashKey();
final List<HashKeyRangeForLease> result = new ArrayList<>();
for (int i = 1; i < hashKeyRanges.size(); i++) {
HashKeyRangeForLease current = hashKeyRanges.get(i);
if (current.startingHashKey().compareTo(end) <= 0) {
end = current.endingHashKey().max(end);
} else {
result.add(new HashKeyRangeForLease(start, end));
start = current.startingHashKey();
end = current.endingHashKey();
}
}
result.add(new HashKeyRangeForLease(start, end));
return result;
}
@Value
private static class HashRangeHole {
private final HashKeyRangeForLease hashRangeAtStartOfPossibleHole;
private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole;
}
/**
* Helper class to compare leases based on their hash range.
*/
private static class HashKeyRangeComparator implements Comparator<HashKeyRangeForLease>, Serializable {
private static final long serialVersionUID = 1L;
@Override
public int compare(HashKeyRangeForLease hashKeyRange, HashKeyRangeForLease otherHashKeyRange) {
return hashKeyRange.startingHashKey().compareTo(otherHashKeyRange.startingHashKey());
}
}
}

View file

@ -307,4 +307,6 @@ public class Lease {
public Lease copy() {
return new Lease(this);
}
}

View file

@ -0,0 +1,155 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.common.HashKeyRangeForLease;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import static software.amazon.kinesis.common.HashKeyRangeForLease.deserialize;
@RunWith(MockitoJUnitRunner.class)
public class PeriodicShardSyncManagerTest {
@Before
public void setup() {
}
@Test
public void testIfHashRangesAreNotMergedWhenNoOverlappingIntervalsGiven() {
List<HashKeyRangeForLease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("24", "30"));
}};
List<HashKeyRangeForLease> sortAndMergedHashRanges = PeriodicShardSyncManager
.sortAndMergeOverlappingHashRanges(hashRanges);
Assert.assertEquals(hashRanges, sortAndMergedHashRanges);
}
@Test
public void testIfHashRangesAreSortedWhenNoOverlappingIntervalsGiven() {
List<HashKeyRangeForLease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("2", "3"));
add(deserialize("0", "1"));
add(deserialize("24", "30"));
add(deserialize("4", "23"));
}};
List<HashKeyRangeForLease> hashRangesCopy = new ArrayList<>();
hashRangesCopy.addAll(hashRanges);
List<HashKeyRangeForLease> sortAndMergedHashRanges = PeriodicShardSyncManager
.sortAndMergeOverlappingHashRanges(hashRangesCopy);
Assert.assertEquals(hashRangesCopy, sortAndMergedHashRanges);
Assert.assertNotEquals(hashRanges, sortAndMergedHashRanges);
}
@Test
public void testIfHashRangesAreMergedWhenOverlappingIntervalsGivenCase1() {
List<HashKeyRangeForLease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23"));
add(deserialize("24", "30"));
}};
List<HashKeyRangeForLease> expectedHashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("24", "30"));
}};
List<HashKeyRangeForLease> sortAndMergedHashRanges = PeriodicShardSyncManager
.sortAndMergeOverlappingHashRanges(hashRanges);
Assert.assertEquals(expectedHashRanges, sortAndMergedHashRanges);
}
@Test
public void testIfHashRangesAreMergedWhenOverlappingIntervalsGivenCase2() {
List<HashKeyRangeForLease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "5"));
add(deserialize("4", "23"));
add(deserialize("24", "30"));
}};
List<HashKeyRangeForLease> expectedHashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("24", "30"));
}};
List<HashKeyRangeForLease> sortAndMergedHashRanges = PeriodicShardSyncManager
.sortAndMergeOverlappingHashRanges(hashRanges);
Assert.assertEquals(expectedHashRanges, sortAndMergedHashRanges);
}
@Test
public void testIfHashRangesAreMergedWhenOverlappingIntervalsGivenCase3() {
List<HashKeyRangeForLease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("4", "5"));
add(deserialize("24", "30"));
}};
List<HashKeyRangeForLease> expectedHashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("24", "30"));
}};
List<HashKeyRangeForLease> sortAndMergedHashRanges = PeriodicShardSyncManager
.sortAndMergeOverlappingHashRanges(hashRanges);
Assert.assertEquals(expectedHashRanges, sortAndMergedHashRanges);
}
@Test
public void testForFailureWhenHashRangesAreIncomplete() {
List<HashKeyRangeForLease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23"));
add(deserialize("25", "30")); // Missing interval here
}};
Assert.assertTrue(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent());
}
@Test
public void testForSuccessWhenHashRangesAreComplete() {
List<HashKeyRangeForLease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23"));
add(deserialize("24", "30"));
}};
Assert.assertFalse(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent());
}
}