From 9115f2000b383aeb2fcb372419c53cefeac0fdb0 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 7 May 2020 01:28:56 -0700 Subject: [PATCH] Logic to auto fill the missing hashranges and lease recovery. Added more unit test cases --- .../coordinator/PeriodicShardSyncManager.java | 116 +++++-- .../amazon/kinesis/coordinator/Scheduler.java | 8 +- .../amazon/kinesis/leases/LeaseRefresher.java | 15 + .../kinesis/leases/LeaseSerializer.java | 10 + .../kinesis/leases/ShardSyncTaskManager.java | 14 +- .../amazon/kinesis/leases/UpdateField.java | 19 ++ .../dynamodb/DynamoDBLeaseRefresher.java | 22 ++ .../dynamodb/DynamoDBLeaseSerializer.java | 23 ++ .../kinesis/lifecycle/ShutdownTask.java | 4 +- .../PeriodicShardSyncManagerTest.java | 296 +++++++++++++++++- .../kinesis/coordinator/SchedulerTest.java | 4 +- 11 files changed, 495 insertions(+), 36 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/UpdateField.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 90c95cf3..3979236b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -21,6 +21,7 @@ import lombok.NonNull; import lombok.Value; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; +import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.StreamConfig; @@ -28,7 +29,9 @@ import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseRefresher; import 
software.amazon.kinesis.leases.MultiStreamLease; +import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardSyncTaskManager; +import software.amazon.kinesis.leases.UpdateField; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; @@ -50,6 +53,8 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRange; + /** * The top level orchestrator for coordinating the periodic shard sync related * activities. @@ -59,10 +64,13 @@ import java.util.stream.Collectors; @Slf4j class PeriodicShardSyncManager { private static final long INITIAL_DELAY = 60 * 1000L; - private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000L; - private static final BigInteger MIN_HASH_KEY = BigInteger.ZERO; - private static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE); - private static final int CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY = 3; + private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L; + @VisibleForTesting + static final BigInteger MIN_HASH_KEY = BigInteger.ZERO; + @VisibleForTesting + static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE); + @VisibleForTesting + static final int CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY = 3; private Map hashRangeHoleTrackerMap = new HashMap<>(); private final String workerId; @@ -126,7 +134,7 @@ class PeriodicShardSyncManager { log.info("Syncing Kinesis shard info for " + streamIdentifier); final StreamConfig streamConfig = streamConfigEntry.getValue(); final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfig); - final TaskResult taskResult = 
shardSyncTaskManager.executeShardSyncTask(); + final TaskResult taskResult = shardSyncTaskManager.callShardSyncTask(); if (taskResult.getException() != null) { throw taskResult.getException(); } @@ -145,19 +153,30 @@ class PeriodicShardSyncManager { private void runShardSync() { if (leaderDecider.isLeader(workerId)) { + log.info(String.format("WorkerId %s is leader, running the periodic shard sync task", workerId)); try { - final Map> streamToLeasesMap = getStreamToLeasesMap(currentStreamConfigMap.keySet()); - for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { + // Construct the stream to leases map to be used in the lease sync + final Map> streamToLeasesMap = getStreamToLeasesMap( + currentStreamConfigMap.keySet()); - final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfigEntry.getValue()); - if (!shardSyncTaskManager.syncShardAndLeaseInfo()) { - log.warn( - "Failed to submit shard sync task for stream {}. This could be due to the previous shard sync task not finished.", - shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); + // For each of the stream, check if shard sync needs to be done based on the leases state. + for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { + if (shouldDoShardSync(streamConfigEntry.getKey(), + streamToLeasesMap.get(streamConfigEntry.getKey()))) { + log.info("Periodic shard syncer initiating shard sync for {}", streamConfigEntry.getKey()); + final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider + .apply(streamConfigEntry.getValue()); + if (!shardSyncTaskManager.castShardSyncTask()) { + log.warn( + "Failed to submit shard sync task for stream {}. 
This could be due to the previous shard sync task not finished.", + shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); + } + } else { + log.info("Skipping shard sync for {} as either hash ranges are complete in the lease table or leases hole confidence is not achieved.", streamConfigEntry.getKey()); } } } catch (Exception e) { - // TODO : Log + log.error("Caught exception while running periodic shard syncer.", e); } } else { log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); @@ -184,10 +203,12 @@ class PeriodicShardSyncManager { } } - // TODO : Catch exception - private boolean shouldDoShardSync(StreamIdentifier streamIdentifier, List leases) { + @VisibleForTesting + boolean shouldDoShardSync(StreamIdentifier streamIdentifier, List leases) { if (CollectionUtils.isNullOrEmpty(leases)) { - throw new IllegalArgumentException("No leases found to validate for the stream " + streamIdentifier); + // If the leases is null or empty then we need to do shard sync + log.info("No leases found for {}. Will be triggering shard sync", streamIdentifier); + return true; } // Check if there are any holes in the leases and return the first hole if present. Optional hashRangeHoleOpt = hasHoleInLeases(streamIdentifier, leases); @@ -205,25 +226,76 @@ class PeriodicShardSyncManager { } private Optional hasHoleInLeases(StreamIdentifier streamIdentifier, List leases) { - // Filter the hashranges of leases which has any checkpoint other than shard end. - List hashRangesForActiveLeases = leases.stream() - .filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()) + // Filter the leases with any checkpoint other than shard end. 
+ List activeLeases = leases.stream() + .filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()).collect(Collectors.toList()); + List activeLeasesWithHashRanges = fillWithHashRangesIfRequired(streamIdentifier, activeLeases); + List hashRangesForActiveLeases = activeLeasesWithHashRanges.stream() .map(lease -> lease.hashKeyRangeForLease()).collect(Collectors.toList()); return checkForHoleInHashKeyRanges(streamIdentifier, hashRangesForActiveLeases, MIN_HASH_KEY, MAX_HASH_KEY); } + // If leases are missing hashranges information, update the leases in-memory as well as in the lease storage + // by learning from kinesis shards. + private List fillWithHashRangesIfRequired(StreamIdentifier streamIdentifier, List activeLeases) { + List activeLeasesWithNoHashRanges = activeLeases.stream() + .filter(lease -> lease.hashKeyRangeForLease() == null).collect(Collectors.toList()); + Optional minLeaseOpt = activeLeasesWithNoHashRanges.stream().min(Comparator.comparing(Lease::leaseKey)); + if (minLeaseOpt.isPresent()) { + // TODO : use minLease for new ListShards with startingShardId + final Lease minLease = minLeaseOpt.get(); + final ShardDetector shardDetector = shardSyncTaskManagerProvider + .apply(currentStreamConfigMap.get(streamIdentifier)).shardDetector(); + final Map kinesisShards = shardDetector.listShards().stream() + .collect(Collectors.toMap(Shard::shardId, shard -> shard)); + return activeLeases.stream().map(lease -> { + if (lease.hashKeyRangeForLease() == null) { + final String shardId = lease instanceof MultiStreamLease ? + ((MultiStreamLease) lease).shardId() : + lease.leaseKey(); + final Shard shard = kinesisShards.get(shardId); + if(shard == null) { + return lease; + } + lease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange())); + try { + leaseRefresher.updateLease(lease, UpdateField.HASH_KEY_RANGE); + } catch (Exception e) { + log.warn( + "Unable to update hash range key information for lease {} of stream {}. 
This may result in explicit lease sync.", + lease.leaseKey(), streamIdentifier); + } + } + return lease; + }).filter(lease -> lease.hashKeyRangeForLease() != null).collect(Collectors.toList()); + } else { + return activeLeases; + } + } + @VisibleForTesting static Optional checkForHoleInHashKeyRanges(StreamIdentifier streamIdentifier, List hashKeyRanges, BigInteger minHashKey, BigInteger maxHashKey) { + // Sort and merge the overlapping hash ranges. List mergedHashKeyRanges = sortAndMergeOverlappingHashRanges(hashKeyRanges); + if(mergedHashKeyRanges.isEmpty()) { + log.error("No valid hashranges found for stream {} between {} and {}.", streamIdentifier, + MIN_HASH_KEY, MAX_HASH_KEY); + return Optional.of(new HashRangeHole(new HashKeyRangeForLease(MIN_HASH_KEY, MAX_HASH_KEY), + new HashKeyRangeForLease(MIN_HASH_KEY, MAX_HASH_KEY))); + } + + // Validate for hashranges bounds. if (!mergedHashKeyRanges.get(0).startingHashKey().equals(minHashKey) || !mergedHashKeyRanges .get(mergedHashKeyRanges.size() - 1).endingHashKey().equals(maxHashKey)) { - log.error("Incomplete hash range found between {} and {}.", mergedHashKeyRanges.get(0), + log.error("Incomplete hash range found for stream {} between {} and {}.", streamIdentifier, + mergedHashKeyRanges.get(0), mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1)); return Optional.of(new HashRangeHole(mergedHashKeyRanges.get(0), mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1))); } + // Check for any holes in the sorted hashrange intervals. 
if (mergedHashKeyRanges.size() > 1) { for (int i = 1; i < mergedHashKeyRanges.size(); i++) { final HashKeyRangeForLease hashRangeAtStartOfPossibleHole = mergedHashKeyRanges.get(i - 1); @@ -232,8 +304,8 @@ class PeriodicShardSyncManager { final BigInteger endOfPossibleHole = hashRangeAtEndOfPossibleHole.startingHashKey(); if (!endOfPossibleHole.subtract(startOfPossibleHole).equals(BigInteger.ONE)) { - log.error("Incomplete hash range found between {} and {}.", hashRangeAtStartOfPossibleHole, - hashRangeAtEndOfPossibleHole); + log.error("Incomplete hash range found for {} between {} and {}.", streamIdentifier, + hashRangeAtStartOfPossibleHole, hashRangeAtEndOfPossibleHole); return Optional.of(new HashRangeHole(hashRangeAtStartOfPossibleHole, hashRangeAtEndOfPossibleHole)); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index a2f9a45e..3b1003ee 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -351,7 +351,7 @@ public class Scheduler implements Runnable { log.info("LeaseCoordinator is already running. No need to start it."); } log.info("Scheduling periodicShardSync"); - // leaderElectedPeriodicShardSyncManager.start(shardSyncTasks); + leaderElectedPeriodicShardSyncManager.start(); // TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged // TODO: Determine if waitUntilHashRangeCovered() is needed. 
streamSyncWatch.start(); @@ -417,7 +417,7 @@ public class Scheduler implements Runnable { final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt()); final StreamConfig streamConfig = currentStreamConfigMap .getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier)); - if (createOrGetShardSyncTaskManager(streamConfig).syncShardAndLeaseInfo()) { + if (createOrGetShardSyncTaskManager(streamConfig).castShardSyncTask()) { log.info("{} : Found completed shard, initiated new ShardSyncTak for {} ", streamIdentifier.serialize(), completedShard.toString()); } @@ -480,7 +480,7 @@ public class Scheduler implements Runnable { if (!currentStreamConfigMap.containsKey(streamIdentifier)) { log.info("Found new stream to process: " + streamIdentifier + ". Syncing shards of that stream."); ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(newStreamConfigMap.get(streamIdentifier)); - shardSyncTaskManager.syncShardAndLeaseInfo(); + shardSyncTaskManager.castShardSyncTask(); currentStreamConfigMap.put(streamIdentifier, newStreamConfigMap.get(streamIdentifier)); streamsSynced.add(streamIdentifier); } else { @@ -508,7 +508,7 @@ public class Scheduler implements Runnable { + ". 
Syncing shards of that stream."); ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager( currentStreamConfigMap.get(streamIdentifier)); - shardSyncTaskManager.syncShardAndLeaseInfo(); + shardSyncTaskManager.castShardSyncTask(); currentSetOfStreamsIter.remove(); streamsSynced.add(streamIdentifier); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index 0d563a63..4ba0cf86 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -191,6 +191,21 @@ public interface LeaseRefresher { boolean updateLease(Lease lease) throws DependencyException, InvalidStateException, ProvisionedThroughputException; + /** + * Update application-specific fields of the given lease in DynamoDB. Does not update fields managed by the leasing + * library such as leaseCounter, leaseOwner, or leaseKey. + * + * @return true if update succeeded, false otherwise + * + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity + * @throws DependencyException if DynamoDB update fails in an unexpected way + */ + default void updateLease(Lease lease, UpdateField updateField) + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + throw new UnsupportedOperationException("updateLeaseWithNoExpectation is not implemented"); + } + /** * Check (synchronously) if there are any leases in the lease table. 
* diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java index 95b98399..5dbf6366 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java @@ -107,6 +107,15 @@ public interface LeaseSerializer { */ Map getDynamoUpdateLeaseUpdate(Lease lease); + /** + * @param lease + * @param updateField + * @return the attribute value map that updates application-specific data for a lease + */ + default Map getDynamoUpdateLeaseUpdate(Lease lease, UpdateField updateField) { + throw new UnsupportedOperationException(); + } + /** * @return the key schema for creating a DynamoDB table to store leases */ @@ -116,4 +125,5 @@ public interface LeaseSerializer { * @return attribute definitions for creating a DynamoDB table to store leases */ Collection getAttributeDefinitions(); + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java index a52ac650..de3d4c6c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java @@ -126,7 +126,11 @@ public class ShardSyncTaskManager { this.lock = new ReentrantLock(); } - public TaskResult executeShardSyncTask() { + /** + * Call a ShardSyncTask and return the Task Result. + * @return the Task Result. 
+ */ + public TaskResult callShardSyncTask() { final ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPositionInStream, @@ -140,7 +144,11 @@ public class ShardSyncTaskManager { return metricCollectingTask.call(); } - public boolean syncShardAndLeaseInfo() { + /** + * Cast a ShardSyncTask and return if the casting is successful. + * @return if the casting is successful. + */ + public boolean castShardSyncTask() { try { lock.lock(); return checkAndSubmitNextTask(); @@ -197,7 +205,7 @@ public class ShardSyncTaskManager { log.error("Caught exception running {} task: ", currentTask.taskType(), exception != null ? exception : taskResult.getException()); } // Acquire lock here. If shardSyncRequestPending is false in this completionStage and - // syncShardAndLeaseInfo is invoked, before completion stage exits (future completes) + // castShardSyncTask is invoked, before completion stage exits (future completes) // but right after the value of shardSyncRequestPending is checked, it will result in // shardSyncRequestPending being set to true, but no pending futures to trigger the next // ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/UpdateField.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/UpdateField.java new file mode 100644 index 00000000..c15449ca --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/UpdateField.java @@ -0,0 +1,19 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.leases; + +public enum UpdateField { + CHILD_SHARDS, HASH_KEY_RANGE +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 67e5abbe..867cc507 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -35,6 +35,7 @@ import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseSerializer; +import software.amazon.kinesis.leases.UpdateField; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; @@ -659,6 +660,27 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { return true; } + @Override + public void updateLease(Lease lease, UpdateField updateField) + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + log.debug("Updating lease without expectation {}", lease); + final AWSExceptionManager exceptionManager = createExceptionManager(); + Map updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField); + 
UpdateItemRequest request = UpdateItemRequest.builder().tableName(table).key(serializer.getDynamoHashKey(lease)) + .attributeUpdates(updates).build(); + try { + try { + FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateItem(request), dynamoDbRequestTimeout); + } catch (ExecutionException e) { + throw exceptionManager.apply(e.getCause()); + } catch (InterruptedException e) { + throw new DependencyException(e); + } + } catch (DynamoDbException | TimeoutException e) { + throw convertAndRethrowExceptions("update", lease.leaseKey(), e); + } + } + /** * {@inheritDoc} */ diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index 8f293881..4523bada 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -36,6 +36,7 @@ import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.leases.DynamoUtils; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseSerializer; +import software.amazon.kinesis.leases.UpdateField; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** @@ -268,6 +269,28 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { return result; } + @Override + public Map getDynamoUpdateLeaseUpdate(Lease lease, + UpdateField updateField) { + Map result = new HashMap<>(); + switch (updateField) { + case CHILD_SHARDS: + if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) { + result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds()))); + } + break; + case HASH_KEY_RANGE: + if (lease.hashKeyRangeForLease() != null) { + result.put(STARTING_HASH_KEY, putUpdate( + 
DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey()))); + result.put(ENDING_HASH_KEY, putUpdate( + DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey()))); + } + break; + } + return result; + } + @Override public Collection getKeySchema() { List keySchema = new ArrayList<>(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 3449b723..2544eac6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -115,7 +115,7 @@ public class ShutdownTask implements ConsumerTask { // This scenario could happen when customer deletes the stream while leaving the KCL application running. if (!CollectionUtils.isNullOrEmpty(childShards)) { createLeasesForChildShardsIfNotExist(); - updateLeasesForChildShards(); + updateLeasesWithChildShards(); } else { log.warn("Shard {} no longer exists. 
Shutting down consumer with SHARD_END reason without creating leases for child shards.", leaseKeyProvider.apply(shardInfo)); } @@ -189,7 +189,7 @@ public class ShutdownTask implements ConsumerTask { } } - private void updateLeasesForChildShards() + private void updateLeasesWithChildShards() throws DependencyException, InvalidStateException, ProvisionedThroughputException { final Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); Set childShardIds = childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java index 2567a00a..eb88c0dc 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java @@ -19,22 +19,56 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.awssdk.services.kinesis.model.HashKeyRange; +import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.kinesis.common.HashKeyRangeForLease; +import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseRefresher; +import software.amazon.kinesis.leases.MultiStreamLease; +import software.amazon.kinesis.leases.ShardDetector; +import software.amazon.kinesis.leases.ShardSyncTaskManager; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.math.BigInteger; import java.util.ArrayList; import java.util.List; +import java.util.Map; 
+import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static software.amazon.kinesis.common.HashKeyRangeForLease.deserialize; +import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; +import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.MAX_HASH_KEY; +import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.MIN_HASH_KEY; @RunWith(MockitoJUnitRunner.class) public class PeriodicShardSyncManagerTest { + private StreamIdentifier streamIdentifier; + private PeriodicShardSyncManager periodicShardSyncManager; + @Mock + private LeaderDecider leaderDecider; + @Mock + private LeaseRefresher leaseRefresher; + @Mock + Map currentStreamConfigMap; + @Mock + Function shardSyncTaskManagerProvider; + @Before public void setup() { - + streamIdentifier = StreamIdentifier.multiStreamInstance("123:stream:456"); + periodicShardSyncManager = new PeriodicShardSyncManager("worker", leaderDecider, leaseRefresher, currentStreamConfigMap, + shardSyncTaskManagerProvider, true); } @Test @@ -136,7 +170,7 @@ public class PeriodicShardSyncManagerTest { add(deserialize("25", "30")); // Missing interval here }}; Assert.assertTrue(PeriodicShardSyncManager - .checkForHoleInHashKeyRanges(hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent()); + .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent()); } @Test @@ -149,7 +183,263 @@ public class PeriodicShardSyncManagerTest { add(deserialize("24", "30")); }}; Assert.assertFalse(PeriodicShardSyncManager - .checkForHoleInHashKeyRanges(hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent()); + .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges, BigInteger.ZERO, 
BigInteger.valueOf(30)).isPresent()); + } + + @Test + public void testIfShardSyncIsInitiatedWhenNoLeasesArePassed() { + Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, null)); + } + + @Test + public void testIfShardSyncIsInitiatedWhenEmptyLeasesArePassed() { + Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, new ArrayList<>())); + } + + @Test + public void testIfShardSyncIsNotInitiatedWhenConfidenceFactorIsNotReached() { + List multiStreamLeases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); // Hole between 23 and 25 + add(deserialize("25", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + } + + @Test + public void testIfShardSyncIsNotInitiatedWhenConfidenceFactorIsReached() { + List multiStreamLeases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); // Hole between 23 and 25 + add(deserialize("25", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + 
Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + } + + @Test + public void testIfShardSyncIsInitiatedWhenHoleIsDueToShardEnd() { + List multiStreamLeases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); // introducing hole here through SHARD_END checkpoint + add(deserialize("6", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + if(lease.hashKeyRangeForLease().startingHashKey().toString().equals("4")) { + lease.checkpoint(ExtendedSequenceNumber.SHARD_END); + } else { + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + } + return lease; + }).collect(Collectors.toList()); + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + } + + @Test + public void testIfShardSyncIsInitiatedWhenNoLeasesAreUsedDueToShardEnd() { + List multiStreamLeases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.SHARD_END); + return lease; + }).collect(Collectors.toList()); + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, 
multiStreamLeases)); + } + + @Test + public void testIfShardSyncIsNotInitiatedWhenHoleShifts() { + List multiStreamLeases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); // Hole between 23 and 25 + add(deserialize("25", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + List multiStreamLeases2 = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); // Hole between 3 and 5 + add(deserialize("5", "23")); + add(deserialize("6", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + // Resetting the holes + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases2))); + Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases2)); + } + + @Test + public void testIfShardSyncIsNotInitiatedWhenHoleShiftsMoreThanOnce() { + List multiStreamLeases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); // Hole between 23 and 25 + add(deserialize("25", MAX_HASH_KEY.toString())); + 
}}.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + List multiStreamLeases2 = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); // Hole between 3 and 5 + add(deserialize("5", "23")); + add(deserialize("6", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + // Resetting the holes + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases2))); + // Resetting the holes + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + } + + @Test + public void testIfMissingHashRangeInformationIsFilledBeforeEvaluatingForShardSync() { + ShardSyncTaskManager shardSyncTaskManager = mock(ShardSyncTaskManager.class); + ShardDetector shardDetector = mock(ShardDetector.class); + when(shardSyncTaskManagerProvider.apply(any())).thenReturn(shardSyncTaskManager); + when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); + + final int[] shardCounter = { 0 }; + List hashKeyRangeForLeases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + 
add(deserialize("2", "3")); + add(deserialize("4", "20")); + add(deserialize("21", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + }}; + + List kinesisShards = hashKeyRangeForLeases.stream() + .map(hashKeyRangeForLease -> Shard.builder().shardId("shard-" + (++shardCounter[0])).hashKeyRange( + HashKeyRange.builder().startingHashKey(hashKeyRangeForLease.serializedStartingHashKey()) + .endingHashKey(hashKeyRangeForLease.serializedEndingHashKey()).build()).build()) + .collect(Collectors.toList()); + + when(shardDetector.listShards()).thenReturn(kinesisShards); + + final int[] leaseCounter = { 0 }; + List multiStreamLeases = hashKeyRangeForLeases.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), "shard-"+(++leaseCounter[0]))); + lease.shardId("shard-"+(leaseCounter[0])); + // Setting the hashrange only for last two leases + if(leaseCounter[0] >= 3) { + lease.hashKeyRange(hashKeyRangeForLease); + } + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + + // Assert that shard sync should never trigger + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + Assert.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + + // Assert that all the leases now has hashRanges set. 
+ for(Lease lease : multiStreamLeases) { + Assert.assertNotNull(lease.hashKeyRangeForLease()); + } + } + + @Test + public void testIfMissingHashRangeInformationIsFilledBeforeEvaluatingForShardSyncInHoleScenario() { + ShardSyncTaskManager shardSyncTaskManager = mock(ShardSyncTaskManager.class); + ShardDetector shardDetector = mock(ShardDetector.class); + when(shardSyncTaskManagerProvider.apply(any())).thenReturn(shardSyncTaskManager); + when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); + + final int[] shardCounter = { 0 }; + List hashKeyRangeForLeases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("5", "20")); // Hole between 3 and 5 + add(deserialize("21", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + }}; + + List kinesisShards = hashKeyRangeForLeases.stream() + .map(hashKeyRangeForLease -> Shard.builder().shardId("shard-" + (++shardCounter[0])).hashKeyRange( + HashKeyRange.builder().startingHashKey(hashKeyRangeForLease.serializedStartingHashKey()) + .endingHashKey(hashKeyRangeForLease.serializedEndingHashKey()).build()).build()) + .collect(Collectors.toList()); + + when(shardDetector.listShards()).thenReturn(kinesisShards); + + final int[] leaseCounter = { 0 }; + List multiStreamLeases = hashKeyRangeForLeases.stream().map(hashKeyRangeForLease -> { + MultiStreamLease lease = new MultiStreamLease(); + lease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), "shard-"+(++leaseCounter[0]))); + lease.shardId("shard-"+(leaseCounter[0])); + // Setting the hashrange only for the last three leases + if(leaseCounter[0] >= 3) { + lease.hashKeyRange(hashKeyRangeForLease); + } + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + + // Assert that shard sync is not triggered until the hole-confidence threshold is reached + IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + 
.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + + // Assert that all the leases now have hashRanges set. + for(Lease lease : multiStreamLeases) { + Assert.assertNotNull(lease.hashKeyRangeForLease()); + } } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index a5fd2add..e5a76ce3 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -190,7 +190,7 @@ public class SchedulerTest { }); when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); - when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null)); + when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null)); when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher); when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class)); @@ -1036,7 +1036,7 @@ public class SchedulerTest { shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(shardDetector.streamIdentifier()).thenReturn(streamConfig.streamIdentifier()); - when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null)); + when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null)); if(shardSyncFirstAttemptFailure) { when(shardDetector.listShards()) .thenThrow(new RuntimeException("Service Exception"))