From 62e13ff3a18e6407a59e7000733a544dfb9fe022 Mon Sep 17 00:00:00 2001 From: Jegosh John Date: Tue, 25 Feb 2020 10:41:34 -0800 Subject: [PATCH 01/10] Introduce PeriodicShardSync with leader election --- ...ministicShuffleShardSyncLeaderDecider.java | 152 ++++++++++++++++++ .../kinesis/coordinator/LeaderDecider.java | 39 +++++ .../coordinator/PeriodicShardSyncManager.java | 94 +++++++++++ .../amazon/kinesis/coordinator/Scheduler.java | 49 ++++-- .../software/amazon/kinesis/leases/Lease.java | 14 +- 5 files changed, 324 insertions(+), 24 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDecider.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/LeaderDecider.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDecider.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDecider.java new file mode 100644 index 00000000..720103c6 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDecider.java @@ -0,0 +1,152 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.coordinator; + +import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.utils.CollectionUtils; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseRefresher; +import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.InvalidStateException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; + +import java.time.Instant; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BooleanSupplier; +import java.util.stream.Collectors; + +/** + * An implementation of the {@code LeaderDecider} to elect leader(s) based on workerId. + * Leases are shuffled using a predetermined constant seed so that lease ordering is + * preserved across workers. + * This reduces the probability of choosing the leader workers co-located on the same + * host in case workerId starts with a common string (e.g. IP Address). + * Hence if a host has 3 workers, IPADDRESS_Worker1, IPADDRESS_Worker2, and IPADDRESS_Worker3, + * we don't end up choosing all 3 for shard sync as a result of natural ordering of Strings. + * This ensures redundancy for shard-sync during host failures. + */ +@Slf4j +class DeterministicShuffleShardSyncLeaderDecider + implements LeaderDecider { + // Fixed seed so that the shuffle order is preserved across workers + static final int DETERMINISTIC_SHUFFLE_SEED = 1947; + + private static final long ELECTION_INITIAL_DELAY_MILLIS = 60 * 1000; + private static final long ELECTION_SCHEDULING_INTERVAL_MILLIS = 5 * 60 * 1000; + private static final int AWAIT_TERMINATION_MILLIS = 5000; + + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + + private final LeaseRefresher leaseRefresher; + private final int numPeriodicShardSyncWorkers; + private final ScheduledExecutorService leaderElectionThreadPool; + + private volatile Set leaders; + + /** + * @param leaseRefresher LeaseManager instance used to fetch leases. + * @param leaderElectionThreadPool Thread-pool to be used for leaderElection. + * @param numPeriodicShardSyncWorkers Number of leaders that will be elected to perform periodic shard syncs. + */ + DeterministicShuffleShardSyncLeaderDecider(LeaseRefresher leaseRefresher, ScheduledExecutorService leaderElectionThreadPool, + int numPeriodicShardSyncWorkers) { + this.leaseRefresher = leaseRefresher; + this.leaderElectionThreadPool = leaderElectionThreadPool; + this.numPeriodicShardSyncWorkers = numPeriodicShardSyncWorkers; + } + + /* + * Shuffles the leases deterministically and elects numPeriodicShardSyncWorkers number of workers + * as leaders (workers that will perform shard sync). + */ + private void electLeaders() { + try { + log.debug("Started leader election at: " + Instant.now()); + List leases = leaseRefresher.listLeases(); + List uniqueHosts = leases.stream().map(Lease::leaseOwner) + .filter(owner -> owner != null).distinct().sorted().collect(Collectors.toList()); + + Collections.shuffle(uniqueHosts, new Random(DETERMINISTIC_SHUFFLE_SEED)); + int numShardSyncWorkers = Math.min(uniqueHosts.size(), numPeriodicShardSyncWorkers); + // In case value is currently being read, we wait for reading to complete before updating the variable. + // This is to prevent any ConcurrentModificationException exceptions. + readWriteLock.writeLock().lock(); + leaders = new HashSet<>(uniqueHosts.subList(0, numShardSyncWorkers)); + log.info("Elected leaders: " + String.join(", ", leaders)); + log.debug("Completed leader election at: " + Instant.now()); + } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) { + log.error("Exception occurred while trying to fetch all leases for leader election", e); + } catch (Throwable t) { + log.error("Unknown exception during leader election.", t); + } finally { + readWriteLock.writeLock().unlock(); + } + } + + private boolean isWorkerLeaderForShardSync(String workerId) { + return CollectionUtils.isNullOrEmpty(leaders) || leaders.contains(workerId); + } + + @Override + public synchronized Boolean isLeader(String workerId) { + // if no leaders yet, synchronously get leaders. This will happen at first Shard Sync. + if (executeConditionCheckWithReadLock(() -> CollectionUtils.isNullOrEmpty(leaders))) { + electLeaders(); + // start a scheduled executor that will periodically update leaders. + // The first run will be after a minute. + // We don't need jitter since it is scheduled with a fixed delay and time taken to scan leases + // will be different at different times and on different hosts/workers. + leaderElectionThreadPool.scheduleWithFixedDelay(this::electLeaders, ELECTION_INITIAL_DELAY_MILLIS, + ELECTION_SCHEDULING_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + } + + return executeConditionCheckWithReadLock(() -> isWorkerLeaderForShardSync(workerId)); + } + + @Override + public synchronized void shutdown() { + try { + leaderElectionThreadPool.shutdown(); + if (leaderElectionThreadPool.awaitTermination(AWAIT_TERMINATION_MILLIS, TimeUnit.MILLISECONDS)) { + log.info("Successfully stopped leader election on the worker"); + } else { + leaderElectionThreadPool.shutdownNow(); + log.info(String.format("Stopped leader election thread after awaiting termination for %d milliseconds", + AWAIT_TERMINATION_MILLIS)); + } + + } catch (InterruptedException e) { + log.debug("Encountered InterruptedException while awaiting leader election threadPool termination"); + } + } + + // Execute condition checks using shared variables under a read-write lock. + private boolean executeConditionCheckWithReadLock(BooleanSupplier action) { + try { + readWriteLock.readLock().lock(); + return action.getAsBoolean(); + } finally { + readWriteLock.readLock().unlock(); + } + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/LeaderDecider.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/LeaderDecider.java new file mode 100644 index 00000000..140791af --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/LeaderDecider.java @@ -0,0 +1,39 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.coordinator; + +/** + * Used in conjunction with periodic shard sync. + * Implement this interface to allow KCL to decide if the current worker should execute shard sync. + * When periodic shard sync is enabled, PeriodicShardSyncManager periodically checks if the current + * worker is one of the leaders designated to execute shard-sync and then acts accordingly. + */ +public interface LeaderDecider { + + /** + * Method invoked to check the given workerId corresponds to one of the workers + * designated to execute shard-syncs periodically. + * + * @param workerId ID of the worker + * @return True if the worker with ID workerId can execute shard-sync. False otherwise. + */ + Boolean isLeader(String workerId); + + /** + * Can be invoked, if needed, to shutdown any clients/thread-pools + * being used in the LeaderDecider implementation. + */ + void shutdown(); +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java new file mode 100644 index 00000000..3134fef7 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -0,0 +1,94 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.coordinator; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.Validate; +import software.amazon.kinesis.leases.ShardSyncTask; +import software.amazon.kinesis.lifecycle.ConsumerTask; +import software.amazon.kinesis.lifecycle.TaskResult; +import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; +import software.amazon.kinesis.metrics.MetricsFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * The top level orchestrator for coordinating the periodic shard sync related + * activities. + */ +@Getter +@EqualsAndHashCode +@Slf4j +class PeriodicShardSyncManager { + private static final long INITIAL_DELAY = 0; + private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000; + + private final String workerId; + private final LeaderDecider leaderDecider; + private final ConsumerTask metricsEmittingShardSyncTask; + private final ScheduledExecutorService shardSyncThreadPool; + private boolean isRunning; + + PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, MetricsFactory metricsFactory) { + this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory); + } + + PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, ScheduledExecutorService shardSyncThreadPool, MetricsFactory metricsFactory) { + Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager."); + Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager."); + Validate.notNull(shardSyncTask, "ShardSyncTask is required to initialize PeriodicShardSyncManager."); + this.workerId = workerId; + this.leaderDecider = leaderDecider; + this.metricsEmittingShardSyncTask = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory); + this.shardSyncThreadPool = shardSyncThreadPool; + } + + public synchronized TaskResult start() { + if (!isRunning) { + shardSyncThreadPool + .scheduleWithFixedDelay(this::runShardSync, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS, + TimeUnit.MILLISECONDS); + isRunning = true; + } + return new TaskResult(null); + } + + public void stop() { + if (isRunning) { + log.info(String.format("Shutting down leader decider on worker %s", workerId)); + leaderDecider.shutdown(); + log.info(String.format("Shutting down periodic shard sync task scheduler on worker %s", workerId)); + shardSyncThreadPool.shutdown(); + isRunning = false; + } + } + + private void runShardSync() { + try { + if (leaderDecider.isLeader(workerId)) { + log.info(String.format("WorkerId %s is a leader, running the shard sync task", workerId)); + metricsEmittingShardSyncTask.call(); + } else { + log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); + } + } catch (Throwable t) { + log.error("Error during runShardSync.", t); + } + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index eaeb5a1c..7f92c384 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -15,20 +15,7 @@ package software.amazon.kinesis.coordinator; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - import com.google.common.annotations.VisibleForTesting; - import io.reactivex.plugins.RxJavaPlugins; import lombok.AccessLevel; import lombok.Getter; @@ -39,6 +26,7 @@ import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; @@ -48,7 +36,6 @@ import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardPrioritization; import software.amazon.kinesis.leases.ShardSyncTask; import software.amazon.kinesis.leases.ShardSyncTaskManager; -import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator; import software.amazon.kinesis.leases.exceptions.LeasingException; import software.amazon.kinesis.lifecycle.LifecycleConfig; @@ -70,6 +57,19 @@ import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalConfig; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + /** * */ @@ -78,6 +78,7 @@ import software.amazon.kinesis.retrieval.RetrievalConfig; @Slf4j public class Scheduler implements Runnable { + private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; private SchedulerLog slog = new SchedulerLog(); private final CheckpointConfig checkpointConfig; @@ -101,6 +102,7 @@ public class Scheduler implements Runnable { // private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final LeaseCoordinator leaseCoordinator; private final ShardSyncTaskManager shardSyncTaskManager; + private final PeriodicShardSyncManager periodicShardSyncManager; private final ShardPrioritization shardPrioritization; private final boolean cleanupLeasesUponShardCompletion; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; @@ -119,6 +121,7 @@ public class Scheduler implements Runnable { private final AggregatorUtil aggregatorUtil; private final HierarchicalShardSyncer hierarchicalShardSyncer; private final long schedulerInitializationBackoffTimeMillis; + private LeaderDecider leaderDecider; // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. @@ -209,6 +212,10 @@ public class Scheduler implements Runnable { this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory() .createWorkerStateChangeListener(); } + if (leaderDecider == null) { + leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, + Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); + } this.initialPosition = retrievalConfig.initialPositionInStreamExtended(); this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis(); this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis(); @@ -222,6 +229,11 @@ public class Scheduler implements Runnable { this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); + ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition, + cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer, + metricsFactory); + this.periodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(), + leaderDecider, shardSyncTask, metricsFactory); } /** @@ -235,13 +247,17 @@ public class Scheduler implements Runnable { try { initialize(); - log.info("Initialization complete. Starting worker loop."); + log.info("Initialization complete. Scheduling periodicShardSync.."); + + periodicShardSyncManager.start(); + + log.info("Scheduled periodicShardSync tasks. Starting worker loop."); + } catch (RuntimeException e) { log.error("Unable to initialize after {} attempts. Shutting down.", maxInitializationAttempts, e); workerStateChangeListener.onAllInitializationAttemptsFailed(e); shutdown(); } - while (!shouldShutdown()) { runProcessLoop(); } @@ -516,6 +532,7 @@ public class Scheduler implements Runnable { // Lost leases will force Worker to begin shutdown process for all shard consumers in // Worker.run(). leaseCoordinator.stop(); + periodicShardSyncManager.stop(); workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index 802ee29b..682a6f9e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -14,14 +14,6 @@ */ package software.amazon.kinesis.leases; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import com.google.common.collect.Collections2; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; @@ -30,6 +22,12 @@ import lombok.ToString; import lombok.experimental.Accessors; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + /** * This class contains data pertaining to a Lease. Distributed systems may use leases to partition work across a * fleet of workers. Each unit of work (identified by a leaseKey) has a corresponding Lease. Every worker will contend From 8511475868ec7bf502afa9396ee2fb4dcb29ce94 Mon Sep 17 00:00:00 2001 From: Jegosh John Date: Tue, 3 Mar 2020 16:02:00 -0800 Subject: [PATCH 02/10] Minor refactor, addressing feedback --- .../coordinator/PeriodicShardSyncManager.java | 55 ++++++-- .../amazon/kinesis/coordinator/Scheduler.java | 42 +++--- ...sticShuffleShardSyncLeaderDeciderTest.java | 125 ++++++++++++++++++ 3 files changed, 188 insertions(+), 34 deletions(-) create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 3134fef7..7b10694f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -18,6 +18,7 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; +import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.ShardSyncTask; import software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.lifecycle.TaskResult; @@ -36,8 +37,8 @@ import java.util.concurrent.TimeUnit; @EqualsAndHashCode @Slf4j class PeriodicShardSyncManager { - private static final long INITIAL_DELAY = 0; - private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000; + private static final long INITIAL_DELAY = 60 * 1000L; + private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000L; private final String workerId; private final LeaderDecider leaderDecider; @@ -61,14 +62,34 @@ class PeriodicShardSyncManager { public synchronized TaskResult start() { if (!isRunning) { - shardSyncThreadPool - .scheduleWithFixedDelay(this::runShardSync, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS, - TimeUnit.MILLISECONDS); + final Runnable periodicShardSyncer = () -> { + try { + runShardSync(); + } catch (Throwable t) { + log.error("Error during runShardSync.", t); + } + }; + shardSyncThreadPool.scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS, + TimeUnit.MILLISECONDS); isRunning = true; + } return new TaskResult(null); } + public synchronized TaskResult syncShardsOnce() { + + Exception lastException = null; + try { + if (!isRunning) { + runShardSync(); + } + } catch (Exception e) { + lastException = e; + } + return new TaskResult(lastException); + } + public void stop() { if (isRunning) { log.info(String.format("Shutting down leader decider on worker %s", workerId)); @@ -80,15 +101,23 @@ class PeriodicShardSyncManager { } private void runShardSync() { - try { - if (leaderDecider.isLeader(workerId)) { - log.info(String.format("WorkerId %s is a leader, running the shard sync task", workerId)); - metricsEmittingShardSyncTask.call(); - } else { - log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); + if (leaderDecider.isLeader(workerId)) { + log.info(String.format("WorkerId %s is a leader, running the shard sync task", workerId)); + final TaskResult taskResult = metricsEmittingShardSyncTask.call(); + if (taskResult != null && taskResult.getException() != null) { + throw new KinesisClientLibIOException("Failed to sync shards", taskResult.getException()); } - } catch (Throwable t) { - log.error("Error during runShardSync.", t); + } else { + log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); } } + + /** + * Checks if the entire hash range is covered + * @return true if covered, false otherwise + */ + public boolean hashRangeCovered() { + // TODO: Implement method + return true; + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 7f92c384..87b92162 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -46,7 +46,6 @@ import software.amazon.kinesis.lifecycle.ShutdownNotification; import software.amazon.kinesis.lifecycle.ShutdownReason; import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.metrics.CloudWatchMetricsFactory; -import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.processor.Checkpointer; @@ -68,6 +67,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; /** @@ -102,7 +102,7 @@ public class Scheduler implements Runnable { // private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final LeaseCoordinator leaseCoordinator; private final ShardSyncTaskManager shardSyncTaskManager; - private final PeriodicShardSyncManager periodicShardSyncManager; + private final PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager; private final ShardPrioritization shardPrioritization; private final boolean cleanupLeasesUponShardCompletion; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; @@ -121,7 +121,7 @@ public class Scheduler implements Runnable { private final AggregatorUtil aggregatorUtil; private final HierarchicalShardSyncer hierarchicalShardSyncer; private final long schedulerInitializationBackoffTimeMillis; - private LeaderDecider leaderDecider; + private final LeaderDecider leaderDecider; // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. @@ -212,10 +212,8 @@ public class Scheduler implements Runnable { this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory() .createWorkerStateChangeListener(); } - if (leaderDecider == null) { - leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, + this.leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); - } this.initialPosition = retrievalConfig.initialPositionInStreamExtended(); this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis(); this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis(); @@ -232,7 +230,7 @@ public class Scheduler implements Runnable { ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition, cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer, metricsFactory); - this.periodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(), + this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(), leaderDecider, shardSyncTask, metricsFactory); } @@ -247,12 +245,7 @@ public class Scheduler implements Runnable { try { initialize(); - log.info("Initialization complete. Scheduling periodicShardSync.."); - - periodicShardSyncManager.start(); - - log.info("Scheduled periodicShardSync tasks. Starting worker loop."); - + log.info("Initialization complete. Starting worker loop."); } catch (RuntimeException e) { log.error("Unable to initialize after {} attempts. Shutting down.", maxInitializationAttempts, e); workerStateChangeListener.onAllInitializationAttemptsFailed(e); @@ -282,11 +275,14 @@ public class Scheduler implements Runnable { TaskResult result = null; if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { + for (int j = 0; j < 10 && leaseRefresher.isLeaseTableEmpty(); j++) { + // check every 1-5 seconds if lease table is still empty, + // to minimize contention between all workers bootstrapping at the same time + long waitTime = ThreadLocalRandom.current().nextLong(1000L, 5000L); + Thread.sleep(waitTime); + } log.info("Syncing Kinesis shard info"); - ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition, - cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer, - metricsFactory); - result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); + result = leaderElectedPeriodicShardSyncManager.syncShardsOnce(); } else { log.info("Skipping shard sync per configuration setting (and lease table is not empty)"); } @@ -298,6 +294,9 @@ public class Scheduler implements Runnable { } else { log.info("LeaseCoordinator is already running. No need to start it."); } + log.info("Scheduling periodicShardSync)"); + // leaderElectedPeriodicShardSyncManager.start(); + // TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged isDone = true; } else { lastException = result.getException(); @@ -309,9 +308,10 @@ public class Scheduler implements Runnable { lastException = e; } - if (!isDone) { + if (!isDone || !leaderElectedPeriodicShardSyncManager.hashRangeCovered()) { try { Thread.sleep(schedulerInitializationBackoffTimeMillis); + leaderElectedPeriodicShardSyncManager.stop(); } catch (InterruptedException e) { log.debug("Sleep interrupted while initializing worker."); } @@ -532,7 +532,7 @@ public class Scheduler implements Runnable { // Lost leases will force Worker to begin shutdown process for all shard consumers in // Worker.run(). leaseCoordinator.stop(); - periodicShardSyncManager.stop(); + leaderElectedPeriodicShardSyncManager.stop(); workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN); } } @@ -630,12 +630,12 @@ public class Scheduler implements Runnable { hierarchicalShardSyncer, metricsFactory); return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(), - argument, lifecycleConfig.taskExecutionListener(),lifecycleConfig.readTimeoutsToIgnoreBeforeWarning()); + argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning()); } /** * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. - * + *

* This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been * instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example * shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java new file mode 100644 index 00000000..b6ff3a0d --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java @@ -0,0 +1,125 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.coordinator; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseRefresher; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; +import static software.amazon.kinesis.coordinator.DeterministicShuffleShardSyncLeaderDecider.DETERMINISTIC_SHUFFLE_SEED; + +@RunWith(MockitoJUnitRunner.class) +public class DeterministicShuffleShardSyncLeaderDeciderTest { + private static final String LEASE_KEY = "lease_key"; + private static final String LEASE_OWNER = "lease_owner"; + private static final String WORKER_ID = "worker-id"; + + private DeterministicShuffleShardSyncLeaderDecider leaderDecider; + + @Mock + private LeaseRefresher leaseRefresher; + + @Mock + private ScheduledExecutorService scheduledExecutorService; + + private int numShardSyncWorkers; + + @Before + public void setup() { + numShardSyncWorkers = 1; + leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, scheduledExecutorService, numShardSyncWorkers); + } + + @Test + public void testLeaderElectionWithNullLeases() { + boolean isLeader = leaderDecider.isLeader(WORKER_ID); + assertTrue("IsLeader should return true if leaders is null", isLeader); + } + + @Test + public void testLeaderElectionWithEmptyLeases() throws Exception { + when(leaseRefresher.listLeases()).thenReturn(new ArrayList<>()); + boolean isLeader = leaderDecider.isLeader(WORKER_ID); + assertTrue("IsLeader should return true if no leases are returned", isLeader); + } + + @Test + public void testElectedLeadersAsPerExpectedShufflingOrder() + throws Exception { + List leases = getLeases(5, false /* duplicateLeaseOwner */, true /* activeLeases */); + when(leaseRefresher.listLeases()).thenReturn(leases); + Set expectedLeaders = getExpectedLeaders(leases); + for (String leader : expectedLeaders) { + assertTrue(leaderDecider.isLeader(leader)); + } + for (Lease lease : leases) { + if (!expectedLeaders.contains(lease.leaseOwner())) { + assertFalse(leaderDecider.isLeader(lease.leaseOwner())); + } + } + } + + @Test + public void testElectedLeadersAsPerExpectedShufflingOrderWhenUniqueWorkersLessThanMaxLeaders() { + this.numShardSyncWorkers = 5; // More than number of unique lease owners + leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, scheduledExecutorService, numShardSyncWorkers); + List leases = getLeases(3, false /* duplicateLeaseOwner */, true /* activeLeases */); + Set expectedLeaders = getExpectedLeaders(leases); + // All lease owners should be present in expected leaders set, and they should all be leaders. + for (Lease lease : leases) { + assertTrue(leaderDecider.isLeader(lease.leaseOwner())); + assertTrue(expectedLeaders.contains(lease.leaseOwner())); + } + } + + private List getLeases(int count, boolean duplicateLeaseOwner, boolean activeLeases) { + List leases = new ArrayList<>(); + for (int i = 0; i < count; i++) { + Lease lease = new Lease(); + lease.leaseKey(LEASE_KEY + i); + lease.checkpoint(activeLeases ? ExtendedSequenceNumber.LATEST : ExtendedSequenceNumber.SHARD_END); + lease.leaseCounter(new Random().nextLong()); + lease.leaseOwner(LEASE_OWNER + (duplicateLeaseOwner ? "" : i)); + leases.add(lease); + } + return leases; + } + + private Set getExpectedLeaders(List leases) { + List uniqueHosts = leases.stream().filter(lease -> lease.leaseOwner() != null) + .map(Lease::leaseOwner).distinct().sorted().collect(Collectors.toList()); + + Collections.shuffle(uniqueHosts, new Random(DETERMINISTIC_SHUFFLE_SEED)); + int numWorkers = Math.min(uniqueHosts.size(), this.numShardSyncWorkers); + return new HashSet<>(uniqueHosts.subList(0, numWorkers)); + } +} From 3bd9b29a1312426ec0cce17643579c62192afb9b Mon Sep 17 00:00:00 2001 From: Jegosh John Date: Fri, 6 Mar 2020 10:53:35 -0800 Subject: [PATCH 03/10] Add wait to initialization until hashRange covered, modify wait logic for empty leaseTable check --- .../coordinator/PeriodicShardSyncManager.java | 5 ++ .../amazon/kinesis/coordinator/Scheduler.java | 54 ++++++++++++++----- .../kinesis/coordinator/SchedulerTest.java | 39 ++++++++++++++ 3 files changed, 86 insertions(+), 12 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 7b10694f..7d166b98 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -77,6 +77,11 @@ class PeriodicShardSyncManager { return new TaskResult(null); } + /** + * Runs shardSync once + * Does not schedule periodic shardSync + * @return the result of the task + */ public synchronized TaskResult syncShardsOnce() { Exception lastException = null; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 87b92162..6f4d4ed1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -37,7 +37,10 @@ import software.amazon.kinesis.leases.ShardPrioritization; import software.amazon.kinesis.leases.ShardSyncTask; import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator; +import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.LeasingException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.lifecycle.ShardConsumer; import software.amazon.kinesis.lifecycle.ShardConsumerArgument; @@ -79,6 +82,10 @@ import java.util.concurrent.TimeUnit; public class Scheduler implements Runnable { private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; + private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; + private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 5 * 1000L; + private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; + private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L; private SchedulerLog slog = new SchedulerLog(); private final CheckpointConfig checkpointConfig; @@ -227,11 +234,7 @@ public class Scheduler implements Runnable { this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); - ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition, - cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer, - metricsFactory); - this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(), - leaderDecider, shardSyncTask, metricsFactory); + this.leaderElectedPeriodicShardSyncManager = buildPeriodicShardSyncManager(); } /** @@ -275,12 +278,7 @@ public class Scheduler implements Runnable { TaskResult result = null; if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { - for (int j = 0; j < 10 && leaseRefresher.isLeaseTableEmpty(); j++) { - // check every 1-5 seconds if lease table is still empty, - // to minimize contention between all workers bootstrapping at the same time - long waitTime = ThreadLocalRandom.current().nextLong(1000L, 5000L); - Thread.sleep(waitTime); - } + waitUntilLeaseTableIsReady(); log.info("Syncing Kinesis shard info"); result = leaderElectedPeriodicShardSyncManager.syncShardsOnce(); } else { @@ -297,6 +295,7 @@ public class Scheduler implements Runnable { log.info("Scheduling periodicShardSync)"); // leaderElectedPeriodicShardSyncManager.start(); // TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged + waitUntilHashRangeCovered(); isDone = true; } else { lastException = result.getException(); @@ -308,7 +307,7 @@ public class Scheduler implements Runnable { lastException = e; } - if (!isDone || !leaderElectedPeriodicShardSyncManager.hashRangeCovered()) { + if (!isDone) { try { Thread.sleep(schedulerInitializationBackoffTimeMillis); leaderElectedPeriodicShardSyncManager.stop(); @@ -325,6 +324,29 @@ public class Scheduler implements Runnable { } } + @VisibleForTesting + void waitUntilLeaseTableIsReady() throws InterruptedException, + DependencyException, ProvisionedThroughputException, InvalidStateException { + long waitTime = ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); + long waitUntil = System.currentTimeMillis() + waitTime; + + while (System.currentTimeMillis() < waitUntil && leaseRefresher.isLeaseTableEmpty()) { + // check every 3 seconds if lease table is still empty, + // to minimize contention between all workers bootstrapping at the same time + log.info("Lease table is still empty. Checking again in {} ms", LEASE_TABLE_CHECK_FREQUENCY_MILLIS); + Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS); + } + } + + private void waitUntilHashRangeCovered() throws InterruptedException { + + while (!leaderElectedPeriodicShardSyncManager.hashRangeCovered()) { + // wait until entire hash range is covered + log.info("Hash range is not covered yet. Checking again in {} ms", HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS); + Thread.sleep(HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS); + } + } + @VisibleForTesting void runProcessLoop() { try { @@ -633,6 +655,14 @@ public class Scheduler implements Runnable { argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning()); } + private PeriodicShardSyncManager buildPeriodicShardSyncManager() { + final ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition, + cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer, + metricsFactory); + return new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(), + leaderDecider, shardSyncTask, metricsFactory); + } + /** * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. *

diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 8be1bb8f..3bf68829 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -89,6 +89,8 @@ public class SchedulerTest { private final String applicationName = "applicationName"; private final String streamName = "streamName"; private final String namespace = "testNamespace"; + private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 5 * 1000L; + private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; private Scheduler scheduler; private ShardRecordProcessorFactory shardRecordProcessorFactory; @@ -265,6 +267,43 @@ public class SchedulerTest { verify(shardDetector, times(maxInitializationAttempts)).listShards(); } + @Test + public final void testInitializationWaitsWhenLeaseTableIsEmpty() throws Exception { + final int maxInitializationAttempts = 1; + coordinatorConfig.maxInitializationAttempts(maxInitializationAttempts); + coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist(false); + scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig); + + doNothing().when(leaseCoordinator).initialize(); + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); + + long startTime = System.currentTimeMillis(); + scheduler.waitUntilLeaseTableIsReady(); + long endTime = System.currentTimeMillis(); + + assertTrue(endTime - startTime > MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); + assertTrue(endTime - startTime < MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); + } + + @Test + public final void testInitializationDoesntWaitWhenLeaseTableIsNotEmpty() throws Exception { + final int maxInitializationAttempts = 1; + coordinatorConfig.maxInitializationAttempts(maxInitializationAttempts); + coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist(false); + scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig); + + doNothing().when(leaseCoordinator).initialize(); + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false); + + long startTime = System.currentTimeMillis(); + scheduler.waitUntilLeaseTableIsReady(); + long endTime = System.currentTimeMillis(); + + assertTrue(endTime - startTime < MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); + } + @Test public final void testSchedulerShutdown() { scheduler.shutdown(); From f625aea63de4755ccda70f449213a276ec9d57aa Mon Sep 17 00:00:00 2001 From: Jegosh John Date: Fri, 6 Mar 2020 11:37:29 -0800 Subject: [PATCH 04/10] Fixing scheduler initialization waitWhenLeaseTableEmpty unit test --- .../software/amazon/kinesis/coordinator/SchedulerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 3bf68829..3a0ad054 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -91,6 +91,7 @@ public class SchedulerTest { private final String namespace = "testNamespace"; private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 5 * 1000L; private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; + private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; private Scheduler scheduler; private ShardRecordProcessorFactory shardRecordProcessorFactory; @@ -283,7 +284,7 @@ public class SchedulerTest { long endTime = System.currentTimeMillis(); assertTrue(endTime - startTime > MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); - assertTrue(endTime - startTime < MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); + assertTrue(endTime - startTime < (MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS + LEASE_TABLE_CHECK_FREQUENCY_MILLIS)); } @Test From f57a3326710d6f8fedf1a7c942f6505c5f51ce00 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Wed, 18 Mar 2020 11:56:20 -0700 Subject: [PATCH 05/10] Make periodic shard sync support multi streams --- .../coordinator/PeriodicShardSyncManager.java | 44 ++++++------ .../amazon/kinesis/coordinator/Scheduler.java | 56 ++++----------- .../kinesis/leases/ShardSyncTaskManager.java | 71 ++++++++++++++++--- .../kinesis/coordinator/SchedulerTest.java | 19 +++-- 4 files changed, 109 insertions(+), 81 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 7d166b98..5de09c37 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -18,16 +18,18 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; -import software.amazon.kinesis.leases.ShardSyncTask; +import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.lifecycle.TaskResult; -import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; -import software.amazon.kinesis.metrics.MetricsFactory; +import java.util.List; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; /** * The top level orchestrator for coordinating the periodic shard sync related @@ -42,21 +44,21 @@ class PeriodicShardSyncManager { private final String workerId; private final LeaderDecider leaderDecider; - private final ConsumerTask metricsEmittingShardSyncTask; + private final Map streamToShardSyncTaskManagerMap; private final ScheduledExecutorService shardSyncThreadPool; private boolean isRunning; - PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, MetricsFactory metricsFactory) { - this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory); + PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map streamToShardSyncTaskManagerMap) { + this(workerId, leaderDecider, streamToShardSyncTaskManagerMap, Executors.newSingleThreadScheduledExecutor()); } - PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, ScheduledExecutorService shardSyncThreadPool, MetricsFactory metricsFactory) { + PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map streamToShardSyncTaskManagerMap, + ScheduledExecutorService shardSyncThreadPool) { Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager."); Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager."); - Validate.notNull(shardSyncTask, "ShardSyncTask is required to initialize PeriodicShardSyncManager."); this.workerId = workerId; this.leaderDecider = leaderDecider; - this.metricsEmittingShardSyncTask = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory); + this.streamToShardSyncTaskManagerMap = streamToShardSyncTaskManagerMap; this.shardSyncThreadPool = shardSyncThreadPool; } @@ -82,17 +84,14 @@ class PeriodicShardSyncManager { * Does not schedule periodic shardSync * @return the result of the task */ - public synchronized TaskResult syncShardsOnce() { - - Exception lastException = null; - try { - if (!isRunning) { - runShardSync(); + public synchronized void syncShardsOnce() throws Exception { + for (Map.Entry mapEntry : streamToShardSyncTaskManagerMap.entrySet()) { + final ShardSyncTaskManager shardSyncTaskManager = mapEntry.getValue(); + final TaskResult taskResult = shardSyncTaskManager.executeShardSyncTask(); + if (taskResult.getException() != null) { + throw taskResult.getException(); } - } catch (Exception e) { - lastException = e; } - return new TaskResult(lastException); } public void stop() { @@ -107,10 +106,11 @@ class PeriodicShardSyncManager { private void runShardSync() { if (leaderDecider.isLeader(workerId)) { - log.info(String.format("WorkerId %s is a leader, running the shard sync task", workerId)); - final TaskResult taskResult = metricsEmittingShardSyncTask.call(); - if (taskResult != null && taskResult.getException() != null) { - throw new KinesisClientLibIOException("Failed to sync shards", taskResult.getException()); + for (Map.Entry mapEntry : streamToShardSyncTaskManagerMap.entrySet()) { + final ShardSyncTaskManager shardSyncTaskManager = mapEntry.getValue(); + if (!shardSyncTaskManager.syncShardAndLeaseInfo()) { + throw new KinesisClientLibIOException("Failed to submit shard sync task for stream " + shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); + } } } else { log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 5edf9472..dc730441 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -56,9 +56,7 @@ import software.amazon.kinesis.leases.LeaseSerializer; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardPrioritization; -import software.amazon.kinesis.leases.ShardSyncTask; import software.amazon.kinesis.leases.ShardSyncTaskManager; -import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer; @@ -72,9 +70,7 @@ import software.amazon.kinesis.lifecycle.ShardConsumerArgument; import software.amazon.kinesis.lifecycle.ShardConsumerShutdownNotification; import software.amazon.kinesis.lifecycle.ShutdownNotification; import software.amazon.kinesis.lifecycle.ShutdownReason; -import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.metrics.CloudWatchMetricsFactory; -import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.processor.Checkpointer; @@ -85,19 +81,8 @@ import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalConfig; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; /** * @@ -136,7 +121,6 @@ public class Scheduler implements Runnable { private final LeaseCoordinator leaseCoordinator; private final Function shardSyncTaskManagerProvider; private final Map streamToShardSyncTaskManagerMap = new HashMap<>(); - private final ShardSyncTaskManager shardSyncTaskManager; private final PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager; private final ShardPrioritization shardPrioritization; private final boolean cleanupLeasesUponShardCompletion; @@ -278,7 +262,8 @@ public class Scheduler implements Runnable { // TODO : Halo : Check if this needs to be per stream. this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); - this.leaderElectedPeriodicShardSyncManager = buildPeriodicShardSyncManager(); + this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(), + leaderDecider, streamToShardSyncTaskManagerMap); } /** @@ -320,25 +305,16 @@ public class Scheduler implements Runnable { log.info("Initializing LeaseCoordinator"); leaseCoordinator.initialize(); - TaskResult result; if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { // TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing // TODO: for already synced streams - waitUntilLeaseTableIsReady(); - for(Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { - final StreamIdentifier streamIdentifier = streamConfigEntry.getKey(); - log.info("Syncing Kinesis shard info for " + streamIdentifier); - final StreamConfig streamConfig = streamConfigEntry.getValue(); - ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamIdentifier), - leaseRefresher, streamConfig.initialPositionInStreamExtended(), - cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, - hierarchicalShardSyncer, metricsFactory); - result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); - // Throwing the exception, to prevent further syncs for other stream. - if (result.getException() != null) { - log.error("Caught exception when sync'ing info for " + streamIdentifier, result.getException()); - throw result.getException(); + if (!waitAndCheckIfLeaseTableIsReady()) { + for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { + final StreamIdentifier streamIdentifier = streamConfigEntry.getKey(); + createOrGetShardSyncTaskManager(streamIdentifier); + log.info("Creating shard sync task for " + streamIdentifier); } + leaderElectedPeriodicShardSyncManager.syncShardsOnce(); } } else { log.info("Skipping shard sync per configuration setting (and lease table is not empty)"); @@ -353,9 +329,10 @@ public class Scheduler implements Runnable { log.info("LeaseCoordinator is already running. No need to start it."); } log.info("Scheduling periodicShardSync)"); - // leaderElectedPeriodicShardSyncManager.start(); + // leaderElectedPeriodicShardSyncManager.start(shardSyncTasks); // TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged - waitUntilHashRangeCovered(); + // TODO: Determine if waitUntilHashRangeCovered() is needed. + //waitUntilHashRangeCovered(); isDone = true; } catch (LeasingException e) { log.error("Caught exception when initializing LeaseCoordinator", e); @@ -382,7 +359,7 @@ public class Scheduler implements Runnable { } @VisibleForTesting - void waitUntilLeaseTableIsReady() throws InterruptedException, + boolean waitAndCheckIfLeaseTableIsReady() throws InterruptedException, DependencyException, ProvisionedThroughputException, InvalidStateException { long waitTime = ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); long waitUntil = System.currentTimeMillis() + waitTime; @@ -393,6 +370,7 @@ public class Scheduler implements Runnable { log.info("Lease table is still empty. Checking again in {} ms", LEASE_TABLE_CHECK_FREQUENCY_MILLIS); Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS); } + return !leaseRefresher.isLeaseTableEmpty(); } private void waitUntilHashRangeCovered() throws InterruptedException { @@ -725,14 +703,6 @@ public class Scheduler implements Runnable { argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning()); } - private PeriodicShardSyncManager buildPeriodicShardSyncManager() { - final ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(getStreamIdentifier), leaseRefresher, initialPosition, - cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer, - metricsFactory); - return new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(), - leaderDecider, shardSyncTask, metricsFactory); - } - /** * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. *

diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java index f6db72e3..8c8e0464 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java @@ -14,9 +14,12 @@ */ package software.amazon.kinesis.leases; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -53,6 +56,10 @@ public class ShardSyncTaskManager { private final HierarchicalShardSyncer hierarchicalShardSyncer; @NonNull private final MetricsFactory metricsFactory; + private ConsumerTask currentTask; + private CompletableFuture future; + private AtomicBoolean shardSyncRequestPending; + private final ReentrantLock lock; /** * Constructor. @@ -82,6 +89,8 @@ public class ShardSyncTaskManager { this.executorService = executorService; this.hierarchicalShardSyncer = new HierarchicalShardSyncer(); this.metricsFactory = metricsFactory; + this.shardSyncRequestPending = new AtomicBoolean(false); + this.lock = new ReentrantLock(); } /** @@ -110,16 +119,33 @@ public class ShardSyncTaskManager { this.executorService = executorService; this.hierarchicalShardSyncer = hierarchicalShardSyncer; this.metricsFactory = metricsFactory; + this.shardSyncRequestPending = new AtomicBoolean(false); + this.lock = new ReentrantLock(); } - private ConsumerTask currentTask; - private Future future; - - public synchronized boolean syncShardAndLeaseInfo() { - return checkAndSubmitNextTask(); + public TaskResult executeShardSyncTask() { + final ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, + leaseRefresher, + initialPositionInStream, + cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards, + shardSyncIdleTimeMillis, + hierarchicalShardSyncer, + metricsFactory); + final ConsumerTask metricCollectingTask = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory); + return metricCollectingTask.call(); } - private synchronized boolean checkAndSubmitNextTask() { + public boolean syncShardAndLeaseInfo() { + try { + lock.lock(); + return checkAndSubmitNextTask(); + } finally { + lock.unlock(); + } + } + + private boolean checkAndSubmitNextTask() { boolean submittedNewTask = false; if ((future == null) || future.isCancelled() || future.isDone()) { if ((future != null) && future.isDone()) { @@ -145,18 +171,45 @@ public class ShardSyncTaskManager { hierarchicalShardSyncer, metricsFactory), metricsFactory); - future = executorService.submit(currentTask); + future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService) + .whenComplete((taskResult, exception) -> handlePendingShardSyncs(exception, taskResult)); submittedNewTask = true; if (log.isDebugEnabled()) { log.debug("Submitted new {} task.", currentTask.taskType()); } } else { if (log.isDebugEnabled()) { - log.debug("Previous {} task still pending. Not submitting new task.", currentTask.taskType()); + log.debug("Previous {} task still pending. Not submitting new task. " + + "Enqueued a request that will be executed when the current request completes.", currentTask.taskType()); } + shardSyncRequestPending.compareAndSet(false /*expected*/, true /*update*/); } - return submittedNewTask; } + private void handlePendingShardSyncs(Throwable exception, TaskResult taskResult) { + if (exception != null || taskResult.getException() != null) { + log.error("Caught exception running {} task: ", currentTask.taskType(), exception != null ? exception : taskResult.getException()); + } + // Acquire lock here. If shardSyncRequestPending is false in this completionStage and + // syncShardAndLeaseInfo is invoked, before completion stage exits (future completes) + // but right after the value of shardSyncRequestPending is checked, it will result in + // shardSyncRequestPending being set to true, but no pending futures to trigger the next + // ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the + // previous task is in this completion stage, checkAndSubmitNextTask is not invoked + // until this completionStage exits. + try { + lock.lock(); + if (shardSyncRequestPending.get()) { + shardSyncRequestPending.set(false); + // reset future to null, so next call creates a new one + // without trying to get results from the old future. + future = null; + checkAndSubmitNextTask(); + } + } finally { + lock.unlock(); + } + } + } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index eabe25cd..b14f3303 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -78,6 +78,7 @@ import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.lifecycle.ShardConsumer; +import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; @@ -175,6 +176,7 @@ public class SchedulerTest { when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList); when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); + when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null)); when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher); scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, @@ -278,14 +280,14 @@ public class SchedulerTest { @Test public final void testInitializationFailureWithRetries() throws Exception { doNothing().when(leaseCoordinator).initialize(); - when(shardDetector.listShards()).thenThrow(new RuntimeException()); + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenThrow(new RuntimeException()); leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName, workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(false, true)); scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig); scheduler.run(); - verify(shardDetector, times(coordinatorConfig.maxInitializationAttempts())).listShards(); + verify(dynamoDBLeaseRefresher, times(coordinatorConfig.maxInitializationAttempts())).isLeaseTableEmpty(); } @Test @@ -298,18 +300,20 @@ public class SchedulerTest { metricsConfig, processorConfig, retrievalConfig); doNothing().when(leaseCoordinator).initialize(); - when(shardDetector.listShards()).thenThrow(new RuntimeException()); + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenThrow(new RuntimeException()); scheduler.run(); // verify initialization was retried for maxInitializationAttempts times - verify(shardDetector, times(maxInitializationAttempts)).listShards(); + verify(dynamoDBLeaseRefresher, times(maxInitializationAttempts)).isLeaseTableEmpty(); } @Test public final void testMultiStreamInitialization() throws ProvisionedThroughputException, DependencyException { retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) .retrievalFactory(retrievalFactory); + leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, + workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(true, true)); scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig); scheduler.initialize(); @@ -322,7 +326,7 @@ public class SchedulerTest { retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) .retrievalFactory(retrievalFactory); leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, - workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(true, false)); + workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(true, true)); scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig); scheduler.initialize(); @@ -388,7 +392,7 @@ public class SchedulerTest { when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); long startTime = System.currentTimeMillis(); - scheduler.waitUntilLeaseTableIsReady(); + scheduler.waitAndCheckIfLeaseTableIsReady(); long endTime = System.currentTimeMillis(); assertTrue(endTime - startTime > MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); @@ -407,7 +411,7 @@ public class SchedulerTest { when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false); long startTime = System.currentTimeMillis(); - scheduler.waitUntilLeaseTableIsReady(); + scheduler.waitAndCheckIfLeaseTableIsReady(); long endTime = System.currentTimeMillis(); assertTrue(endTime - startTime < MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); @@ -681,6 +685,7 @@ public class SchedulerTest { shardSyncTaskManagerMap.put(streamConfig.streamIdentifier(), shardSyncTaskManager); shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); + when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null)); if(shardSyncFirstAttemptFailure) { when(shardDetector.listShards()) .thenThrow(new RuntimeException("Service Exception")) From 384fe5266c137144d4790e8507e6ba9002d74037 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Thu, 19 Mar 2020 15:47:27 -0700 Subject: [PATCH 06/10] Address comments --- .../amazon/kinesis/coordinator/Scheduler.java | 14 +++++++------- .../amazon/kinesis/coordinator/SchedulerTest.java | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index dc730441..85234f7f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -94,7 +94,7 @@ public class Scheduler implements Runnable { private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; - private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 5 * 1000L; + private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L; private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L; private SchedulerLog slog = new SchedulerLog(); @@ -127,7 +127,6 @@ public class Scheduler implements Runnable { private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private final GracefulShutdownCoordinator gracefulShutdownCoordinator; private final WorkerStateChangeListener workerStateChangeListener; - private final InitialPositionInStreamExtended initialPosition; private final MetricsFactory metricsFactory; private final long failoverTimeMillis; private final long taskBackoffTimeMillis; @@ -249,7 +248,6 @@ public class Scheduler implements Runnable { } this.leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); - this.initialPosition = retrievalConfig.initialPositionInStreamExtended(); this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis(); this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis(); // this.retryGetRecordsInSeconds = this.retrievalConfig.retryGetRecordsInSeconds(); @@ -308,7 +306,8 @@ public class Scheduler implements Runnable { if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { // TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing // TODO: for already synced streams - if (!waitAndCheckIfLeaseTableIsReady()) { + if (shouldInitiateLeaseSync()) { + log.info("Worker {} is initiating the lease sync.", leaseManagementConfig.workerIdentifier()); for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { final StreamIdentifier streamIdentifier = streamConfigEntry.getKey(); createOrGetShardSyncTaskManager(streamIdentifier); @@ -359,18 +358,19 @@ public class Scheduler implements Runnable { } @VisibleForTesting - boolean waitAndCheckIfLeaseTableIsReady() throws InterruptedException, + boolean shouldInitiateLeaseSync() throws InterruptedException, DependencyException, ProvisionedThroughputException, InvalidStateException { long waitTime = ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); long waitUntil = System.currentTimeMillis() + waitTime; - while (System.currentTimeMillis() < waitUntil && leaseRefresher.isLeaseTableEmpty()) { + boolean isLeaseTableEmpty = true; + while (System.currentTimeMillis() < waitUntil && (isLeaseTableEmpty = leaseRefresher.isLeaseTableEmpty())) { // check every 3 seconds if lease table is still empty, // to minimize contention between all workers bootstrapping at the same time log.info("Lease table is still empty. Checking again in {} ms", LEASE_TABLE_CHECK_FREQUENCY_MILLIS); Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS); } - return !leaseRefresher.isLeaseTableEmpty(); + return isLeaseTableEmpty; } private void waitUntilHashRangeCovered() throws InterruptedException { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index b14f3303..812b05df 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -392,7 +392,7 @@ public class SchedulerTest { when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); long startTime = System.currentTimeMillis(); - scheduler.waitAndCheckIfLeaseTableIsReady(); + scheduler.shouldInitiateLeaseSync(); long endTime = System.currentTimeMillis(); assertTrue(endTime - startTime > MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); @@ -411,7 +411,7 @@ public class SchedulerTest { when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false); long startTime = System.currentTimeMillis(); - scheduler.waitAndCheckIfLeaseTableIsReady(); + scheduler.shouldInitiateLeaseSync(); long endTime = System.currentTimeMillis(); assertTrue(endTime - startTime < MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); From bb495e4f60f564ffc71c33ab319a52fb582fb4ba Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Fri, 20 Mar 2020 11:54:40 -0700 Subject: [PATCH 07/10] Adding metric for ShardSyncTaskManager --- .../amazon/kinesis/coordinator/PeriodicShardSyncManager.java | 3 ++- .../java/software/amazon/kinesis/leases/ShardSyncTask.java | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 5de09c37..1d02f47c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -109,7 +109,8 @@ class PeriodicShardSyncManager { for (Map.Entry mapEntry : streamToShardSyncTaskManagerMap.entrySet()) { final ShardSyncTaskManager shardSyncTaskManager = mapEntry.getValue(); if (!shardSyncTaskManager.syncShardAndLeaseInfo()) { - throw new KinesisClientLibIOException("Failed to submit shard sync task for stream " + shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); + log.warn("Failed to submit shard sync task for stream {}. This could be due to the previous shard sync task not finished.", + shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); } } } else { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java index c59608b2..b8f581c6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java @@ -23,6 +23,7 @@ import software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.lifecycle.TaskType; import software.amazon.kinesis.metrics.MetricsFactory; +import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; @@ -62,6 +63,7 @@ public class ShardSyncTask implements ConsumerTask { public TaskResult call() { Exception exception = null; final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHARD_SYNC_TASK_OPERATION); + boolean shardSyncSuccess = true; try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, @@ -72,7 +74,9 @@ public class ShardSyncTask implements ConsumerTask { } catch (Exception e) { log.error("Caught exception while sync'ing Kinesis shards and leases", e); exception = e; + shardSyncSuccess = false; } finally { + MetricsUtil.addSuccess(scope, "SyncShards", shardSyncSuccess, MetricsLevel.DETAILED); MetricsUtil.endScope(scope); } From 35be75c347b241ae2f9a1bd5a34c3d476dda0686 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Mon, 23 Mar 2020 18:35:18 -0700 Subject: [PATCH 08/10] Addressing comments about unit tests --- .../amazon/kinesis/coordinator/Scheduler.java | 6 +++--- ...sticShuffleShardSyncLeaderDeciderTest.java | 19 +++++++++++++++---- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 85234f7f..395882d9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -363,14 +363,14 @@ public class Scheduler implements Runnable { long waitTime = ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); long waitUntil = System.currentTimeMillis() + waitTime; - boolean isLeaseTableEmpty = true; - while (System.currentTimeMillis() < waitUntil && (isLeaseTableEmpty = leaseRefresher.isLeaseTableEmpty())) { + boolean shouldInitiateLeaseSync = true; + while (System.currentTimeMillis() < waitUntil && (shouldInitiateLeaseSync = leaseRefresher.isLeaseTableEmpty())) { // check every 3 seconds if lease table is still empty, // to minimize contention between all workers bootstrapping at the same time log.info("Lease table is still empty. Checking again in {} ms", LEASE_TABLE_CHECK_FREQUENCY_MILLIS); Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS); } - return isLeaseTableEmpty; + return shouldInitiateLeaseSync; } private void waitUntilHashRangeCovered() throws InterruptedException { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java index b6ff3a0d..2e228a8a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java @@ -34,6 +34,7 @@ import java.util.stream.Collectors; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.booleanThat; import static org.mockito.Mockito.when; import static software.amazon.kinesis.coordinator.DeterministicShuffleShardSyncLeaderDecider.DETERMINISTIC_SHUFFLE_SEED; @@ -72,10 +73,18 @@ public class DeterministicShuffleShardSyncLeaderDeciderTest { assertTrue("IsLeader should return true if no leases are returned", isLeader); } + @Test + public void testleaderElectionWithEmptyOwnerLeases() throws Exception { + List leases = getLeases(5, true, true, true); + when(leaseRefresher.listLeases()).thenReturn(leases); + boolean isLeader = leaderDecider.isLeader(WORKER_ID); + assertTrue("IsLeader should return false if leases have no owner", isLeader); + } + @Test public void testElectedLeadersAsPerExpectedShufflingOrder() throws Exception { - List leases = getLeases(5, false /* duplicateLeaseOwner */, true /* activeLeases */); + List leases = getLeases(5, false /*emptyLeaseOwner */,false /* duplicateLeaseOwner */, true /* activeLeases */); when(leaseRefresher.listLeases()).thenReturn(leases); Set expectedLeaders = getExpectedLeaders(leases); for (String leader : expectedLeaders) { @@ -92,7 +101,7 @@ public class DeterministicShuffleShardSyncLeaderDeciderTest { public void testElectedLeadersAsPerExpectedShufflingOrderWhenUniqueWorkersLessThanMaxLeaders() { this.numShardSyncWorkers = 5; // More than number of unique lease owners leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, scheduledExecutorService, numShardSyncWorkers); - List leases = getLeases(3, false /* duplicateLeaseOwner */, true /* activeLeases */); + List leases = getLeases(3, false /*emptyLeaseOwner */, false /* duplicateLeaseOwner */, true /* activeLeases */); Set expectedLeaders = getExpectedLeaders(leases); // All lease owners should be present in expected leaders set, and they should all be leaders. for (Lease lease : leases) { @@ -101,14 +110,16 @@ public class DeterministicShuffleShardSyncLeaderDeciderTest { } } - private List getLeases(int count, boolean duplicateLeaseOwner, boolean activeLeases) { + private List getLeases(int count, boolean emptyLeaseOwner, boolean duplicateLeaseOwner, boolean activeLeases) { List leases = new ArrayList<>(); for (int i = 0; i < count; i++) { Lease lease = new Lease(); lease.leaseKey(LEASE_KEY + i); lease.checkpoint(activeLeases ? ExtendedSequenceNumber.LATEST : ExtendedSequenceNumber.SHARD_END); lease.leaseCounter(new Random().nextLong()); - lease.leaseOwner(LEASE_OWNER + (duplicateLeaseOwner ? "" : i)); + if (!emptyLeaseOwner) { + lease.leaseOwner(LEASE_OWNER + (duplicateLeaseOwner ? "" : i)); + } leases.add(lease); } return leases; From 98d2f946f4108153a38e2abfbed5cf1ad31ad9f2 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Tue, 24 Mar 2020 10:28:05 -0700 Subject: [PATCH 09/10] Fix test comment --- .../DeterministicShuffleShardSyncLeaderDeciderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java index 2e228a8a..b300f355 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java @@ -78,7 +78,7 @@ public class DeterministicShuffleShardSyncLeaderDeciderTest { List leases = getLeases(5, true, true, true); when(leaseRefresher.listLeases()).thenReturn(leases); boolean isLeader = leaderDecider.isLeader(WORKER_ID); - assertTrue("IsLeader should return false if leases have no owner", isLeader); + assertTrue("IsLeader should return true if leases have no owner", isLeader); } @Test From 391532da31f2f0c5e3dab6990a4f33f2b327af9b Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Wed, 25 Mar 2020 12:19:47 -0700 Subject: [PATCH 10/10] Adding comments for metric and Todo works --- .../java/software/amazon/kinesis/coordinator/Scheduler.java | 3 +++ .../java/software/amazon/kinesis/leases/ShardSyncTask.java | 3 +++ 2 files changed, 6 insertions(+) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 395882d9..ff9d845e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -375,6 +375,9 @@ public class Scheduler implements Runnable { private void waitUntilHashRangeCovered() throws InterruptedException { + // TODO: Currently this call is not in use. We may need to implement this method later. Created SIM to track the work: https://sim.amazon.com/issues/KinesisLTR-202 + // TODO: For future implementation, streamToShardSyncTaskManagerMap might not contain the most up to date snapshot of active streams. + // Should use currentStreamConfigMap to determine the streams to check. while (!leaderElectedPeriodicShardSyncManager.hashRangeCovered()) { // wait until entire hash range is covered log.info("Hash range is not covered yet. Checking again in {} ms", HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java index b8f581c6..d7548a55 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java @@ -76,6 +76,9 @@ public class ShardSyncTask implements ConsumerTask { exception = e; shardSyncSuccess = false; } finally { + // NOTE: This metric is reflecting if a shard sync task succeeds. Customer can use this metric to monitor if + // their application encounter any shard sync failures. This metric can help to detect potential shard stuck issues + // that are due to shard sync failures. MetricsUtil.addSuccess(scope, "SyncShards", shardSyncSuccess, MetricsLevel.DETAILED); MetricsUtil.endScope(scope); }