From eb7a60ec624eb0e26a281e7d0fdadf2befb97c0a Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 29 Apr 2020 15:56:15 -0700 Subject: [PATCH 01/11] Fixing a metrics bug --- .../src/main/java/software/amazon/kinesis/leases/Lease.java | 1 - 1 file changed, 1 deletion(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index 3df5097e..427b3509 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -163,7 +163,6 @@ public class Lease { pendingCheckpointState(lease.pendingCheckpointState); parentShardIds(lease.parentShardIds); childShardIds(lease.childShardIds); - hashKeyRange(lease.hashKeyRangeForLease); } /** From 5cd40e4718e7f7576e1c0ba64cf03aee37e44936 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 30 Apr 2020 14:03:39 -0700 Subject: [PATCH 02/11] Lease Recovery and Blocking parent bug fix --- .../coordinator/PeriodicShardSyncManager.java | 75 ++++++++++++--- .../amazon/kinesis/coordinator/Scheduler.java | 20 +--- .../leases/HierarchicalShardSyncer.java | 5 +- .../amazon/kinesis/leases/LeaseRefresher.java | 2 +- .../lifecycle/BlockOnParentShardTask.java | 2 - .../retrieval/kpl/ExtendedSequenceNumber.java | 5 + .../lifecycle/BlockOnParentShardTaskTest.java | 92 ++++++++++++++++++- 7 files changed, 164 insertions(+), 37 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 8acccce9..8df18207 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -18,20 
+18,32 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; +import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseRefresher; +import software.amazon.kinesis.leases.MultiStreamLease; import software.amazon.kinesis.leases.ShardSyncTaskManager; +import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.InvalidStateException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.lifecycle.TaskResult; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import java.util.stream.Collectors; /** * The top level orchestrator for coordinating the periodic shard sync related @@ -46,25 +58,33 @@ class PeriodicShardSyncManager { private final String workerId; private final LeaderDecider leaderDecider; + private final LeaseRefresher leaseRefresher; private final Map currentStreamConfigMap; private final Function shardSyncTaskManagerProvider; private final ScheduledExecutorService shardSyncThreadPool; + private final boolean isMultiStreamingMode; private boolean isRunning; - PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map currentStreamConfigMap, - Function shardSyncTaskManagerProvider) { - this(workerId, 
leaderDecider, currentStreamConfigMap, shardSyncTaskManagerProvider, Executors.newSingleThreadScheduledExecutor()); + PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher, + Map currentStreamConfigMap, + Function shardSyncTaskManagerProvider, boolean isMultiStreamingMode) { + this(workerId, leaderDecider, leaseRefresher, currentStreamConfigMap, shardSyncTaskManagerProvider, + Executors.newSingleThreadScheduledExecutor(), isMultiStreamingMode); } - PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map currentStreamConfigMap, - Function shardSyncTaskManagerProvider, ScheduledExecutorService shardSyncThreadPool) { + PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher, + Map currentStreamConfigMap, + Function shardSyncTaskManagerProvider, + ScheduledExecutorService shardSyncThreadPool, boolean isMultiStreamingMode) { Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager."); Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager."); this.workerId = workerId; this.leaderDecider = leaderDecider; + this.leaseRefresher = leaseRefresher; this.currentStreamConfigMap = currentStreamConfigMap; this.shardSyncTaskManagerProvider = shardSyncTaskManagerProvider; this.shardSyncThreadPool = shardSyncThreadPool; + this.isMultiStreamingMode = isMultiStreamingMode; } public synchronized TaskResult start() { @@ -116,24 +136,53 @@ class PeriodicShardSyncManager { private void runShardSync() { if (leaderDecider.isLeader(workerId)) { - for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { - final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfigEntry.getValue()); - if (!shardSyncTaskManager.syncShardAndLeaseInfo()) { - log.warn("Failed to submit shard sync task for stream {}. 
This could be due to the previous shard sync task not finished.", - shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); + try { + final Map> streamToLeasesMap = getStreamToLeasesMap(currentStreamConfigMap.keySet()); + for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { + final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfigEntry.getValue()); + if (!shardSyncTaskManager.syncShardAndLeaseInfo()) { + log.warn( + "Failed to submit shard sync task for stream {}. This could be due to the previous shard sync task not finished.", + shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); + } } + } catch (Exception e) { + // TODO : Log } } else { log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); } } + private Map> getStreamToLeasesMap(final Set streamIdentifiersToFilter) + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + final List leases = leaseRefresher.listLeases(); + if(!isMultiStreamingMode) { + Validate.isTrue(streamIdentifiersToFilter.size() == 1); + return Collections.singletonMap(streamIdentifiersToFilter.iterator().next(), leases); + } else { + final Map> streamToLeasesMap = new HashMap<>(); + for(Lease lease : leases) { + StreamIdentifier streamIdentifier = StreamIdentifier + .multiStreamInstance(((MultiStreamLease) lease).streamIdentifier()); + if(streamIdentifiersToFilter.contains(streamIdentifier)) { + streamToLeasesMap.computeIfAbsent(streamIdentifier, s -> new ArrayList<>()).add(lease); + } + } + return streamToLeasesMap; + } + } + /** * Checks if the entire hash range is covered * @return true if covered, false otherwise */ - public boolean hashRangeCovered() { - // TODO: Implement method - return true; + public boolean isHashRangeComplete(List leases) { + if(CollectionUtils.isNullOrEmpty(leases)) { + return false; + } else { +// leases.stream().filter(lease -> 
lease.checkpoint().isShardEnd()) + return false; + } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index a20f86d1..d5eb1be6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -111,7 +111,6 @@ public class Scheduler implements Runnable { private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L; private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; - private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L; private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L; private static final String MULTI_STREAM_TRACKER = "MultiStreamTracker"; private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count"; @@ -289,8 +288,8 @@ public class Scheduler implements Runnable { this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager( - leaseManagementConfig.workerIdentifier(), leaderDecider, currentStreamConfigMap, - shardSyncTaskManagerProvider); + leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap, + shardSyncTaskManagerProvider, isMultiStreamMode); } /** @@ -351,11 +350,10 @@ public class Scheduler implements Runnable { } else { log.info("LeaseCoordinator is already running. 
No need to start it."); } - log.info("Scheduling periodicShardSync)"); + log.info("Scheduling periodicShardSync"); // leaderElectedPeriodicShardSyncManager.start(shardSyncTasks); // TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged // TODO: Determine if waitUntilHashRangeCovered() is needed. - //waitUntilHashRangeCovered(); streamSyncWatch.start(); isDone = true; } catch (LeasingException e) { @@ -398,18 +396,6 @@ public class Scheduler implements Runnable { return shouldInitiateLeaseSync; } - private void waitUntilHashRangeCovered() throws InterruptedException { - - // TODO: Currently this call is not in use. We may need to implement this method later. Created SIM to track the work: https://sim.amazon.com/issues/KinesisLTR-202 - // TODO: For future implementation, streamToShardSyncTaskManagerMap might not contain the most up to date snapshot of active streams. - // Should use currentStreamConfigMap to determine the streams to check. - while (!leaderElectedPeriodicShardSyncManager.hashRangeCovered()) { - // wait until entire hash range is covered - log.info("Hash range is not covered yet. 
Checking again in {} ms", HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS); - Thread.sleep(HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS); - } - } - @VisibleForTesting void runProcessLoop() { try { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 714e7f4e..2b2df48c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -45,7 +45,6 @@ import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; -import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; @@ -341,7 +340,7 @@ public class HierarchicalShardSyncer { "Stream " + streamName + " is not in ACTIVE OR UPDATING state - will retry getting the shard list."); } - if (hashRangeOfShardsIsComplete(shards)) { + if (isHashRangeOfShardsComplete(shards)) { return shards; } @@ -359,7 +358,7 @@ public class HierarchicalShardSyncer { " is not in ACTIVE OR UPDATING state - will retry getting the shard list.")); } - private static boolean hashRangeOfShardsIsComplete(@NonNull List shards) { + private static boolean isHashRangeOfShardsComplete(@NonNull List shards) { if (shards.isEmpty()) { throw new IllegalStateException("No shards found when attempting to validate complete hash range."); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java 
b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index fc3aba8b..0d563a63 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -105,7 +105,7 @@ public interface LeaseRefresher { * @throws ProvisionedThroughputException if DynamoDB get fails due to lack of capacity * @throws DependencyException if DynamoDB get fails in an unexpected way * - * @return lease for the specified shardId, or null if one doesn't exist + * @return lease for the specified leaseKey, or null if one doesn't exist */ Lease getLease(String leaseKey) throws DependencyException, InvalidStateException, ProvisionedThroughputException; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java index 8797085a..fcb3ffde 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java @@ -25,8 +25,6 @@ import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import java.util.function.Function; - /** * Task to block until processing of all data records in the parent shard(s) is completed. * We check if we have checkpoint(s) for the parent shard(s). 
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/kpl/ExtendedSequenceNumber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/kpl/ExtendedSequenceNumber.java index e2de9786..0c1c4a28 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/kpl/ExtendedSequenceNumber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/kpl/ExtendedSequenceNumber.java @@ -141,6 +141,11 @@ public class ExtendedSequenceNumber implements Comparable emptyParentShardIds = new ArrayList(); private ShardInfo shardInfo; @@ -107,6 +109,50 @@ public class BlockOnParentShardTaskTest { assertNull(result.getException()); } + /** + * Test call() when there are 1-2 parent shards that have been fully processed. + * @throws ProvisionedThroughputException + * @throws InvalidStateException + * @throws DependencyException + */ + @Test + public final void testCallWhenParentsHaveFinishedMultiStream() + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + + ShardInfo shardInfo = null; + BlockOnParentShardTask task = null; + String parent1LeaseKey = streamId + ":" + "shardId-1"; + String parent2LeaseKey = streamId + ":" + "shardId-2"; + String parent1ShardId = "shardId-1"; + String parent2ShardId = "shardId-2"; + List parentShardIds = new ArrayList<>(); + TaskResult result = null; + + Lease parent1Lease = new Lease(); + parent1Lease.checkpoint(ExtendedSequenceNumber.SHARD_END); + Lease parent2Lease = new Lease(); + parent2Lease.checkpoint(ExtendedSequenceNumber.SHARD_END); + + LeaseRefresher leaseRefresher = mock(LeaseRefresher.class); + when(leaseRefresher.getLease(parent1LeaseKey)).thenReturn(parent1Lease); + when(leaseRefresher.getLease(parent2LeaseKey)).thenReturn(parent2Lease); + + // test single parent + parentShardIds.add(parent1ShardId); + shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, + 
streamId); + task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis); + result = task.call(); + assertNull(result.getException()); + + // test two parents + parentShardIds.add(parent2ShardId); + shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, streamId); + task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis); + result = task.call(); + assertNull(result.getException()); + } + /** * Test call() when there are 1-2 parent shards that have NOT been fully processed. * @throws ProvisionedThroughputException @@ -115,7 +161,7 @@ public class BlockOnParentShardTaskTest { */ @Test public final void testCallWhenParentsHaveNotFinished() - throws DependencyException, InvalidStateException, ProvisionedThroughputException { + throws DependencyException, InvalidStateException, ProvisionedThroughputException { ShardInfo shardInfo = null; BlockOnParentShardTask task = null; @@ -149,6 +195,50 @@ public class BlockOnParentShardTaskTest { assertNotNull(result.getException()); } + /** + * Test call() when there are 1-2 parent shards that have NOT been fully processed. 
+ * @throws ProvisionedThroughputException + * @throws InvalidStateException + * @throws DependencyException + */ + @Test + public final void testCallWhenParentsHaveNotFinishedMultiStream() + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + + ShardInfo shardInfo = null; + BlockOnParentShardTask task = null; + String parent1LeaseKey = streamId + ":" + "shardId-1"; + String parent2LeaseKey = streamId + ":" + "shardId-2"; + String parent1ShardId = "shardId-1"; + String parent2ShardId = "shardId-2"; + List parentShardIds = new ArrayList<>(); + TaskResult result = null; + + Lease parent1Lease = new Lease(); + parent1Lease.checkpoint(ExtendedSequenceNumber.LATEST); + Lease parent2Lease = new Lease(); + // mock a sequence number checkpoint + parent2Lease.checkpoint(new ExtendedSequenceNumber("98182584034")); + + LeaseRefresher leaseRefresher = mock(LeaseRefresher.class); + when(leaseRefresher.getLease(parent1LeaseKey)).thenReturn(parent1Lease); + when(leaseRefresher.getLease(parent2LeaseKey)).thenReturn(parent2Lease); + + // test single parent + parentShardIds.add(parent1ShardId); + shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, streamId); + task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis); + result = task.call(); + assertNotNull(result.getException()); + + // test two parents + parentShardIds.add(parent2ShardId); + shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, streamId); + task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis); + result = task.call(); + assertNotNull(result.getException()); + } + /** * Test call() with 1 parent shard before and after it is completely processed. 
* @throws ProvisionedThroughputException From eb00229602cbf1d988e13bd07be6ef51f5c7e733 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Fri, 1 May 2020 01:28:48 -0700 Subject: [PATCH 03/11] Hole detection logic --- .../coordinator/PeriodicShardSyncManager.java | 98 ++++++++++- .../software/amazon/kinesis/leases/Lease.java | 2 + .../PeriodicShardSyncManagerTest.java | 155 ++++++++++++++++++ 3 files changed, 248 insertions(+), 7 deletions(-) create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 8df18207..3edec0ec 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -14,14 +14,16 @@ */ package software.amazon.kinesis.coordinator; +import com.google.common.annotations.VisibleForTesting; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.Value; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; import software.amazon.awssdk.utils.CollectionUtils; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; -import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.MultiStreamLease; @@ -29,19 +31,21 @@ import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; 
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; -import software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.lifecycle.TaskResult; +import java.io.Serializable; +import java.math.BigInteger; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.stream.Collectors; @@ -55,6 +59,8 @@ import java.util.stream.Collectors; class PeriodicShardSyncManager { private static final long INITIAL_DELAY = 60 * 1000L; private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000L; + private static final BigInteger MIN_HASH_KEY = BigInteger.ZERO; + private static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE); private final String workerId; private final LeaderDecider leaderDecider; @@ -177,12 +183,90 @@ class PeriodicShardSyncManager { * Checks if the entire hash range is covered * @return true if covered, false otherwise */ - public boolean isHashRangeComplete(List leases) { - if(CollectionUtils.isNullOrEmpty(leases)) { + private boolean isHashRangeCompleteForLeases(List leases) { + if (CollectionUtils.isNullOrEmpty(leases)) { return false; } else { -// leases.stream().filter(lease -> lease.checkpoint().isShardEnd()) - return false; + List hashRangesForActiveLeases = leases.stream() + .filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()) + .map(lease -> lease.hashKeyRangeForLease()) + .collect(Collectors.toList()); + return !checkForHoleInHashKeyRanges(hashRangesForActiveLeases, MIN_HASH_KEY, MAX_HASH_KEY).isPresent(); + } + } + + @VisibleForTesting + static 
Optional checkForHoleInHashKeyRanges(List hashKeyRanges, + BigInteger minHashKey, BigInteger maxHashKey) { + List mergedHashKeyRanges = sortAndMergeOverlappingHashRanges(hashKeyRanges); + + if (!mergedHashKeyRanges.get(0).startingHashKey().equals(minHashKey) || !mergedHashKeyRanges + .get(mergedHashKeyRanges.size() - 1).endingHashKey().equals(maxHashKey)) { + log.error("Incomplete hash range found between {} and {}.", mergedHashKeyRanges.get(0), + mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1)); + return Optional.of(new HashRangeHole(mergedHashKeyRanges.get(0), + mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1))); + } + if (mergedHashKeyRanges.size() > 1) { + for (int i = 1; i < mergedHashKeyRanges.size(); i++) { + final HashKeyRangeForLease hashRangeAtStartOfPossibleHole = mergedHashKeyRanges.get(i - 1); + final HashKeyRangeForLease hashRangeAtEndOfPossibleHole = mergedHashKeyRanges.get(i); + final BigInteger startOfPossibleHole = hashRangeAtStartOfPossibleHole.endingHashKey(); + final BigInteger endOfPossibleHole = hashRangeAtEndOfPossibleHole.startingHashKey(); + + if (!endOfPossibleHole.subtract(startOfPossibleHole).equals(BigInteger.ONE)) { + log.error("Incomplete hash range found between {} and {}.", hashRangeAtStartOfPossibleHole, + hashRangeAtEndOfPossibleHole); + return Optional.of(new HashRangeHole(hashRangeAtStartOfPossibleHole, hashRangeAtEndOfPossibleHole)); + } + } + } + return Optional.empty(); + } + + @VisibleForTesting + static List sortAndMergeOverlappingHashRanges(List hashKeyRanges) { + if(hashKeyRanges.size() == 0 || hashKeyRanges.size() == 1) + return hashKeyRanges; + + Collections.sort(hashKeyRanges, new HashKeyRangeComparator()); + + final HashKeyRangeForLease first = hashKeyRanges.get(0); + BigInteger start = first.startingHashKey(); + BigInteger end = first.endingHashKey(); + + final List result = new ArrayList<>(); + + for (int i = 1; i < hashKeyRanges.size(); i++) { + HashKeyRangeForLease current = hashKeyRanges.get(i); + 
if (current.startingHashKey().compareTo(end) <= 0) { + end = current.endingHashKey().max(end); + } else { + result.add(new HashKeyRangeForLease(start, end)); + start = current.startingHashKey(); + end = current.endingHashKey(); + } + } + result.add(new HashKeyRangeForLease(start, end)); + return result; + } + + @Value + private static class HashRangeHole { + private final HashKeyRangeForLease hashRangeAtStartOfPossibleHole; + private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole; + } + + /** + * Helper class to compare leases based on their hash range. + */ + private static class HashKeyRangeComparator implements Comparator, Serializable { + + private static final long serialVersionUID = 1L; + + @Override + public int compare(HashKeyRangeForLease hashKeyRange, HashKeyRangeForLease otherHashKeyRange) { + return hashKeyRange.startingHashKey().compareTo(otherHashKeyRange.startingHashKey()); } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index 427b3509..359b7a44 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -307,4 +307,6 @@ public class Lease { public Lease copy() { return new Lease(this); } + + } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java new file mode 100644 index 00000000..2567a00a --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java @@ -0,0 +1,155 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.coordinator; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.kinesis.common.HashKeyRangeForLease; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; + +import static software.amazon.kinesis.common.HashKeyRangeForLease.deserialize; + +@RunWith(MockitoJUnitRunner.class) + +public class PeriodicShardSyncManagerTest { + + @Before + public void setup() { + + } + + @Test + public void testIfHashRangesAreNotMergedWhenNoOverlappingIntervalsGiven() { + List hashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("24", "30")); + }}; + List sortAndMergedHashRanges = PeriodicShardSyncManager + .sortAndMergeOverlappingHashRanges(hashRanges); + Assert.assertEquals(hashRanges, sortAndMergedHashRanges); + } + + @Test + public void testIfHashRangesAreSortedWhenNoOverlappingIntervalsGiven() { + List hashRanges = new ArrayList() {{ + add(deserialize("2", "3")); + add(deserialize("0", "1")); + add(deserialize("24", "30")); + add(deserialize("4", "23")); + }}; + List hashRangesCopy = new ArrayList<>(); + hashRangesCopy.addAll(hashRanges); + List sortAndMergedHashRanges = PeriodicShardSyncManager + .sortAndMergeOverlappingHashRanges(hashRangesCopy); + Assert.assertEquals(hashRangesCopy, sortAndMergedHashRanges); + Assert.assertNotEquals(hashRanges, sortAndMergedHashRanges); + } + + 
@Test + public void testIfHashRangesAreMergedWhenOverlappingIntervalsGivenCase1() { + List hashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); + add(deserialize("24", "30")); + }}; + List expectedHashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("24", "30")); + }}; + List sortAndMergedHashRanges = PeriodicShardSyncManager + .sortAndMergeOverlappingHashRanges(hashRanges); + Assert.assertEquals(expectedHashRanges, sortAndMergedHashRanges); + } + + @Test + public void testIfHashRangesAreMergedWhenOverlappingIntervalsGivenCase2() { + List hashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "5")); + add(deserialize("4", "23")); + add(deserialize("24", "30")); + }}; + List expectedHashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("24", "30")); + }}; + List sortAndMergedHashRanges = PeriodicShardSyncManager + .sortAndMergeOverlappingHashRanges(hashRanges); + Assert.assertEquals(expectedHashRanges, sortAndMergedHashRanges); + } + + @Test + public void testIfHashRangesAreMergedWhenOverlappingIntervalsGivenCase3() { + List hashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("4", "5")); + add(deserialize("24", "30")); + }}; + List expectedHashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("24", "30")); + }}; + List sortAndMergedHashRanges = PeriodicShardSyncManager + .sortAndMergeOverlappingHashRanges(hashRanges); + Assert.assertEquals(expectedHashRanges, sortAndMergedHashRanges); + } + + @Test + public void testForFailureWhenHashRangesAreIncomplete() { + List 
hashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); + add(deserialize("25", "30")); // Missing interval here + }}; + Assert.assertTrue(PeriodicShardSyncManager + .checkForHoleInHashKeyRanges(hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent()); + } + + @Test + public void testForSuccessWhenHashRangesAreComplete() { + List hashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); + add(deserialize("24", "30")); + }}; + Assert.assertFalse(PeriodicShardSyncManager + .checkForHoleInHashKeyRanges(hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent()); + } + +} From a6922d9d7eaf1f8df4a7806d9ed0045320b603e0 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Sat, 2 May 2020 03:22:59 -0700 Subject: [PATCH 04/11] Hash range hole confidence check --- .../coordinator/PeriodicShardSyncManager.java | 74 +++++++++++++------ 1 file changed, 53 insertions(+), 21 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 3edec0ec..90c95cf3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -17,6 +17,7 @@ package software.amazon.kinesis.coordinator; import com.google.common.annotations.VisibleForTesting; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.NonNull; import lombok.Value; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; @@ -61,6 +62,8 @@ class PeriodicShardSyncManager { private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000L; private 
static final BigInteger MIN_HASH_KEY = BigInteger.ZERO; private static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE); + private static final int CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY = 3; + private Map hashRangeHoleTrackerMap = new HashMap<>(); private final String workerId; private final LeaderDecider leaderDecider; @@ -145,6 +148,7 @@ class PeriodicShardSyncManager { try { final Map> streamToLeasesMap = getStreamToLeasesMap(currentStreamConfigMap.keySet()); for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { + final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfigEntry.getValue()); if (!shardSyncTaskManager.syncShardAndLeaseInfo()) { log.warn( @@ -160,18 +164,19 @@ class PeriodicShardSyncManager { } } - private Map> getStreamToLeasesMap(final Set streamIdentifiersToFilter) + private Map> getStreamToLeasesMap( + final Set streamIdentifiersToFilter) throws DependencyException, ProvisionedThroughputException, InvalidStateException { final List leases = leaseRefresher.listLeases(); - if(!isMultiStreamingMode) { + if (!isMultiStreamingMode) { Validate.isTrue(streamIdentifiersToFilter.size() == 1); return Collections.singletonMap(streamIdentifiersToFilter.iterator().next(), leases); } else { final Map> streamToLeasesMap = new HashMap<>(); - for(Lease lease : leases) { + for (Lease lease : leases) { StreamIdentifier streamIdentifier = StreamIdentifier .multiStreamInstance(((MultiStreamLease) lease).streamIdentifier()); - if(streamIdentifiersToFilter.contains(streamIdentifier)) { + if (streamIdentifiersToFilter.contains(streamIdentifier)) { streamToLeasesMap.computeIfAbsent(streamIdentifier, s -> new ArrayList<>()).add(lease); } } @@ -179,25 +184,37 @@ class PeriodicShardSyncManager { } } - /** - * Checks if the entire hash range is covered - * @return true if covered, false otherwise - */ - private boolean isHashRangeCompleteForLeases(List leases) { + // TODO : 
Catch exception + private boolean shouldDoShardSync(StreamIdentifier streamIdentifier, List leases) { if (CollectionUtils.isNullOrEmpty(leases)) { - return false; + throw new IllegalArgumentException("No leases found to validate for the stream " + streamIdentifier); + } + // Check if there are any holes in the leases and return the first hole if present. + Optional hashRangeHoleOpt = hasHoleInLeases(streamIdentifier, leases); + if (hashRangeHoleOpt.isPresent()) { + // If hole is present, check if the hole is detected consecutively in previous occurrences. + // If hole is determined with high confidence return true; return false otherwise + return hashRangeHoleTrackerMap.computeIfAbsent(streamIdentifier, s -> new HashRangeHoleTracker()) + .hasHighConfidenceOfHoleWith(hashRangeHoleOpt.get()); + } else { - List hashRangesForActiveLeases = leases.stream() - .filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()) - .map(lease -> lease.hashKeyRangeForLease()) - .collect(Collectors.toList()); - return !checkForHoleInHashKeyRanges(hashRangesForActiveLeases, MIN_HASH_KEY, MAX_HASH_KEY).isPresent(); + // If hole is not present, clear any previous tracking for this stream and return false; + hashRangeHoleTrackerMap.remove(streamIdentifier); + return false; } } + private Optional hasHoleInLeases(StreamIdentifier streamIdentifier, List leases) { + // Filter the hashranges of leases which has any checkpoint other than shard end. 
+ List hashRangesForActiveLeases = leases.stream() + .filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()) + .map(lease -> lease.hashKeyRangeForLease()).collect(Collectors.toList()); + return checkForHoleInHashKeyRanges(streamIdentifier, hashRangesForActiveLeases, MIN_HASH_KEY, MAX_HASH_KEY); + } + @VisibleForTesting - static Optional checkForHoleInHashKeyRanges(List hashKeyRanges, - BigInteger minHashKey, BigInteger maxHashKey) { + static Optional checkForHoleInHashKeyRanges(StreamIdentifier streamIdentifier, + List hashKeyRanges, BigInteger minHashKey, BigInteger maxHashKey) { List mergedHashKeyRanges = sortAndMergeOverlappingHashRanges(hashKeyRanges); if (!mergedHashKeyRanges.get(0).startingHashKey().equals(minHashKey) || !mergedHashKeyRanges @@ -225,8 +242,9 @@ class PeriodicShardSyncManager { } @VisibleForTesting - static List sortAndMergeOverlappingHashRanges(List hashKeyRanges) { - if(hashKeyRanges.size() == 0 || hashKeyRanges.size() == 1) + static List sortAndMergeOverlappingHashRanges( + List hashKeyRanges) { + if (hashKeyRanges.size() == 0 || hashKeyRanges.size() == 1) return hashKeyRanges; Collections.sort(hashKeyRanges, new HashKeyRangeComparator()); @@ -257,6 +275,21 @@ class PeriodicShardSyncManager { private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole; } + private static class HashRangeHoleTracker { + private HashRangeHole hashRangeHole; + private Integer numConsecutiveHoles; + + public boolean hasHighConfidenceOfHoleWith(@NonNull HashRangeHole hashRangeHole) { + if (hashRangeHole.equals(this.hashRangeHole)) { + ++this.numConsecutiveHoles; + } else { + this.hashRangeHole = hashRangeHole; + this.numConsecutiveHoles = 1; + } + return numConsecutiveHoles >= CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; + } + } + /** * Helper class to compare leases based on their hash range. 
*/ @@ -264,8 +297,7 @@ class PeriodicShardSyncManager { private static final long serialVersionUID = 1L; - @Override - public int compare(HashKeyRangeForLease hashKeyRange, HashKeyRangeForLease otherHashKeyRange) { + @Override public int compare(HashKeyRangeForLease hashKeyRange, HashKeyRangeForLease otherHashKeyRange) { return hashKeyRange.startingHashKey().compareTo(otherHashKeyRange.startingHashKey()); } } From 9e97edd2730d6cf756b78e9e76c4a7633a8e486f Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 6 May 2020 11:06:23 -0700 Subject: [PATCH 05/11] Reusing code while determining leaskekey --- .../amazon/kinesis/coordinator/Scheduler.java | 6 ++-- .../lifecycle/BlockOnParentShardTask.java | 3 +- .../amazon/kinesis/lifecycle/ProcessTask.java | 3 +- .../lifecycle/ShardConsumerSubscriber.java | 4 +-- .../kinesis/lifecycle/ShutdownTask.java | 33 +++---------------- 5 files changed, 10 insertions(+), 39 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index d5eb1be6..a2f9a45e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -851,8 +851,7 @@ public class Scheduler implements Runnable { if (!firstItem) { builder.append(", "); } - builder.append(shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) - .orElse(shardInfo.shardId())); + builder.append(ShardInfo.getLeaseKey(shardInfo)); firstItem = false; } slog.info("Current stream shard assignments: " + builder.toString()); @@ -948,8 +947,7 @@ public class Scheduler implements Runnable { ShardConsumer consumer = shardInfoShardConsumerMap.get(shard); if (consumer.leaseLost()) { shardInfoShardConsumerMap.remove(shard); - log.debug("Removed consumer for {} as lease has been lost", - 
shard.streamIdentifierSerOpt().map(s -> s + ":" + shard.shardId()).orElse(shard.shardId())); + log.debug("Removed consumer for {} as lease has been lost", ShardInfo.getLeaseKey(shard)); } else { consumer.executeLifecycle(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java index fcb3ffde..5f1ee18c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java @@ -54,8 +54,7 @@ public class BlockOnParentShardTask implements ConsumerTask { @Override public TaskResult call() { Exception exception = null; - final String shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) - .orElse(shardInfo.shardId()); + final String shardInfoId = ShardInfo.getLeaseKey(shardInfo); try { boolean blockedOnParentShard = false; for (String shardId : shardInfo.parentShardIds()) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index f576154a..6c52e0de 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -76,8 +76,7 @@ public class ProcessTask implements ConsumerTask { @NonNull AggregatorUtil aggregatorUtil, @NonNull MetricsFactory metricsFactory) { this.shardInfo = shardInfo; - this.shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) - .orElse(shardInfo.shardId()); + this.shardInfoId = ShardInfo.getLeaseKey(shardInfo); this.shardRecordProcessor = shardRecordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; 
this.backoffTimeMillis = backoffTimeMillis; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java index 177c0f43..21e8c2c9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java @@ -24,6 +24,7 @@ import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RetryableRetrievalException; @@ -70,8 +71,7 @@ class ShardConsumerSubscriber implements Subscriber { this.bufferSize = bufferSize; this.shardConsumer = shardConsumer; this.readTimeoutsToIgnoreBeforeWarning = readTimeoutsToIgnoreBeforeWarning; - this.shardInfoId = shardConsumer.shardInfo().streamIdentifierSerOpt() - .map(s -> s + ":" + shardConsumer.shardInfo().shardId()).orElse(shardConsumer.shardInfo().shardId()); + this.shardInfoId = ShardInfo.getLeaseKey(shardConsumer.shardInfo()); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 9d53e75c..3449b723 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -15,15 +15,10 @@ package software.amazon.kinesis.lifecycle; import com.google.common.annotations.VisibleForTesting; - -import java.util.List; -import java.util.function.Function; import lombok.NonNull; import 
lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import software.amazon.awssdk.services.kinesis.model.ChildShard; -import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; @@ -47,8 +42,10 @@ import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -89,8 +86,8 @@ public class ShutdownTask implements ConsumerTask { private final List childShards; - private static final Function leaseKeyProvider = shardInfo -> shardInfo - .streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()).orElse(shardInfo.shardId()); + private static final Function leaseKeyProvider = shardInfo -> ShardInfo.getLeaseKey(shardInfo); + /* * Invokes ShardRecordProcessor shutdown() API. 
* (non-Javadoc) @@ -218,26 +215,4 @@ public class ShutdownTask implements ConsumerTask { return reason; } - private boolean isShardInContextParentOfAny(List shards) { - for(Shard shard : shards) { - if (isChildShardOfShardInContext(shard)) { - return true; - } - } - return false; - } - - private boolean isChildShardOfShardInContext(Shard shard) { - return (StringUtils.equals(shard.parentShardId(), shardInfo.shardId()) - || StringUtils.equals(shard.adjacentParentShardId(), shardInfo.shardId())); - } - - private void dropLease() { - Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); - leaseCoordinator.dropLease(currentLease); - if(currentLease != null) { - log.warn("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey()); - } - } - } From 9115f2000b383aeb2fcb372419c53cefeac0fdb0 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 7 May 2020 01:28:56 -0700 Subject: [PATCH 06/11] Logic to auto fill the missing hashranges and lease recovery. 
Added more unit test cases --- .../coordinator/PeriodicShardSyncManager.java | 116 +++++-- .../amazon/kinesis/coordinator/Scheduler.java | 8 +- .../amazon/kinesis/leases/LeaseRefresher.java | 15 + .../kinesis/leases/LeaseSerializer.java | 10 + .../kinesis/leases/ShardSyncTaskManager.java | 14 +- .../amazon/kinesis/leases/UpdateField.java | 19 ++ .../dynamodb/DynamoDBLeaseRefresher.java | 22 ++ .../dynamodb/DynamoDBLeaseSerializer.java | 23 ++ .../kinesis/lifecycle/ShutdownTask.java | 4 +- .../PeriodicShardSyncManagerTest.java | 296 +++++++++++++++++- .../kinesis/coordinator/SchedulerTest.java | 4 +- 11 files changed, 495 insertions(+), 36 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/UpdateField.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 90c95cf3..3979236b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -21,6 +21,7 @@ import lombok.NonNull; import lombok.Value; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; +import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.StreamConfig; @@ -28,7 +29,9 @@ import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.MultiStreamLease; +import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardSyncTaskManager; +import software.amazon.kinesis.leases.UpdateField; import 
software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; @@ -50,6 +53,8 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRange; + /** * The top level orchestrator for coordinating the periodic shard sync related * activities. @@ -59,10 +64,13 @@ import java.util.stream.Collectors; @Slf4j class PeriodicShardSyncManager { private static final long INITIAL_DELAY = 60 * 1000L; - private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000L; - private static final BigInteger MIN_HASH_KEY = BigInteger.ZERO; - private static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE); - private static final int CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY = 3; + private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L; + @VisibleForTesting + static final BigInteger MIN_HASH_KEY = BigInteger.ZERO; + @VisibleForTesting + static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE); + @VisibleForTesting + static final int CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY = 3; private Map hashRangeHoleTrackerMap = new HashMap<>(); private final String workerId; @@ -126,7 +134,7 @@ class PeriodicShardSyncManager { log.info("Syncing Kinesis shard info for " + streamIdentifier); final StreamConfig streamConfig = streamConfigEntry.getValue(); final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfig); - final TaskResult taskResult = shardSyncTaskManager.executeShardSyncTask(); + final TaskResult taskResult = shardSyncTaskManager.callShardSyncTask(); if (taskResult.getException() != null) { throw taskResult.getException(); } @@ -145,19 +153,30 @@ class 
PeriodicShardSyncManager { private void runShardSync() { if (leaderDecider.isLeader(workerId)) { + log.info(String.format("WorkerId %s is leader, running the periodic shard sync task", workerId)); try { - final Map> streamToLeasesMap = getStreamToLeasesMap(currentStreamConfigMap.keySet()); - for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { + // Construct the stream to leases map to be used in the lease sync + final Map> streamToLeasesMap = getStreamToLeasesMap( + currentStreamConfigMap.keySet()); - final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfigEntry.getValue()); - if (!shardSyncTaskManager.syncShardAndLeaseInfo()) { - log.warn( - "Failed to submit shard sync task for stream {}. This could be due to the previous shard sync task not finished.", - shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); + // For each of the stream, check if shard sync needs to be done based on the leases state. + for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { + if (shouldDoShardSync(streamConfigEntry.getKey(), + streamToLeasesMap.get(streamConfigEntry.getKey()))) { + log.info("Periodic shard syncer initiating shard sync for {}", streamConfigEntry.getKey()); + final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider + .apply(streamConfigEntry.getValue()); + if (!shardSyncTaskManager.castShardSyncTask()) { + log.warn( + "Failed to submit shard sync task for stream {}. 
This could be due to the previous shard sync task not finished.", + shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); + } + } else { + log.info("Skipping shard sync for {} as either hash ranges are complete in the lease table or leases hole confidence is not achieved.", streamConfigEntry.getKey()); } } } catch (Exception e) { - // TODO : Log + log.error("Caught exception while running periodic shard syncer.", e); } } else { log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); @@ -184,10 +203,12 @@ class PeriodicShardSyncManager { } } - // TODO : Catch exception - private boolean shouldDoShardSync(StreamIdentifier streamIdentifier, List leases) { + @VisibleForTesting + boolean shouldDoShardSync(StreamIdentifier streamIdentifier, List leases) { if (CollectionUtils.isNullOrEmpty(leases)) { - throw new IllegalArgumentException("No leases found to validate for the stream " + streamIdentifier); + // If the leases is null or empty then we need to do shard sync + log.info("No leases found for {}. Will be triggering shard sync", streamIdentifier); + return true; } // Check if there are any holes in the leases and return the first hole if present. Optional hashRangeHoleOpt = hasHoleInLeases(streamIdentifier, leases); @@ -205,25 +226,76 @@ class PeriodicShardSyncManager { } private Optional hasHoleInLeases(StreamIdentifier streamIdentifier, List leases) { - // Filter the hashranges of leases which has any checkpoint other than shard end. - List hashRangesForActiveLeases = leases.stream() - .filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()) + // Filter the leases with any checkpoint other than shard end. 
+ List activeLeases = leases.stream() + .filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()).collect(Collectors.toList()); + List activeLeasesWithHashRanges = fillWithHashRangesIfRequired(streamIdentifier, activeLeases); + List hashRangesForActiveLeases = activeLeasesWithHashRanges.stream() .map(lease -> lease.hashKeyRangeForLease()).collect(Collectors.toList()); return checkForHoleInHashKeyRanges(streamIdentifier, hashRangesForActiveLeases, MIN_HASH_KEY, MAX_HASH_KEY); } + // If leases are missing hashranges information, update the leases in-memory as well as in the lease storage + // by learning from kinesis shards. + private List fillWithHashRangesIfRequired(StreamIdentifier streamIdentifier, List activeLeases) { + List activeLeasesWithNoHashRanges = activeLeases.stream() + .filter(lease -> lease.hashKeyRangeForLease() == null).collect(Collectors.toList()); + Optional minLeaseOpt = activeLeasesWithNoHashRanges.stream().min(Comparator.comparing(Lease::leaseKey)); + if (minLeaseOpt.isPresent()) { + // TODO : use minLease for new ListShards with startingShardId + final Lease minLease = minLeaseOpt.get(); + final ShardDetector shardDetector = shardSyncTaskManagerProvider + .apply(currentStreamConfigMap.get(streamIdentifier)).shardDetector(); + final Map kinesisShards = shardDetector.listShards().stream() + .collect(Collectors.toMap(Shard::shardId, shard -> shard)); + return activeLeases.stream().map(lease -> { + if (lease.hashKeyRangeForLease() == null) { + final String shardId = lease instanceof MultiStreamLease ? + ((MultiStreamLease) lease).shardId() : + lease.leaseKey(); + final Shard shard = kinesisShards.get(shardId); + if(shard == null) { + return lease; + } + lease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange())); + try { + leaseRefresher.updateLease(lease, UpdateField.HASH_KEY_RANGE); + } catch (Exception e) { + log.warn( + "Unable to update hash range key information for lease {} of stream {}. 
This may result in explicit lease sync.", + lease.leaseKey(), streamIdentifier); + } + } + return lease; + }).filter(lease -> lease.hashKeyRangeForLease() != null).collect(Collectors.toList()); + } else { + return activeLeases; + } + } + @VisibleForTesting static Optional checkForHoleInHashKeyRanges(StreamIdentifier streamIdentifier, List hashKeyRanges, BigInteger minHashKey, BigInteger maxHashKey) { + // Sort and merge the overlapping hash ranges. List mergedHashKeyRanges = sortAndMergeOverlappingHashRanges(hashKeyRanges); + if(mergedHashKeyRanges.isEmpty()) { + log.error("No valid hashranges found for stream {} between {} and {}.", streamIdentifier, + MIN_HASH_KEY, MAX_HASH_KEY); + return Optional.of(new HashRangeHole(new HashKeyRangeForLease(MIN_HASH_KEY, MAX_HASH_KEY), + new HashKeyRangeForLease(MIN_HASH_KEY, MAX_HASH_KEY))); + } + + // Validate for hashranges bounds. if (!mergedHashKeyRanges.get(0).startingHashKey().equals(minHashKey) || !mergedHashKeyRanges .get(mergedHashKeyRanges.size() - 1).endingHashKey().equals(maxHashKey)) { - log.error("Incomplete hash range found between {} and {}.", mergedHashKeyRanges.get(0), + log.error("Incomplete hash range found for stream {} between {} and {}.", streamIdentifier, + mergedHashKeyRanges.get(0), mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1)); return Optional.of(new HashRangeHole(mergedHashKeyRanges.get(0), mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1))); } + // Check for any holes in the sorted hashrange intervals. 
if (mergedHashKeyRanges.size() > 1) { for (int i = 1; i < mergedHashKeyRanges.size(); i++) { final HashKeyRangeForLease hashRangeAtStartOfPossibleHole = mergedHashKeyRanges.get(i - 1); @@ -232,8 +304,8 @@ class PeriodicShardSyncManager { final BigInteger endOfPossibleHole = hashRangeAtEndOfPossibleHole.startingHashKey(); if (!endOfPossibleHole.subtract(startOfPossibleHole).equals(BigInteger.ONE)) { - log.error("Incomplete hash range found between {} and {}.", hashRangeAtStartOfPossibleHole, - hashRangeAtEndOfPossibleHole); + log.error("Incomplete hash range found for {} between {} and {}.", streamIdentifier, + hashRangeAtStartOfPossibleHole, hashRangeAtEndOfPossibleHole); return Optional.of(new HashRangeHole(hashRangeAtStartOfPossibleHole, hashRangeAtEndOfPossibleHole)); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index a2f9a45e..3b1003ee 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -351,7 +351,7 @@ public class Scheduler implements Runnable { log.info("LeaseCoordinator is already running. No need to start it."); } log.info("Scheduling periodicShardSync"); - // leaderElectedPeriodicShardSyncManager.start(shardSyncTasks); + leaderElectedPeriodicShardSyncManager.start(); // TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged // TODO: Determine if waitUntilHashRangeCovered() is needed. 
streamSyncWatch.start(); @@ -417,7 +417,7 @@ public class Scheduler implements Runnable { final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt()); final StreamConfig streamConfig = currentStreamConfigMap .getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier)); - if (createOrGetShardSyncTaskManager(streamConfig).syncShardAndLeaseInfo()) { + if (createOrGetShardSyncTaskManager(streamConfig).castShardSyncTask()) { log.info("{} : Found completed shard, initiated new ShardSyncTak for {} ", streamIdentifier.serialize(), completedShard.toString()); } @@ -480,7 +480,7 @@ public class Scheduler implements Runnable { if (!currentStreamConfigMap.containsKey(streamIdentifier)) { log.info("Found new stream to process: " + streamIdentifier + ". Syncing shards of that stream."); ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(newStreamConfigMap.get(streamIdentifier)); - shardSyncTaskManager.syncShardAndLeaseInfo(); + shardSyncTaskManager.castShardSyncTask(); currentStreamConfigMap.put(streamIdentifier, newStreamConfigMap.get(streamIdentifier)); streamsSynced.add(streamIdentifier); } else { @@ -508,7 +508,7 @@ public class Scheduler implements Runnable { + ". 
Syncing shards of that stream."); ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager( currentStreamConfigMap.get(streamIdentifier)); - shardSyncTaskManager.syncShardAndLeaseInfo(); + shardSyncTaskManager.castShardSyncTask(); currentSetOfStreamsIter.remove(); streamsSynced.add(streamIdentifier); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index 0d563a63..4ba0cf86 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -191,6 +191,21 @@ public interface LeaseRefresher { boolean updateLease(Lease lease) throws DependencyException, InvalidStateException, ProvisionedThroughputException; + /** + * Update application-specific fields of the given lease in DynamoDB. Does not update fields managed by the leasing + * library such as leaseCounter, leaseOwner, or leaseKey. + * + * @return true if update succeeded, false otherwise + * + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity + * @throws DependencyException if DynamoDB update fails in an unexpected way + */ + default void updateLease(Lease lease, UpdateField updateField) + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + throw new UnsupportedOperationException("updateLeaseWithNoExpectation is not implemented"); + } + /** * Check (synchronously) if there are any leases in the lease table. 
* diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java index 95b98399..5dbf6366 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java @@ -107,6 +107,15 @@ public interface LeaseSerializer { */ Map getDynamoUpdateLeaseUpdate(Lease lease); + /** + * @param lease + * @param updateField + * @return the attribute value map that updates application-specific data for a lease + */ + default Map getDynamoUpdateLeaseUpdate(Lease lease, UpdateField updateField) { + throw new UnsupportedOperationException(); + } + /** * @return the key schema for creating a DynamoDB table to store leases */ @@ -116,4 +125,5 @@ public interface LeaseSerializer { * @return attribute definitions for creating a DynamoDB table to store leases */ Collection getAttributeDefinitions(); + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java index a52ac650..de3d4c6c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java @@ -126,7 +126,11 @@ public class ShardSyncTaskManager { this.lock = new ReentrantLock(); } - public TaskResult executeShardSyncTask() { + /** + * Call a ShardSyncTask and return the Task Result. + * @return the Task Result. 
+ */ + public TaskResult callShardSyncTask() { final ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPositionInStream, @@ -140,7 +144,11 @@ public class ShardSyncTaskManager { return metricCollectingTask.call(); } - public boolean syncShardAndLeaseInfo() { + /** + * Cast a ShardSyncTask and return if the casting is successful. + * @return if the casting is successful. + */ + public boolean castShardSyncTask() { try { lock.lock(); return checkAndSubmitNextTask(); @@ -197,7 +205,7 @@ public class ShardSyncTaskManager { log.error("Caught exception running {} task: ", currentTask.taskType(), exception != null ? exception : taskResult.getException()); } // Acquire lock here. If shardSyncRequestPending is false in this completionStage and - // syncShardAndLeaseInfo is invoked, before completion stage exits (future completes) + // castShardSyncTask is invoked, before completion stage exits (future completes) // but right after the value of shardSyncRequestPending is checked, it will result in // shardSyncRequestPending being set to true, but no pending futures to trigger the next // ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/UpdateField.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/UpdateField.java new file mode 100644 index 00000000..c15449ca --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/UpdateField.java @@ -0,0 +1,19 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.leases; + +public enum UpdateField { + CHILD_SHARDS, HASH_KEY_RANGE +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 67e5abbe..867cc507 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -35,6 +35,7 @@ import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseSerializer; +import software.amazon.kinesis.leases.UpdateField; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; @@ -659,6 +660,27 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { return true; } + @Override + public void updateLease(Lease lease, UpdateField updateField) + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + log.debug("Updating lease without expectation {}", lease); + final AWSExceptionManager exceptionManager = createExceptionManager(); + Map updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField); + 
UpdateItemRequest request = UpdateItemRequest.builder().tableName(table).key(serializer.getDynamoHashKey(lease)) + .attributeUpdates(updates).build(); + try { + try { + FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateItem(request), dynamoDbRequestTimeout); + } catch (ExecutionException e) { + throw exceptionManager.apply(e.getCause()); + } catch (InterruptedException e) { + throw new DependencyException(e); + } + } catch (DynamoDbException | TimeoutException e) { + throw convertAndRethrowExceptions("update", lease.leaseKey(), e); + } + } + /** * {@inheritDoc} */ diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index 8f293881..4523bada 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -36,6 +36,7 @@ import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.leases.DynamoUtils; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseSerializer; +import software.amazon.kinesis.leases.UpdateField; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** @@ -268,6 +269,28 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { return result; } + @Override + public Map getDynamoUpdateLeaseUpdate(Lease lease, + UpdateField updateField) { + Map result = new HashMap<>(); + switch (updateField) { + case CHILD_SHARDS: + if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) { + result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds()))); + } + break; + case HASH_KEY_RANGE: + if (lease.hashKeyRangeForLease() != null) { + result.put(STARTING_HASH_KEY, putUpdate( + 
DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey()))); + result.put(ENDING_HASH_KEY, putUpdate( + DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey()))); + } + break; + } + return result; + } + @Override public Collection getKeySchema() { List keySchema = new ArrayList<>(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 3449b723..2544eac6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -115,7 +115,7 @@ public class ShutdownTask implements ConsumerTask { // This scenario could happen when customer deletes the stream while leaving the KCL application running. if (!CollectionUtils.isNullOrEmpty(childShards)) { createLeasesForChildShardsIfNotExist(); - updateLeasesForChildShards(); + updateLeasesWithChildShards(); } else { log.warn("Shard {} no longer exists. 
Shutting down consumer with SHARD_END reason without creating leases for child shards.", leaseKeyProvider.apply(shardInfo)); } @@ -189,7 +189,7 @@ public class ShutdownTask implements ConsumerTask { } } - private void updateLeasesForChildShards() + private void updateLeasesWithChildShards() throws DependencyException, InvalidStateException, ProvisionedThroughputException { final Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); Set childShardIds = childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java index 2567a00a..eb88c0dc 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java @@ -19,22 +19,56 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.awssdk.services.kinesis.model.HashKeyRange; +import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.kinesis.common.HashKeyRangeForLease; +import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseRefresher; +import software.amazon.kinesis.leases.MultiStreamLease; +import software.amazon.kinesis.leases.ShardDetector; +import software.amazon.kinesis.leases.ShardSyncTaskManager; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.math.BigInteger; import java.util.ArrayList; import java.util.List; +import java.util.Map; 
+import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static software.amazon.kinesis.common.HashKeyRangeForLease.deserialize; +import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; +import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.MAX_HASH_KEY; +import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.MIN_HASH_KEY; @RunWith(MockitoJUnitRunner.class) public class PeriodicShardSyncManagerTest { + private StreamIdentifier streamIdentifier; + private PeriodicShardSyncManager periodicShardSyncManager; + @Mock + private LeaderDecider leaderDecider; + @Mock + private LeaseRefresher leaseRefresher; + @Mock + Map currentStreamConfigMap; + @Mock + Function shardSyncTaskManagerProvider; + @Before public void setup() { - + streamIdentifier = StreamIdentifier.multiStreamInstance("123:stream:456"); + periodicShardSyncManager = new PeriodicShardSyncManager("worker", leaderDecider, leaseRefresher, currentStreamConfigMap, + shardSyncTaskManagerProvider, true); } @Test @@ -136,7 +170,7 @@ public class PeriodicShardSyncManagerTest { add(deserialize("25", "30")); // Missing interval here }}; Assert.assertTrue(PeriodicShardSyncManager - .checkForHoleInHashKeyRanges(hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent()); + .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent()); } @Test @@ -149,7 +183,263 @@ public class PeriodicShardSyncManagerTest { add(deserialize("24", "30")); }}; Assert.assertFalse(PeriodicShardSyncManager - .checkForHoleInHashKeyRanges(hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent()); + .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges, BigInteger.ZERO, 
BigInteger.valueOf(30)).isPresent()); + } + + @Test + public void testIfShardSyncIsInitiatedWhenNoLeasesArePassed() { + Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, null)); + } + + @Test + public void testIfShardSyncIsInitiatedWhenEmptyLeasesArePassed() { + Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, new ArrayList<>())); + } + + @Test + public void testIfShardSyncIsNotInitiatedWhenConfidenceFactorIsNotReached() { + List multiStreamLeases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); // Hole between 23 and 25 + add(deserialize("25", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + } + + @Test + public void testIfShardSyncIsNotInitiatedWhenConfidenceFactorIsReached() { + List multiStreamLeases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); // Hole between 23 and 25 + add(deserialize("25", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + 
Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + } + + @Test + public void testIfShardSyncIsInitiatedWhenHoleIsDueToShardEnd() { + List multiStreamLeases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); // introducing hole here through SHARD_END checkpoint + add(deserialize("6", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + if(lease.hashKeyRangeForLease().startingHashKey().toString().equals("4")) { + lease.checkpoint(ExtendedSequenceNumber.SHARD_END); + } else { + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + } + return lease; + }).collect(Collectors.toList()); + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + } + + @Test + public void testIfShardSyncIsInitiatedWhenNoLeasesAreUsedDueToShardEnd() { + List multiStreamLeases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.SHARD_END); + return lease; + }).collect(Collectors.toList()); + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, 
multiStreamLeases)); + } + + @Test + public void testIfShardSyncIsNotInitiatedWhenHoleShifts() { + List multiStreamLeases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); // Hole between 23 and 25 + add(deserialize("25", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + List multiStreamLeases2 = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); // Hole between 3 and 5 + add(deserialize("5", "23")); + add(deserialize("6", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + // Resetting the holes + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases2))); + Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases2)); + } + + @Test + public void testIfShardSyncIsNotInitiatedWhenHoleShiftsMoreThanOnce() { + List multiStreamLeases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); // Hole between 23 and 25 + add(deserialize("25", MAX_HASH_KEY.toString())); + 
}}.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + List multiStreamLeases2 = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); // Hole between 3 and 5 + add(deserialize("5", "23")); + add(deserialize("6", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + // Resetting the holes + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases2))); + // Resetting the holes + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + } + + @Test + public void testIfMissingHashRangeInformationIsFilledBeforeEvaluatingForShardSync() { + ShardSyncTaskManager shardSyncTaskManager = mock(ShardSyncTaskManager.class); + ShardDetector shardDetector = mock(ShardDetector.class); + when(shardSyncTaskManagerProvider.apply(any())).thenReturn(shardSyncTaskManager); + when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); + + final int[] shardCounter = { 0 }; + List hashKeyRangeForLeases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + 
add(deserialize("2", "3")); + add(deserialize("4", "20")); + add(deserialize("21", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + }}; + + List kinesisShards = hashKeyRangeForLeases.stream() + .map(hashKeyRangeForLease -> Shard.builder().shardId("shard-" + (++shardCounter[0])).hashKeyRange( + HashKeyRange.builder().startingHashKey(hashKeyRangeForLease.serializedStartingHashKey()) + .endingHashKey(hashKeyRangeForLease.serializedEndingHashKey()).build()).build()) + .collect(Collectors.toList()); + + when(shardDetector.listShards()).thenReturn(kinesisShards); + + final int[] leaseCounter = { 0 }; + List multiStreamLeases = hashKeyRangeForLeases.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), "shard-"+(++leaseCounter[0]))); + lease.shardId("shard-"+(leaseCounter[0])); + // Setting the hashrange only for last two leases + if(leaseCounter[0] >= 3) { + lease.hashKeyRange(hashKeyRangeForLease); + } + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + + // Assert that shard sync should never trigger + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + Assert.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + + // Assert that all the leases now has hashRanges set. 
+ for(Lease lease : multiStreamLeases) { + Assert.assertNotNull(lease.hashKeyRangeForLease()); + } + } + + @Test + public void testIfMissingHashRangeInformationIsFilledBeforeEvaluatingForShardSyncInHoleScenario() { + ShardSyncTaskManager shardSyncTaskManager = mock(ShardSyncTaskManager.class); + ShardDetector shardDetector = mock(ShardDetector.class); + when(shardSyncTaskManagerProvider.apply(any())).thenReturn(shardSyncTaskManager); + when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); + + final int[] shardCounter = { 0 }; + List hashKeyRangeForLeases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("5", "20")); // Hole between 3 and 5 + add(deserialize("21", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + }}; + + List kinesisShards = hashKeyRangeForLeases.stream() + .map(hashKeyRangeForLease -> Shard.builder().shardId("shard-" + (++shardCounter[0])).hashKeyRange( + HashKeyRange.builder().startingHashKey(hashKeyRangeForLease.serializedStartingHashKey()) + .endingHashKey(hashKeyRangeForLease.serializedEndingHashKey()).build()).build()) + .collect(Collectors.toList()); + + when(shardDetector.listShards()).thenReturn(kinesisShards); + + final int[] leaseCounter = { 0 }; + List multiStreamLeases = hashKeyRangeForLeases.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), "shard-"+(++leaseCounter[0]))); + lease.shardId("shard-"+(leaseCounter[0])); + // Setting the hashrange only for last two leases + if(leaseCounter[0] >= 3) { + lease.hashKeyRange(hashKeyRangeForLease); + } + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + + // Assert that shard sync should never trigger + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + 
.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + + // Assert that all the leases now has hashRanges set. + for(Lease lease : multiStreamLeases) { + Assert.assertNotNull(lease.hashKeyRangeForLease()); + } } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index a5fd2add..e5a76ce3 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -190,7 +190,7 @@ public class SchedulerTest { }); when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); - when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null)); + when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null)); when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher); when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class)); @@ -1036,7 +1036,7 @@ public class SchedulerTest { shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(shardDetector.streamIdentifier()).thenReturn(streamConfig.streamIdentifier()); - when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null)); + when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null)); if(shardSyncFirstAttemptFailure) { when(shardDetector.listShards()) .thenThrow(new RuntimeException("Service Exception")) From 02ea8cd70ffe87b4dbbb8e12ec58bc0c2b55a33e Mon Sep 17 00:00:00 2001 From: 
Ashwin Giridharan Date: Thu, 7 May 2020 01:36:51 -0700 Subject: [PATCH 07/11] Added TODO for childshards update fix --- .../java/software/amazon/kinesis/lifecycle/ShutdownTask.java | 1 + 1 file changed, 1 insertion(+) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 2544eac6..9e295616 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -196,6 +196,7 @@ public class ShutdownTask implements ConsumerTask { final Lease updatedLease = currentLease.copy(); updatedLease.childShardIds(childShardIds); + // TODO : the following update will silently fail. Make changes to use the new leaserefresher#updateLease(Lease lease, UpdateField updateField) leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, leaseKeyProvider.apply(shardInfo)); log.info("Shard {}: Updated current lease {} with child shard information: {}", shardInfo.shardId(), currentLease.leaseKey(), childShardIds); } From fc4781e3475303360fead56a1bc3b9372081a160 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 21 May 2020 00:57:24 -0700 Subject: [PATCH 08/11] Addressed review comments --- .../coordinator/PeriodicShardSyncManager.java | 165 ++++++++++-------- .../amazon/kinesis/coordinator/Scheduler.java | 6 +- .../amazon/kinesis/leases/LeaseRefresher.java | 2 +- .../kinesis/leases/ShardSyncTaskManager.java | 6 +- .../amazon/kinesis/leases/UpdateField.java | 7 + .../dynamodb/DynamoDBLeaseRefresher.java | 2 +- .../PeriodicShardSyncManagerTest.java | 156 ++++------------- 7 files changed, 146 insertions(+), 198 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java 
b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 3979236b..a96bf01e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -19,6 +19,7 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NonNull; import lombok.Value; +import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; import software.amazon.awssdk.services.kinesis.model.Shard; @@ -161,25 +162,28 @@ class PeriodicShardSyncManager { // For each of the stream, check if shard sync needs to be done based on the leases state. for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { - if (shouldDoShardSync(streamConfigEntry.getKey(), - streamToLeasesMap.get(streamConfigEntry.getKey()))) { - log.info("Periodic shard syncer initiating shard sync for {}", streamConfigEntry.getKey()); + final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(), + streamToLeasesMap.get(streamConfigEntry.getKey())); + if (shardSyncResponse.shouldDoShardSync()) { + log.info("Periodic shard syncer initiating shard sync for {} due to the reason - ", + streamConfigEntry.getKey(), shardSyncResponse.reasonForDecision()); final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider .apply(streamConfigEntry.getValue()); - if (!shardSyncTaskManager.castShardSyncTask()) { + if (!shardSyncTaskManager.submitShardSyncTask()) { log.warn( - "Failed to submit shard sync task for stream {}. This could be due to the previous shard sync task not finished.", + "Failed to submit shard sync task for stream {}. 
This could be due to the previous pending shard sync task.", shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); } } else { - log.info("Skipping shard sync for {} as either hash ranges are complete in the lease table or leases hole confidence is not achieved.", streamConfigEntry.getKey()); + log.info("Skipping shard sync for {} due to the reason - {}", streamConfigEntry.getKey(), + shardSyncResponse.reasonForDecision()); } } } catch (Exception e) { log.error("Caught exception while running periodic shard syncer.", e); } } else { - log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); + log.debug("WorkerId {} is not a leader, not running the shard sync task", workerId); } } @@ -204,35 +208,49 @@ class PeriodicShardSyncManager { } @VisibleForTesting - boolean shouldDoShardSync(StreamIdentifier streamIdentifier, List leases) { + ShardSyncResponse checkForShardSync(StreamIdentifier streamIdentifier, List leases) { if (CollectionUtils.isNullOrEmpty(leases)) { // If the leases is null or empty then we need to do shard sync log.info("No leases found for {}. Will be triggering shard sync", streamIdentifier); - return true; + return new ShardSyncResponse(true, "No leases found for " + streamIdentifier); } // Check if there are any holes in the leases and return the first hole if present. Optional hashRangeHoleOpt = hasHoleInLeases(streamIdentifier, leases); if (hashRangeHoleOpt.isPresent()) { // If hole is present, check if the hole is detected consecutively in previous occurrences. // If hole is determined with high confidence return true; return false otherwise - return hashRangeHoleTrackerMap.computeIfAbsent(streamIdentifier, s -> new HashRangeHoleTracker()) + // We are using the high confidence factor to avoid shard sync on any holes during resharding and + // lease cleanups or any intermittent issues. 
+ final HashRangeHoleTracker hashRangeHoleTracker = hashRangeHoleTrackerMap + .computeIfAbsent(streamIdentifier, s -> new HashRangeHoleTracker()); + final boolean hasHoleWithHighConfidence = hashRangeHoleTracker .hasHighConfidenceOfHoleWith(hashRangeHoleOpt.get()); + return new ShardSyncResponse(hasHoleWithHighConfidence, + "Detected same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles() + + " times. Shard sync will be initiated when threshold breaches " + + CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY); } else { // If hole is not present, clear any previous tracking for this stream and return false; hashRangeHoleTrackerMap.remove(streamIdentifier); - return false; + return new ShardSyncResponse(false, "Hash Ranges are complete for " + streamIdentifier); } } + @Value + @Accessors(fluent = true) + @VisibleForTesting + static class ShardSyncResponse { + private final boolean shouldDoShardSync; + private final String reasonForDecision; + } + private Optional hasHoleInLeases(StreamIdentifier streamIdentifier, List leases) { // Filter the leases with any checkpoint other than shard end. 
List activeLeases = leases.stream() .filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()).collect(Collectors.toList()); List activeLeasesWithHashRanges = fillWithHashRangesIfRequired(streamIdentifier, activeLeases); - List hashRangesForActiveLeases = activeLeasesWithHashRanges.stream() - .map(lease -> lease.hashKeyRangeForLease()).collect(Collectors.toList()); - return checkForHoleInHashKeyRanges(streamIdentifier, hashRangesForActiveLeases, MIN_HASH_KEY, MAX_HASH_KEY); + return checkForHoleInHashKeyRanges(streamIdentifier, activeLeasesWithHashRanges); } // If leases are missing hashranges information, update the leases in-memory as well as in the lease storage @@ -259,7 +277,7 @@ class PeriodicShardSyncManager { } lease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange())); try { - leaseRefresher.updateLease(lease, UpdateField.HASH_KEY_RANGE); + leaseRefresher.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE); } catch (Exception e) { log.warn( "Unable to update hash range key information for lease {} of stream {}. This may result in explicit lease sync.", @@ -275,38 +293,46 @@ class PeriodicShardSyncManager { @VisibleForTesting static Optional checkForHoleInHashKeyRanges(StreamIdentifier streamIdentifier, - List hashKeyRanges, BigInteger minHashKey, BigInteger maxHashKey) { - // Sort and merge the overlapping hash ranges. - List mergedHashKeyRanges = sortAndMergeOverlappingHashRanges(hashKeyRanges); - - if(mergedHashKeyRanges.isEmpty()) { - log.error("No valid hashranges found for stream {} between {} and {}.", streamIdentifier, - MIN_HASH_KEY, MAX_HASH_KEY); - return Optional.of(new HashRangeHole(new HashKeyRangeForLease(MIN_HASH_KEY, MAX_HASH_KEY), - new HashKeyRangeForLease(MIN_HASH_KEY, MAX_HASH_KEY))); + List leasesWithHashKeyRanges) { + // Sort the hash ranges by starting hash key. 
+ List sortedLeasesWithHashKeyRanges = sortLeasesByHashRange(leasesWithHashKeyRanges); + if(sortedLeasesWithHashKeyRanges.isEmpty()) { + log.error("No leases with valid hashranges found for stream {}", streamIdentifier); + return Optional.of(new HashRangeHole()); } - // Validate for hashranges bounds. - if (!mergedHashKeyRanges.get(0).startingHashKey().equals(minHashKey) || !mergedHashKeyRanges - .get(mergedHashKeyRanges.size() - 1).endingHashKey().equals(maxHashKey)) { + if (!sortedLeasesWithHashKeyRanges.get(0).hashKeyRangeForLease().startingHashKey().equals(MIN_HASH_KEY) || !sortedLeasesWithHashKeyRanges + .get(sortedLeasesWithHashKeyRanges.size() - 1).hashKeyRangeForLease().endingHashKey().equals(MAX_HASH_KEY)) { log.error("Incomplete hash range found for stream {} between {} and {}.", streamIdentifier, - mergedHashKeyRanges.get(0), - mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1)); - return Optional.of(new HashRangeHole(mergedHashKeyRanges.get(0), - mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1))); + sortedLeasesWithHashKeyRanges.get(0), + sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1)); + return Optional.of(new HashRangeHole(sortedLeasesWithHashKeyRanges.get(0), + sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1))); } // Check for any holes in the sorted hashrange intervals. 
- if (mergedHashKeyRanges.size() > 1) { - for (int i = 1; i < mergedHashKeyRanges.size(); i++) { - final HashKeyRangeForLease hashRangeAtStartOfPossibleHole = mergedHashKeyRanges.get(i - 1); - final HashKeyRangeForLease hashRangeAtEndOfPossibleHole = mergedHashKeyRanges.get(i); - final BigInteger startOfPossibleHole = hashRangeAtStartOfPossibleHole.endingHashKey(); - final BigInteger endOfPossibleHole = hashRangeAtEndOfPossibleHole.startingHashKey(); - - if (!endOfPossibleHole.subtract(startOfPossibleHole).equals(BigInteger.ONE)) { - log.error("Incomplete hash range found for {} between {} and {}.", streamIdentifier, - hashRangeAtStartOfPossibleHole, hashRangeAtEndOfPossibleHole); - return Optional.of(new HashRangeHole(hashRangeAtStartOfPossibleHole, hashRangeAtEndOfPossibleHole)); + if (sortedLeasesWithHashKeyRanges.size() > 1) { + Lease leftMostLeaseToReportInCaseOfHole = sortedLeasesWithHashKeyRanges.get(0); + HashKeyRangeForLease leftLeaseHashRange = leftMostLeaseToReportInCaseOfHole.hashKeyRangeForLease(); + for (int i = 1; i < sortedLeasesWithHashKeyRanges.size(); i++) { + final HashKeyRangeForLease rightLeaseHashRange = sortedLeasesWithHashKeyRanges.get(i).hashKeyRangeForLease(); + final BigInteger rangeDiff = rightLeaseHashRange.startingHashKey().subtract(leftLeaseHashRange.endingHashKey()); + // Case of overlapping leases when the rangediff is 0 or negative. + // signum() will be -1 for negative and 0 if value is 0. + // Merge the range for further tracking. + if (rangeDiff.signum() <= 0) { + leftLeaseHashRange = new HashKeyRangeForLease(leftLeaseHashRange.startingHashKey(), + leftLeaseHashRange.endingHashKey().max(rightLeaseHashRange.endingHashKey())); + } else { + // Case of non overlapping leases when rangediff is positive. signum() will be 1 for positive. + // If rangeDiff is 1, then it is a case of continuous hashrange. If not, it is a hole. 
+ if (!rangeDiff.equals(BigInteger.ONE)) { + log.error("Incomplete hash range found for {} between {} and {}.", streamIdentifier, + leftMostLeaseToReportInCaseOfHole, sortedLeasesWithHashKeyRanges.get(i)); + return Optional.of(new HashRangeHole(leftMostLeaseToReportInCaseOfHole, + sortedLeasesWithHashKeyRanges.get(i))); + } + leftMostLeaseToReportInCaseOfHole = sortedLeasesWithHashKeyRanges.get(i); + leftLeaseHashRange = rightLeaseHashRange; } } } @@ -314,41 +340,31 @@ class PeriodicShardSyncManager { } @VisibleForTesting - static List sortAndMergeOverlappingHashRanges( - List hashKeyRanges) { - if (hashKeyRanges.size() == 0 || hashKeyRanges.size() == 1) - return hashKeyRanges; - - Collections.sort(hashKeyRanges, new HashKeyRangeComparator()); - - final HashKeyRangeForLease first = hashKeyRanges.get(0); - BigInteger start = first.startingHashKey(); - BigInteger end = first.endingHashKey(); - - final List result = new ArrayList<>(); - - for (int i = 1; i < hashKeyRanges.size(); i++) { - HashKeyRangeForLease current = hashKeyRanges.get(i); - if (current.startingHashKey().compareTo(end) <= 0) { - end = current.endingHashKey().max(end); - } else { - result.add(new HashKeyRangeForLease(start, end)); - start = current.startingHashKey(); - end = current.endingHashKey(); - } - } - result.add(new HashKeyRangeForLease(start, end)); - return result; + static List sortLeasesByHashRange(List leasesWithHashKeyRanges) { + if (leasesWithHashKeyRanges.size() == 0 || leasesWithHashKeyRanges.size() == 1) + return leasesWithHashKeyRanges; + Collections.sort(leasesWithHashKeyRanges, new HashKeyRangeComparator()); + return leasesWithHashKeyRanges; } @Value private static class HashRangeHole { - private final HashKeyRangeForLease hashRangeAtStartOfPossibleHole; - private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole; + HashRangeHole() { + leaseAtEndOfPossibleHole = leaseAtStartOfPossibleHole = null; + } + + HashRangeHole(Lease leaseAtStartOfPossibleHole, Lease 
leaseAtEndOfPossibleHole) { + this.leaseAtStartOfPossibleHole = leaseAtStartOfPossibleHole; + this.leaseAtEndOfPossibleHole = leaseAtEndOfPossibleHole; + } + + private final Lease leaseAtStartOfPossibleHole; + private final Lease leaseAtEndOfPossibleHole; } private static class HashRangeHoleTracker { private HashRangeHole hashRangeHole; + @Getter private Integer numConsecutiveHoles; public boolean hasHighConfidenceOfHoleWith(@NonNull HashRangeHole hashRangeHole) { @@ -365,12 +381,17 @@ class PeriodicShardSyncManager { /** * Helper class to compare leases based on their hash range. */ - private static class HashKeyRangeComparator implements Comparator, Serializable { + private static class HashKeyRangeComparator implements Comparator, Serializable { private static final long serialVersionUID = 1L; - @Override public int compare(HashKeyRangeForLease hashKeyRange, HashKeyRangeForLease otherHashKeyRange) { - return hashKeyRange.startingHashKey().compareTo(otherHashKeyRange.startingHashKey()); + @Override public int compare(Lease lease, Lease otherLease) { + Validate.notNull(lease); + Validate.notNull(otherLease); + Validate.notNull(lease.hashKeyRangeForLease()); + Validate.notNull(otherLease.hashKeyRangeForLease()); + return lease.hashKeyRangeForLease().startingHashKey() + .compareTo(otherLease.hashKeyRangeForLease().startingHashKey()); } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 3b1003ee..91125a06 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -417,7 +417,7 @@ public class Scheduler implements Runnable { final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt()); final StreamConfig streamConfig = currentStreamConfigMap 
.getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier)); - if (createOrGetShardSyncTaskManager(streamConfig).castShardSyncTask()) { + if (createOrGetShardSyncTaskManager(streamConfig).submitShardSyncTask()) { log.info("{} : Found completed shard, initiated new ShardSyncTak for {} ", streamIdentifier.serialize(), completedShard.toString()); } @@ -480,7 +480,7 @@ public class Scheduler implements Runnable { if (!currentStreamConfigMap.containsKey(streamIdentifier)) { log.info("Found new stream to process: " + streamIdentifier + ". Syncing shards of that stream."); ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(newStreamConfigMap.get(streamIdentifier)); - shardSyncTaskManager.castShardSyncTask(); + shardSyncTaskManager.submitShardSyncTask(); currentStreamConfigMap.put(streamIdentifier, newStreamConfigMap.get(streamIdentifier)); streamsSynced.add(streamIdentifier); } else { @@ -508,7 +508,7 @@ public class Scheduler implements Runnable { + ". Syncing shards of that stream."); ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager( currentStreamConfigMap.get(streamIdentifier)); - shardSyncTaskManager.castShardSyncTask(); + shardSyncTaskManager.submitShardSyncTask(); currentSetOfStreamsIter.remove(); streamsSynced.add(streamIdentifier); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index 4ba0cf86..b7f38a4e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -201,7 +201,7 @@ public interface LeaseRefresher { * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity * @throws DependencyException if DynamoDB update fails in an unexpected way */ - default void updateLease(Lease lease, 
UpdateField updateField) + default void updateLeaseWithMetaInfo(Lease lease, UpdateField updateField) throws DependencyException, InvalidStateException, ProvisionedThroughputException { throw new UnsupportedOperationException("updateLeaseWithNoExpectation is not implemented"); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java index de3d4c6c..6a1ceff4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java @@ -145,10 +145,10 @@ public class ShardSyncTaskManager { } /** - * Cast a ShardSyncTask and return if the casting is successful. + * Submit a ShardSyncTask and return if the submission is successful. * @return if the casting is successful. */ - public boolean castShardSyncTask() { + public boolean submitShardSyncTask() { try { lock.lock(); return checkAndSubmitNextTask(); @@ -205,7 +205,7 @@ public class ShardSyncTaskManager { log.error("Caught exception running {} task: ", currentTask.taskType(), exception != null ? exception : taskResult.getException()); } // Acquire lock here. If shardSyncRequestPending is false in this completionStage and - // castShardSyncTask is invoked, before completion stage exits (future completes) + // submitShardSyncTask is invoked, before completion stage exits (future completes) // but right after the value of shardSyncRequestPending is checked, it will result in // shardSyncRequestPending being set to true, but no pending futures to trigger the next // ShardSyncTask. 
By executing this stage in a Reentrant lock, we ensure that if the diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/UpdateField.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/UpdateField.java index c15449ca..9461a18e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/UpdateField.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/UpdateField.java @@ -14,6 +14,13 @@ */ package software.amazon.kinesis.leases; +/** + * These are the special fields that will be updated only once during the lifetime of the lease. + * Since these are meta information that will not affect lease ownership or data durability, we allow + * any elected leader or worker to set these fields directly without any conditional checks. + * Note that though HASH_KEY_RANGE will be available during lease initialization in newer versions, we keep this + * for backfilling while rolling forward to newer versions. + */ public enum UpdateField { CHILD_SHARDS, HASH_KEY_RANGE } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 867cc507..30201236 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -661,7 +661,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } @Override - public void updateLease(Lease lease, UpdateField updateField) + public void updateLeaseWithMetaInfo(Lease lease, UpdateField updateField) throws DependencyException, InvalidStateException, ProvisionedThroughputException { log.debug("Updating lease without expectation {}", lease); final AWSExceptionManager exceptionManager = createExceptionManager(); diff --git 
a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java index eb88c0dc..9577e7a8 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java @@ -33,7 +33,6 @@ import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -71,129 +70,50 @@ public class PeriodicShardSyncManagerTest { shardSyncTaskManagerProvider, true); } - @Test - public void testIfHashRangesAreNotMergedWhenNoOverlappingIntervalsGiven() { - List hashRanges = new ArrayList() {{ - add(deserialize("0", "1")); - add(deserialize("2", "3")); - add(deserialize("4", "23")); - add(deserialize("24", "30")); - }}; - List sortAndMergedHashRanges = PeriodicShardSyncManager - .sortAndMergeOverlappingHashRanges(hashRanges); - Assert.assertEquals(hashRanges, sortAndMergedHashRanges); - } - - @Test - public void testIfHashRangesAreSortedWhenNoOverlappingIntervalsGiven() { - List hashRanges = new ArrayList() {{ - add(deserialize("2", "3")); - add(deserialize("0", "1")); - add(deserialize("24", "30")); - add(deserialize("4", "23")); - }}; - List hashRangesCopy = new ArrayList<>(); - hashRangesCopy.addAll(hashRanges); - List sortAndMergedHashRanges = PeriodicShardSyncManager - .sortAndMergeOverlappingHashRanges(hashRangesCopy); - Assert.assertEquals(hashRangesCopy, sortAndMergedHashRanges); - Assert.assertNotEquals(hashRanges, sortAndMergedHashRanges); - } - - @Test - public void testIfHashRangesAreMergedWhenOverlappingIntervalsGivenCase1() { - List hashRanges = new 
ArrayList() {{ - add(deserialize("0", "1")); - add(deserialize("2", "3")); - add(deserialize("4", "23")); - add(deserialize("6", "23")); - add(deserialize("24", "30")); - }}; - List expectedHashRanges = new ArrayList() {{ - add(deserialize("0", "1")); - add(deserialize("2", "3")); - add(deserialize("4", "23")); - add(deserialize("24", "30")); - }}; - List sortAndMergedHashRanges = PeriodicShardSyncManager - .sortAndMergeOverlappingHashRanges(hashRanges); - Assert.assertEquals(expectedHashRanges, sortAndMergedHashRanges); - } - - @Test - public void testIfHashRangesAreMergedWhenOverlappingIntervalsGivenCase2() { - List hashRanges = new ArrayList() {{ - add(deserialize("0", "1")); - add(deserialize("2", "3")); - add(deserialize("4", "5")); - add(deserialize("4", "23")); - add(deserialize("24", "30")); - }}; - List expectedHashRanges = new ArrayList() {{ - add(deserialize("0", "1")); - add(deserialize("2", "3")); - add(deserialize("4", "23")); - add(deserialize("24", "30")); - }}; - List sortAndMergedHashRanges = PeriodicShardSyncManager - .sortAndMergeOverlappingHashRanges(hashRanges); - Assert.assertEquals(expectedHashRanges, sortAndMergedHashRanges); - } - - @Test - public void testIfHashRangesAreMergedWhenOverlappingIntervalsGivenCase3() { - List hashRanges = new ArrayList() {{ - add(deserialize("0", "1")); - add(deserialize("2", "3")); - add(deserialize("4", "23")); - add(deserialize("4", "5")); - add(deserialize("24", "30")); - }}; - List expectedHashRanges = new ArrayList() {{ - add(deserialize("0", "1")); - add(deserialize("2", "3")); - add(deserialize("4", "23")); - add(deserialize("24", "30")); - }}; - List sortAndMergedHashRanges = PeriodicShardSyncManager - .sortAndMergeOverlappingHashRanges(hashRanges); - Assert.assertEquals(expectedHashRanges, sortAndMergedHashRanges); - } - @Test public void testForFailureWhenHashRangesAreIncomplete() { - List hashRanges = new ArrayList() {{ + List hashRanges = new ArrayList() {{ add(deserialize("0", "1")); 
add(deserialize("2", "3")); add(deserialize("4", "23")); add(deserialize("6", "23")); - add(deserialize("25", "30")); // Missing interval here - }}; + add(deserialize("25", MAX_HASH_KEY.toString())); // Missing interval here + }}.stream().map(hashKeyRangeForLease -> { + Lease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); Assert.assertTrue(PeriodicShardSyncManager - .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent()); + .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges).isPresent()); } @Test public void testForSuccessWhenHashRangesAreComplete() { - List hashRanges = new ArrayList() {{ + List hashRanges = new ArrayList() {{ add(deserialize("0", "1")); add(deserialize("2", "3")); add(deserialize("4", "23")); add(deserialize("6", "23")); - add(deserialize("24", "30")); - }}; + add(deserialize("24", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + Lease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); Assert.assertFalse(PeriodicShardSyncManager - .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent()); + .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges).isPresent()); } @Test public void testIfShardSyncIsInitiatedWhenNoLeasesArePassed() { - Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, null)); + Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, null).shouldDoShardSync()); } @Test public void testIfShardSyncIsInitiatedWhenEmptyLeasesArePassed() { - Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, new ArrayList<>())); + 
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, new ArrayList<>()).shouldDoShardSync()); } @Test @@ -211,7 +131,7 @@ public class PeriodicShardSyncManagerTest { return lease; }).collect(Collectors.toList()); IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); } @Test @@ -229,8 +149,8 @@ public class PeriodicShardSyncManagerTest { return lease; }).collect(Collectors.toList()); IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); - Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); + Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); } @Test @@ -252,8 +172,8 @@ public class PeriodicShardSyncManagerTest { return lease; }).collect(Collectors.toList()); IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); - Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); + Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); } @Test @@ -271,8 +191,8 @@ public class PeriodicShardSyncManagerTest { return lease; }).collect(Collectors.toList()); IntStream.range(1, 
CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); - Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); + Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); } @Test @@ -290,7 +210,7 @@ public class PeriodicShardSyncManagerTest { return lease; }).collect(Collectors.toList()); IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); List multiStreamLeases2 = new ArrayList() {{ add(deserialize(MIN_HASH_KEY.toString(), "1")); add(deserialize("2", "3")); // Hole between 3 and 5 @@ -305,8 +225,8 @@ public class PeriodicShardSyncManagerTest { }).collect(Collectors.toList()); // Resetting the holes IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases2))); - Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases2)); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync())); + Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync()); } @Test @@ -324,7 +244,7 @@ public class PeriodicShardSyncManagerTest { return lease; }).collect(Collectors.toList()); IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, 
multiStreamLeases))); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); List multiStreamLeases2 = new ArrayList() {{ add(deserialize(MIN_HASH_KEY.toString(), "1")); add(deserialize("2", "3")); // Hole between 3 and 5 @@ -339,11 +259,11 @@ public class PeriodicShardSyncManagerTest { }).collect(Collectors.toList()); // Resetting the holes IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases2))); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync())); // Resetting the holes IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); - Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); + Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); } @Test @@ -385,8 +305,8 @@ public class PeriodicShardSyncManagerTest { // Assert that shard sync should never trigger IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); - Assert.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); + Assert.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); // Assert that all the leases now has hashRanges set. 
for(Lease lease : multiStreamLeases) { @@ -433,8 +353,8 @@ public class PeriodicShardSyncManagerTest { // Assert that shard sync should never trigger IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); - Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); + Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); // Assert that all the leases now has hashRanges set. for(Lease lease : multiStreamLeases) { From 72a6d5e08496c2b56795039381ac609bab07bf85 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 25 May 2020 13:25:43 -0700 Subject: [PATCH 09/11] Adding more unit test cases and fixing an edge case --- .../kinesis/common/HashKeyRangeForLease.java | 7 + .../coordinator/PeriodicShardSyncManager.java | 13 +- .../amazon/kinesis/coordinator/Scheduler.java | 2 - .../PeriodicShardSyncManagerTest.java | 225 +++++++++++++++++- 4 files changed, 240 insertions(+), 7 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java index 063451a0..d2540073 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java @@ -32,6 +32,13 @@ public class HashKeyRangeForLease { private final BigInteger startingHashKey; private final BigInteger endingHashKey; + public HashKeyRangeForLease(BigInteger startingHashKey, BigInteger endingHashKey) { + Validate.isTrue(startingHashKey.compareTo(endingHashKey) < 0, + "StartingHashKey %s must be 
less than EndingHashKey %s ", startingHashKey, endingHashKey); + this.startingHashKey = startingHashKey; + this.endingHashKey = endingHashKey; + } + /** * Serialize the startingHashKey for persisting in external storage * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index a96bf01e..5ac4647c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -15,6 +15,7 @@ package software.amazon.kinesis.coordinator; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ComparisonChain; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NonNull; @@ -245,7 +246,8 @@ class PeriodicShardSyncManager { private final String reasonForDecision; } - private Optional hasHoleInLeases(StreamIdentifier streamIdentifier, List leases) { + @VisibleForTesting + Optional hasHoleInLeases(StreamIdentifier streamIdentifier, List leases) { // Filter the leases with any checkpoint other than shard end. 
List activeLeases = leases.stream() .filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()).collect(Collectors.toList()); @@ -385,13 +387,16 @@ class PeriodicShardSyncManager { private static final long serialVersionUID = 1L; - @Override public int compare(Lease lease, Lease otherLease) { + @Override + public int compare(Lease lease, Lease otherLease) { Validate.notNull(lease); Validate.notNull(otherLease); Validate.notNull(lease.hashKeyRangeForLease()); Validate.notNull(otherLease.hashKeyRangeForLease()); - return lease.hashKeyRangeForLease().startingHashKey() - .compareTo(otherLease.hashKeyRangeForLease().startingHashKey()); + return ComparisonChain.start() + .compare(lease.hashKeyRangeForLease().startingHashKey(), otherLease.hashKeyRangeForLease().startingHashKey()) + .compare(lease.hashKeyRangeForLease().endingHashKey(), otherLease.hashKeyRangeForLease().endingHashKey()) + .result(); } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 91125a06..e2f2f852 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -352,8 +352,6 @@ public class Scheduler implements Runnable { } log.info("Scheduling periodicShardSync"); leaderElectedPeriodicShardSyncManager.start(); - // TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged - // TODO: Determine if waitUntilHashRangeCovered() is needed. 
streamSyncWatch.start(); isDone = true; } catch (LeasingException e) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java index 9577e7a8..dfba2791 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java @@ -15,6 +15,8 @@ package software.amazon.kinesis.coordinator; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -23,6 +25,7 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.kinesis.model.HashKeyRange; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; @@ -33,7 +36,9 @@ import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -106,6 +111,44 @@ public class PeriodicShardSyncManagerTest { .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges).isPresent()); } + @Test + public void testForSuccessWhenUnSortedHashRangesAreComplete() { + List hashRanges = new ArrayList() {{ + add(deserialize("4", "23")); + add(deserialize("2", "3")); + add(deserialize("0", "1")); + add(deserialize("24", MAX_HASH_KEY.toString())); + add(deserialize("6", 
"23")); + + }}.stream().map(hashKeyRangeForLease -> { + Lease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + Assert.assertFalse(PeriodicShardSyncManager + .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges).isPresent()); + } + + @Test + public void testForSuccessWhenHashRangesAreCompleteForOverlappingLeasesAtEnd() { + List hashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + add(deserialize("24", "45")); + }}.stream().map(hashKeyRangeForLease -> { + Lease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + Assert.assertFalse(PeriodicShardSyncManager + .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges).isPresent()); + } + @Test public void testIfShardSyncIsInitiatedWhenNoLeasesArePassed() { Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, null).shouldDoShardSync()); @@ -135,7 +178,7 @@ public class PeriodicShardSyncManagerTest { } @Test - public void testIfShardSyncIsNotInitiatedWhenConfidenceFactorIsReached() { + public void testIfShardSyncIsInitiatedWhenConfidenceFactorIsReached() { List multiStreamLeases = new ArrayList() {{ add(deserialize(MIN_HASH_KEY.toString(), "1")); add(deserialize("2", "3")); @@ -362,4 +405,184 @@ public class PeriodicShardSyncManagerTest { } } + @Test + public void testFor1000DifferentValidSplitHierarchyTreeTheHashRangesAreAlwaysComplete() { + for(int i=0; i < 1000; i++) { + int maxInitialLeaseCount = 100; + List leases = generateInitialLeases(maxInitialLeaseCount); + reshard(leases, 5, ReshardType.SPLIT, maxInitialLeaseCount, false); + Collections.shuffle(leases); +// 
System.out.println( +// leases.stream().map(l -> l.checkpoint().sequenceNumber() + ":" + l.hashKeyRangeForLease()).collect(Collectors.toList())); + Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent()); + } + } + + @Test + public void testFor1000DifferentValidMergeHierarchyTreeTheHashRangesAreAlwaysComplete() { + for (int i = 0; i < 1000; i++) { + int maxInitialLeaseCount = 100; + List leases = generateInitialLeases(maxInitialLeaseCount); + reshard(leases, 5, ReshardType.MERGE, maxInitialLeaseCount, false); + Collections.shuffle(leases); + Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent()); + } + } + + @Test + public void testFor1000DifferentValidReshardHierarchyTreeTheHashRangesAreAlwaysComplete() { + for (int i = 0; i < 1000; i++) { + int maxInitialLeaseCount = 100; + List leases = generateInitialLeases(maxInitialLeaseCount); + reshard(leases, 5, ReshardType.ANY, maxInitialLeaseCount, false); + Collections.shuffle(leases); + Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent()); + } + } + + @Test + public void testFor1000DifferentValidMergeHierarchyTreeWithSomeInProgressParentsTheHashRangesAreAlwaysComplete() { + for (int i = 0; i < 1000; i++) { + int maxInitialLeaseCount = 100; + List leases = generateInitialLeases(maxInitialLeaseCount); + reshard(leases, 5, ReshardType.MERGE, maxInitialLeaseCount, true); + Collections.shuffle(leases); + Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent()); + } + } + + @Test + public void testFor1000DifferentValidReshardHierarchyTreeWithSomeInProgressParentsTheHashRangesAreAlwaysComplete() { + for (int i = 0; i < 1000; i++) { + int maxInitialLeaseCount = 100; + List leases = generateInitialLeases(maxInitialLeaseCount); + reshard(leases, 5, ReshardType.ANY, maxInitialLeaseCount, true); + Collections.shuffle(leases); + 
Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent()); + } + } + + + + private List generateInitialLeases(int initialShardCount) { + long hashRangeInternalMax = 10000000; + List initialLeases = new ArrayList<>(); + long leaseStartKey = 0; + for (int i = 1; i <= initialShardCount; i++) { + final Lease lease = new Lease(); + long leaseEndKey; + if (i != initialShardCount) { + leaseEndKey = (hashRangeInternalMax / initialShardCount) * i; + lease.hashKeyRange(HashKeyRangeForLease.deserialize(leaseStartKey + "", leaseEndKey + "")); + } else { + leaseEndKey = 0; + lease.hashKeyRange(HashKeyRangeForLease.deserialize(leaseStartKey + "", MAX_HASH_KEY.toString())); + } + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + lease.leaseKey("shard-" + i); + initialLeases.add(lease); + leaseStartKey = leaseEndKey + 1; + } + return initialLeases; + } + + private void reshard(List initialLeases, int depth, ReshardType reshardType, int leaseCounter, + boolean shouldKeepSomeParentsInProgress) { + for (int i = 0; i < depth; i++) { + if (reshardType == ReshardType.SPLIT) { + leaseCounter = split(initialLeases, leaseCounter); + } else if (reshardType == ReshardType.MERGE) { + leaseCounter = merge(initialLeases, leaseCounter, shouldKeepSomeParentsInProgress); + } else { + if (isHeads()) { + leaseCounter = split(initialLeases, leaseCounter); + } else { + leaseCounter = merge(initialLeases, leaseCounter, shouldKeepSomeParentsInProgress); + } + } + } + } + + private int merge(List initialLeases, int leaseCounter, boolean shouldKeepSomeParentsInProgress) { + List leasesEligibleForMerge = initialLeases.stream().filter(l -> CollectionUtils.isNullOrEmpty(l.childShardIds())) + .collect(Collectors.toList()); +// System.out.println("Leases to merge : " + leasesEligibleForMerge); + int leasesToMerge = (int) ((leasesEligibleForMerge.size() - 1) / 2.0 * Math.random()); + for (int i = 0; i < leasesToMerge; i += 2) { + Lease parent1 = 
leasesEligibleForMerge.get(i); + Lease parent2 = leasesEligibleForMerge.get(i + 1); + if(parent2.hashKeyRangeForLease().startingHashKey().subtract(parent1.hashKeyRangeForLease().endingHashKey()).equals(BigInteger.ONE)) + { + parent1.checkpoint(ExtendedSequenceNumber.SHARD_END); + if (!shouldKeepSomeParentsInProgress || (shouldKeepSomeParentsInProgress && isOneFromDiceRoll())) { +// System.out.println("Deciding to keep parent in progress : " + parent2); + parent2.checkpoint(ExtendedSequenceNumber.SHARD_END); + } + Lease child = new Lease(); + child.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + child.leaseKey("shard-" + (++leaseCounter)); +// System.out.println("Parent " + parent1 + " and " + parent2 + " merges into " + child); + child.hashKeyRange(new HashKeyRangeForLease(parent1.hashKeyRangeForLease().startingHashKey(), + parent2.hashKeyRangeForLease().endingHashKey())); + parent1.childShardIds(Collections.singletonList(child.leaseKey())); + parent2.childShardIds(Collections.singletonList(child.leaseKey())); + child.parentShardIds(Sets.newHashSet(parent1.leaseKey(), parent2.leaseKey())); + + initialLeases.add(child); + } + } + return leaseCounter; + } + + private int split(List initialLeases, int leaseCounter) { + List leasesEligibleForSplit = initialLeases.stream().filter(l -> CollectionUtils.isNullOrEmpty(l.childShardIds())) + .collect(Collectors.toList()); +// System.out.println("Leases to split : " + leasesEligibleForSplit); + int leasesToSplit = (int) (leasesEligibleForSplit.size() * Math.random()); + for (int i = 0; i < leasesToSplit; i++) { + Lease parent = leasesEligibleForSplit.get(i); + parent.checkpoint(ExtendedSequenceNumber.SHARD_END); + Lease child1 = new Lease(); + child1.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + child1.hashKeyRange(new HashKeyRangeForLease(parent.hashKeyRangeForLease().startingHashKey(), + parent.hashKeyRangeForLease().startingHashKey().add(parent.hashKeyRangeForLease().endingHashKey()) + .divide(new 
BigInteger("2")))); + child1.leaseKey("shard-" + (++leaseCounter)); + Lease child2 = new Lease(); + child2.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + child2.hashKeyRange(new HashKeyRangeForLease( + parent.hashKeyRangeForLease().startingHashKey().add(parent.hashKeyRangeForLease().endingHashKey()) + .divide(new BigInteger("2")).add(new BigInteger("1")), + parent.hashKeyRangeForLease().endingHashKey())); + child2.leaseKey("shard-" + (++leaseCounter)); + + child1.parentShardIds(Sets.newHashSet(parent.leaseKey())); + child2.parentShardIds(Sets.newHashSet(parent.leaseKey())); + parent.childShardIds(Lists.newArrayList(child1.leaseKey(), child2.leaseKey())); + +// System.out.println("Parent " + parent + " splits into " + child1 + " and " + child2); + + initialLeases.add(child1); + initialLeases.add(child2); + } + return leaseCounter; + } + + private boolean isHeads() { + return Math.random() <= 0.5; + } + + private boolean isOneFromDiceRoll() { + return Math.random() <= 0.16; + } + + + private enum ReshardType { + SPLIT, + MERGE, + ANY + } + + + } From 08ca1b61bc596858c0596ec7644c5c3ef482564e Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 25 May 2020 13:37:10 -0700 Subject: [PATCH 10/11] Changing test case name --- .../amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java index a9e9c689..06a72230 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java @@ -75,7 +75,7 @@ public class BlockOnParentShardTaskTest { * @throws DependencyException */ @Test - public final void testCallWhenParentsHaveFinished() + public 
final void testCallShouldNotThrowBlockedOnParentWhenParentsHaveFinished() throws DependencyException, InvalidStateException, ProvisionedThroughputException { ShardInfo shardInfo = null; @@ -116,7 +116,7 @@ public class BlockOnParentShardTaskTest { * @throws DependencyException */ @Test - public final void testCallWhenParentsHaveFinishedMultiStream() + public final void testCallShouldNotThrowBlockedOnParentWhenParentsHaveFinishedMultiStream() throws DependencyException, InvalidStateException, ProvisionedThroughputException { ShardInfo shardInfo = null; From 67d2b082fd7580e01cb302cf2d73bc3441f19809 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 26 May 2020 11:10:10 -0700 Subject: [PATCH 11/11] Fixing retry logic --- .../coordinator/PeriodicShardSyncManager.java | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 5ac4647c..c84547e2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -47,6 +47,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.Executors; @@ -166,7 +167,7 @@ class PeriodicShardSyncManager { final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(), streamToLeasesMap.get(streamConfigEntry.getKey())); if (shardSyncResponse.shouldDoShardSync()) { - log.info("Periodic shard syncer initiating shard sync for {} due to the reason - ", + log.info("Periodic shard syncer initiating shard sync for {} due to the reason - {} ", 
streamConfigEntry.getKey(), shardSyncResponse.reasonForDecision()); final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider .apply(streamConfigEntry.getValue()); @@ -228,7 +229,7 @@ class PeriodicShardSyncManager { .hasHighConfidenceOfHoleWith(hashRangeHoleOpt.get()); return new ShardSyncResponse(hasHoleWithHighConfidence, "Detected same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles() - + " times. Shard sync will be initiated when threshold breaches " + + " times. Shard sync will be initiated when threshold reaches " + CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY); } else { @@ -308,8 +309,8 @@ class PeriodicShardSyncManager { log.error("Incomplete hash range found for stream {} between {} and {}.", streamIdentifier, sortedLeasesWithHashKeyRanges.get(0), sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1)); - return Optional.of(new HashRangeHole(sortedLeasesWithHashKeyRanges.get(0), - sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1))); + return Optional.of(new HashRangeHole(sortedLeasesWithHashKeyRanges.get(0).hashKeyRangeForLease(), + sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1).hashKeyRangeForLease())); } // Check for any holes in the sorted hashrange intervals. 
if (sortedLeasesWithHashKeyRanges.size() > 1) { @@ -330,8 +331,8 @@ class PeriodicShardSyncManager { if (!rangeDiff.equals(BigInteger.ONE)) { log.error("Incomplete hash range found for {} between {} and {}.", streamIdentifier, leftMostLeaseToReportInCaseOfHole, sortedLeasesWithHashKeyRanges.get(i)); - return Optional.of(new HashRangeHole(leftMostLeaseToReportInCaseOfHole, - sortedLeasesWithHashKeyRanges.get(i))); + return Optional.of(new HashRangeHole(leftMostLeaseToReportInCaseOfHole.hashKeyRangeForLease(), + sortedLeasesWithHashKeyRanges.get(i).hashKeyRangeForLease())); } leftMostLeaseToReportInCaseOfHole = sortedLeasesWithHashKeyRanges.get(i); leftLeaseHashRange = rightLeaseHashRange; @@ -352,16 +353,16 @@ class PeriodicShardSyncManager { @Value private static class HashRangeHole { HashRangeHole() { - leaseAtEndOfPossibleHole = leaseAtStartOfPossibleHole = null; + hashRangeAtStartOfPossibleHole = hashRangeAtEndOfPossibleHole = null; } - HashRangeHole(Lease leaseAtStartOfPossibleHole, Lease leaseAtEndOfPossibleHole) { - this.leaseAtStartOfPossibleHole = leaseAtStartOfPossibleHole; - this.leaseAtEndOfPossibleHole = leaseAtEndOfPossibleHole; + HashRangeHole(HashKeyRangeForLease hashRangeAtStartOfPossibleHole, HashKeyRangeForLease hashRangeAtEndOfPossibleHole) { + this.hashRangeAtStartOfPossibleHole = hashRangeAtStartOfPossibleHole; + this.hashRangeAtEndOfPossibleHole = hashRangeAtEndOfPossibleHole; } - private final Lease leaseAtStartOfPossibleHole; - private final Lease leaseAtEndOfPossibleHole; + private final HashKeyRangeForLease hashRangeAtStartOfPossibleHole; + private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole; } private static class HashRangeHoleTracker {