From 5cd40e4718e7f7576e1c0ba64cf03aee37e44936 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 30 Apr 2020 14:03:39 -0700 Subject: [PATCH] Lease Recovery and Blocking parent bug fix --- .../coordinator/PeriodicShardSyncManager.java | 75 ++++++++++++--- .../amazon/kinesis/coordinator/Scheduler.java | 20 +--- .../leases/HierarchicalShardSyncer.java | 5 +- .../amazon/kinesis/leases/LeaseRefresher.java | 2 +- .../lifecycle/BlockOnParentShardTask.java | 2 - .../retrieval/kpl/ExtendedSequenceNumber.java | 5 + .../lifecycle/BlockOnParentShardTaskTest.java | 92 ++++++++++++++++++- 7 files changed, 164 insertions(+), 37 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 8acccce9..8df18207 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -18,20 +18,32 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; +import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseRefresher; +import software.amazon.kinesis.leases.MultiStreamLease; import software.amazon.kinesis.leases.ShardSyncTaskManager; +import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.InvalidStateException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import 
software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.lifecycle.TaskResult; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import java.util.stream.Collectors; /** * The top level orchestrator for coordinating the periodic shard sync related @@ -46,25 +58,33 @@ class PeriodicShardSyncManager { private final String workerId; private final LeaderDecider leaderDecider; + private final LeaseRefresher leaseRefresher; private final Map currentStreamConfigMap; private final Function shardSyncTaskManagerProvider; private final ScheduledExecutorService shardSyncThreadPool; + private final boolean isMultiStreamingMode; private boolean isRunning; - PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map currentStreamConfigMap, - Function shardSyncTaskManagerProvider) { - this(workerId, leaderDecider, currentStreamConfigMap, shardSyncTaskManagerProvider, Executors.newSingleThreadScheduledExecutor()); + PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher, + Map currentStreamConfigMap, + Function shardSyncTaskManagerProvider, boolean isMultiStreamingMode) { + this(workerId, leaderDecider, leaseRefresher, currentStreamConfigMap, shardSyncTaskManagerProvider, + Executors.newSingleThreadScheduledExecutor(), isMultiStreamingMode); } - PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map currentStreamConfigMap, - Function shardSyncTaskManagerProvider, ScheduledExecutorService shardSyncThreadPool) { + PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher, + Map currentStreamConfigMap, + Function 
shardSyncTaskManagerProvider, + ScheduledExecutorService shardSyncThreadPool, boolean isMultiStreamingMode) { Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager."); Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager."); this.workerId = workerId; this.leaderDecider = leaderDecider; + this.leaseRefresher = leaseRefresher; this.currentStreamConfigMap = currentStreamConfigMap; this.shardSyncTaskManagerProvider = shardSyncTaskManagerProvider; this.shardSyncThreadPool = shardSyncThreadPool; + this.isMultiStreamingMode = isMultiStreamingMode; } public synchronized TaskResult start() { @@ -116,24 +136,53 @@ class PeriodicShardSyncManager { private void runShardSync() { if (leaderDecider.isLeader(workerId)) { - for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { - final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfigEntry.getValue()); - if (!shardSyncTaskManager.syncShardAndLeaseInfo()) { - log.warn("Failed to submit shard sync task for stream {}. This could be due to the previous shard sync task not finished.", - shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); + try { + final Map> streamToLeasesMap = getStreamToLeasesMap(currentStreamConfigMap.keySet()); + for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { + final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfigEntry.getValue()); + if (!shardSyncTaskManager.syncShardAndLeaseInfo()) { + log.warn( + "Failed to submit shard sync task for stream {}. 
This could be due to the previous shard sync task not finished.", + shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); + } } + } catch (Exception e) { + // TODO : Log } } else { log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); } } + private Map> getStreamToLeasesMap(final Set streamIdentifiersToFilter) + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + final List leases = leaseRefresher.listLeases(); + if(!isMultiStreamingMode) { + Validate.isTrue(streamIdentifiersToFilter.size() == 1); + return Collections.singletonMap(streamIdentifiersToFilter.iterator().next(), leases); + } else { + final Map> streamToLeasesMap = new HashMap<>(); + for(Lease lease : leases) { + StreamIdentifier streamIdentifier = StreamIdentifier + .multiStreamInstance(((MultiStreamLease) lease).streamIdentifier()); + if(streamIdentifiersToFilter.contains(streamIdentifier)) { + streamToLeasesMap.computeIfAbsent(streamIdentifier, s -> new ArrayList<>()).add(lease); + } + } + return streamToLeasesMap; + } + } + /** * Checks if the entire hash range is covered * @return true if covered, false otherwise */ - public boolean hashRangeCovered() { - // TODO: Implement method - return true; + public boolean isHashRangeComplete(List leases) { + if(CollectionUtils.isNullOrEmpty(leases)) { + return false; + } else { +// leases.stream().filter(lease -> lease.checkpoint().isShardEnd()) + return false; + } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index a20f86d1..d5eb1be6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -111,7 +111,6 @@ public class Scheduler implements Runnable { private static final long 
LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L; private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; - private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L; private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L; private static final String MULTI_STREAM_TRACKER = "MultiStreamTracker"; private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count"; @@ -289,8 +288,8 @@ public class Scheduler implements Runnable { this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager( - leaseManagementConfig.workerIdentifier(), leaderDecider, currentStreamConfigMap, - shardSyncTaskManagerProvider); + leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap, + shardSyncTaskManagerProvider, isMultiStreamMode); } /** @@ -351,11 +350,10 @@ public class Scheduler implements Runnable { } else { log.info("LeaseCoordinator is already running. No need to start it."); } - log.info("Scheduling periodicShardSync)"); + log.info("Scheduling periodicShardSync"); // leaderElectedPeriodicShardSyncManager.start(shardSyncTasks); // TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged // TODO: Determine if waitUntilHashRangeCovered() is needed. - //waitUntilHashRangeCovered(); streamSyncWatch.start(); isDone = true; } catch (LeasingException e) { @@ -398,18 +396,6 @@ public class Scheduler implements Runnable { return shouldInitiateLeaseSync; } - private void waitUntilHashRangeCovered() throws InterruptedException { - - // TODO: Currently this call is not in use. We may need to implement this method later. 
Created SIM to track the work: https://sim.amazon.com/issues/KinesisLTR-202 - // TODO: For future implementation, streamToShardSyncTaskManagerMap might not contain the most up to date snapshot of active streams. - // Should use currentStreamConfigMap to determine the streams to check. - while (!leaderElectedPeriodicShardSyncManager.hashRangeCovered()) { - // wait until entire hash range is covered - log.info("Hash range is not covered yet. Checking again in {} ms", HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS); - Thread.sleep(HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS); - } - } - @VisibleForTesting void runProcessLoop() { try { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 714e7f4e..2b2df48c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -45,7 +45,6 @@ import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; -import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; @@ -341,7 +340,7 @@ public class HierarchicalShardSyncer { "Stream " + streamName + " is not in ACTIVE OR UPDATING state - will retry getting the shard list."); } - if (hashRangeOfShardsIsComplete(shards)) { + if (isHashRangeOfShardsComplete(shards)) { return shards; } @@ -359,7 +358,7 @@ public class HierarchicalShardSyncer { " is not in ACTIVE OR UPDATING state - will retry getting the 
shard list.")); } - private static boolean hashRangeOfShardsIsComplete(@NonNull List shards) { + private static boolean isHashRangeOfShardsComplete(@NonNull List shards) { if (shards.isEmpty()) { throw new IllegalStateException("No shards found when attempting to validate complete hash range."); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index fc3aba8b..0d563a63 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -105,7 +105,7 @@ public interface LeaseRefresher { * @throws ProvisionedThroughputException if DynamoDB get fails due to lack of capacity * @throws DependencyException if DynamoDB get fails in an unexpected way * - * @return lease for the specified shardId, or null if one doesn't exist + * @return lease for the specified leaseKey, or null if one doesn't exist */ Lease getLease(String leaseKey) throws DependencyException, InvalidStateException, ProvisionedThroughputException; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java index 8797085a..fcb3ffde 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java @@ -25,8 +25,6 @@ import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import java.util.function.Function; - /** * Task to block until processing of all data records in the parent shard(s) is completed. 
* We check if we have checkpoint(s) for the parent shard(s). diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/kpl/ExtendedSequenceNumber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/kpl/ExtendedSequenceNumber.java index e2de9786..0c1c4a28 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/kpl/ExtendedSequenceNumber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/kpl/ExtendedSequenceNumber.java @@ -141,6 +141,11 @@ public class ExtendedSequenceNumber implements Comparable emptyParentShardIds = new ArrayList(); private ShardInfo shardInfo; @@ -107,6 +109,50 @@ public class BlockOnParentShardTaskTest { assertNull(result.getException()); } + /** + * Test call() when there are 1-2 parent shards that have been fully processed. + * @throws ProvisionedThroughputException + * @throws InvalidStateException + * @throws DependencyException + */ + @Test + public final void testCallWhenParentsHaveFinishedMultiStream() + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + + ShardInfo shardInfo = null; + BlockOnParentShardTask task = null; + String parent1LeaseKey = streamId + ":" + "shardId-1"; + String parent2LeaseKey = streamId + ":" + "shardId-2"; + String parent1ShardId = "shardId-1"; + String parent2ShardId = "shardId-2"; + List parentShardIds = new ArrayList<>(); + TaskResult result = null; + + Lease parent1Lease = new Lease(); + parent1Lease.checkpoint(ExtendedSequenceNumber.SHARD_END); + Lease parent2Lease = new Lease(); + parent2Lease.checkpoint(ExtendedSequenceNumber.SHARD_END); + + LeaseRefresher leaseRefresher = mock(LeaseRefresher.class); + when(leaseRefresher.getLease(parent1LeaseKey)).thenReturn(parent1Lease); + when(leaseRefresher.getLease(parent2LeaseKey)).thenReturn(parent2Lease); + + // test single parent + parentShardIds.add(parent1ShardId); + shardInfo = new ShardInfo(shardId, concurrencyToken, 
parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, + streamId); + task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis); + result = task.call(); + assertNull(result.getException()); + + // test two parents + parentShardIds.add(parent2ShardId); + shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, streamId); + task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis); + result = task.call(); + assertNull(result.getException()); + } + /** * Test call() when there are 1-2 parent shards that have NOT been fully processed. * @throws ProvisionedThroughputException @@ -115,7 +161,7 @@ public class BlockOnParentShardTaskTest { */ @Test public final void testCallWhenParentsHaveNotFinished() - throws DependencyException, InvalidStateException, ProvisionedThroughputException { + throws DependencyException, InvalidStateException, ProvisionedThroughputException { ShardInfo shardInfo = null; BlockOnParentShardTask task = null; @@ -149,6 +195,50 @@ public class BlockOnParentShardTaskTest { assertNotNull(result.getException()); } + /** + * Test call() when there are 1-2 parent shards that have NOT been fully processed. 
+ * @throws ProvisionedThroughputException + * @throws InvalidStateException + * @throws DependencyException + */ + @Test + public final void testCallWhenParentsHaveNotFinishedMultiStream() + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + + ShardInfo shardInfo = null; + BlockOnParentShardTask task = null; + String parent1LeaseKey = streamId + ":" + "shardId-1"; + String parent2LeaseKey = streamId + ":" + "shardId-2"; + String parent1ShardId = "shardId-1"; + String parent2ShardId = "shardId-2"; + List parentShardIds = new ArrayList<>(); + TaskResult result = null; + + Lease parent1Lease = new Lease(); + parent1Lease.checkpoint(ExtendedSequenceNumber.LATEST); + Lease parent2Lease = new Lease(); + // mock a sequence number checkpoint + parent2Lease.checkpoint(new ExtendedSequenceNumber("98182584034")); + + LeaseRefresher leaseRefresher = mock(LeaseRefresher.class); + when(leaseRefresher.getLease(parent1LeaseKey)).thenReturn(parent1Lease); + when(leaseRefresher.getLease(parent2LeaseKey)).thenReturn(parent2Lease); + + // test single parent + parentShardIds.add(parent1ShardId); + shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, streamId); + task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis); + result = task.call(); + assertNotNull(result.getException()); + + // test two parents + parentShardIds.add(parent2ShardId); + shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, streamId); + task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis); + result = task.call(); + assertNotNull(result.getException()); + } + /** * Test call() with 1 parent shard before and after it is completely processed. * @throws ProvisionedThroughputException