From 62e13ff3a18e6407a59e7000733a544dfb9fe022 Mon Sep 17 00:00:00 2001
From: Jegosh John
Date: Tue, 25 Feb 2020 10:41:34 -0800
Subject: [PATCH] Introduce PeriodicShardSync with leader election

---
 ...ministicShuffleShardSyncLeaderDecider.java | 165 ++++++++++++++++++
 .../kinesis/coordinator/LeaderDecider.java    |  39 +++++
 .../coordinator/PeriodicShardSyncManager.java |  94 +++++++++++
 .../amazon/kinesis/coordinator/Scheduler.java |  49 ++++--
 .../software/amazon/kinesis/leases/Lease.java |  14 +-
 5 files changed, 337 insertions(+), 24 deletions(-)
 create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDecider.java
 create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/LeaderDecider.java
 create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java

diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDecider.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDecider.java
new file mode 100644
index 00000000..720103c6
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDecider.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package software.amazon.kinesis.coordinator;
+
+import lombok.extern.slf4j.Slf4j;
+import software.amazon.awssdk.utils.CollectionUtils;
+import software.amazon.kinesis.leases.Lease;
+import software.amazon.kinesis.leases.LeaseRefresher;
+import software.amazon.kinesis.leases.exceptions.DependencyException;
+import software.amazon.kinesis.leases.exceptions.InvalidStateException;
+import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+
+/**
+ * An implementation of the {@code LeaderDecider} to elect leader(s) based on workerId.
+ * Leases are shuffled using a predetermined constant seed so that lease ordering is
+ * preserved across workers.
+ * This reduces the probability of choosing the leader workers co-located on the same
+ * host in case workerId starts with a common string (e.g. IP Address).
+ * Hence if a host has 3 workers, IPADDRESS_Worker1, IPADDRESS_Worker2, and IPADDRESS_Worker3,
+ * we don't end up choosing all 3 for shard sync as a result of natural ordering of Strings.
+ * This ensures redundancy for shard-sync during host failures.
+ */
+@Slf4j
+class DeterministicShuffleShardSyncLeaderDecider
+        implements LeaderDecider {
+    // Fixed seed so that the shuffle order is preserved across workers
+    static final int DETERMINISTIC_SHUFFLE_SEED = 1947;
+
+    private static final long ELECTION_INITIAL_DELAY_MILLIS = 60 * 1000;
+    private static final long ELECTION_SCHEDULING_INTERVAL_MILLIS = 5 * 60 * 1000;
+    private static final int AWAIT_TERMINATION_MILLIS = 5000;
+
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+    private final LeaseRefresher leaseRefresher;
+    private final int numPeriodicShardSyncWorkers;
+    private final ScheduledExecutorService leaderElectionThreadPool;
+
+    private volatile Set<String> leaders;
+    // Only accessed from the synchronized isLeader(); true once the periodic election has been scheduled.
+    private boolean periodicElectionScheduled;
+
+    /**
+     * @param leaseRefresher LeaseManager instance used to fetch leases.
+     * @param leaderElectionThreadPool Thread-pool to be used for leaderElection.
+     * @param numPeriodicShardSyncWorkers Number of leaders that will be elected to perform periodic shard syncs.
+     */
+    DeterministicShuffleShardSyncLeaderDecider(LeaseRefresher leaseRefresher, ScheduledExecutorService leaderElectionThreadPool,
+            int numPeriodicShardSyncWorkers) {
+        this.leaseRefresher = leaseRefresher;
+        this.leaderElectionThreadPool = leaderElectionThreadPool;
+        this.numPeriodicShardSyncWorkers = numPeriodicShardSyncWorkers;
+    }
+
+    /*
+     * Shuffles the leases deterministically and elects numPeriodicShardSyncWorkers number of workers
+     * as leaders (workers that will perform shard sync).
+     * Any failure is logged and leaves the previously elected leaders untouched.
+     */
+    private void electLeaders() {
+        try {
+            log.debug("Started leader election at: {}", Instant.now());
+            List<Lease> leases = leaseRefresher.listLeases();
+            List<String> uniqueHosts = leases.stream().map(Lease::leaseOwner)
+                    .filter(owner -> owner != null).distinct().sorted().collect(Collectors.toList());
+
+            Collections.shuffle(uniqueHosts, new Random(DETERMINISTIC_SHUFFLE_SEED));
+            int numShardSyncWorkers = Math.min(uniqueHosts.size(), numPeriodicShardSyncWorkers);
+            Set<String> electedLeaders = new HashSet<>(uniqueHosts.subList(0, numShardSyncWorkers));
+            // In case value is currently being read, we wait for reading to complete before updating the variable.
+            // The lock is taken right before its own try/finally so that unlock() only ever runs on a
+            // lock this thread actually holds, even when listing the leases above fails.
+            readWriteLock.writeLock().lock();
+            try {
+                leaders = electedLeaders;
+            } finally {
+                readWriteLock.writeLock().unlock();
+            }
+            log.info("Elected leaders: {}", String.join(", ", electedLeaders));
+            log.debug("Completed leader election at: {}", Instant.now());
+        } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
+            log.error("Exception occurred while trying to fetch all leases for leader election", e);
+        } catch (Throwable t) {
+            log.error("Unknown exception during leader election.", t);
+        }
+    }
+
+    private boolean isWorkerLeaderForShardSync(String workerId) {
+        return CollectionUtils.isNullOrEmpty(leaders) || leaders.contains(workerId);
+    }
+
+    @Override
+    public synchronized Boolean isLeader(String workerId) {
+        // if no leaders yet, synchronously get leaders. This will happen at first Shard Sync.
+        if (executeConditionCheckWithReadLock(() -> CollectionUtils.isNullOrEmpty(leaders))) {
+            electLeaders();
+            // start a scheduled executor that will periodically update leaders.
+            // The first run will be after a minute.
+            // We don't need jitter since it is scheduled with a fixed delay and time taken to scan leases
+            // will be different at different times and on different hosts/workers.
+            // Schedule only once: the leader set can stay empty across calls (e.g. no lease has an owner yet).
+            if (!periodicElectionScheduled) {
+                leaderElectionThreadPool.scheduleWithFixedDelay(this::electLeaders, ELECTION_INITIAL_DELAY_MILLIS,
+                        ELECTION_SCHEDULING_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+                periodicElectionScheduled = true;
+            }
+        }
+
+        return executeConditionCheckWithReadLock(() -> isWorkerLeaderForShardSync(workerId));
+    }
+
+    @Override
+    public synchronized void shutdown() {
+        try {
+            leaderElectionThreadPool.shutdown();
+            if (leaderElectionThreadPool.awaitTermination(AWAIT_TERMINATION_MILLIS, TimeUnit.MILLISECONDS)) {
+                log.info("Successfully stopped leader election on the worker");
+            } else {
+                leaderElectionThreadPool.shutdownNow();
+                log.info("Stopped leader election thread after awaiting termination for {} milliseconds",
+                        AWAIT_TERMINATION_MILLIS);
+            }
+
+        } catch (InterruptedException e) {
+            log.debug("Encountered InterruptedException while awaiting leader election threadPool termination");
+            // Restore the interrupt status so that the caller can still observe the interruption.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    // Execute condition checks using shared variables under a read-write lock.
+    private boolean executeConditionCheckWithReadLock(BooleanSupplier action) {
+        readWriteLock.readLock().lock();
+        try {
+            return action.getAsBoolean();
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+    }
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/LeaderDecider.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/LeaderDecider.java
new file mode 100644
index 00000000..140791af
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/LeaderDecider.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package software.amazon.kinesis.coordinator;
+
+/**
+ * Used in conjunction with periodic shard sync.
+ * Implement this interface to allow KCL to decide if the current worker should execute shard sync.
+ * When periodic shard sync is enabled, PeriodicShardSyncManager periodically checks if the current
+ * worker is one of the leaders designated to execute shard-sync and then acts accordingly.
+ */
+public interface LeaderDecider {
+
+    /**
+     * Method invoked to check whether the given workerId corresponds to one of the workers
+     * designated to execute shard-syncs periodically.
+     *
+     * @param workerId ID of the worker
+     * @return True if the worker with ID workerId can execute shard-sync. False otherwise.
+     */
+    Boolean isLeader(String workerId);
+
+    /**
+     * Can be invoked, if needed, to shut down any clients/thread-pools
+     * being used in the LeaderDecider implementation.
+     */
+    void shutdown();
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java
new file mode 100644
index 00000000..3134fef7
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2019 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package software.amazon.kinesis.coordinator;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.Validate;
+import software.amazon.kinesis.leases.ShardSyncTask;
+import software.amazon.kinesis.lifecycle.ConsumerTask;
+import software.amazon.kinesis.lifecycle.TaskResult;
+import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
+import software.amazon.kinesis.metrics.MetricsFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The top level orchestrator for coordinating the periodic shard sync related
+ * activities.
+ */
+@Getter
+@EqualsAndHashCode
+@Slf4j
+class PeriodicShardSyncManager {
+    private static final long INITIAL_DELAY = 0;
+    private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000;
+
+    private final String workerId;
+    private final LeaderDecider leaderDecider;
+    private final ConsumerTask metricsEmittingShardSyncTask;
+    private final ScheduledExecutorService shardSyncThreadPool;
+    private boolean isRunning;
+
+    PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, MetricsFactory metricsFactory) {
+        this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory);
+    }
+
+    PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, ScheduledExecutorService shardSyncThreadPool, MetricsFactory metricsFactory) {
+        Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager.");
+        Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager.");
+        Validate.notNull(shardSyncTask, "ShardSyncTask is required to initialize PeriodicShardSyncManager.");
+        this.workerId = workerId;
+        this.leaderDecider = leaderDecider;
+        this.metricsEmittingShardSyncTask = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory);
+        this.shardSyncThreadPool = shardSyncThreadPool;
+    }
+
+    /**
+     * Schedules the periodic shard sync on the thread pool unless it has already been scheduled.
+     *
+     * @return a {@code TaskResult} constructed with a {@code null} argument
+     */
+    public synchronized TaskResult start() {
+        if (!isRunning) {
+            shardSyncThreadPool
+                    .scheduleWithFixedDelay(this::runShardSync, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS,
+                            TimeUnit.MILLISECONDS);
+            isRunning = true;
+        }
+        return new TaskResult(null);
+    }
+
+    /**
+     * Shuts down the leader decider and the shard sync thread pool if the manager is running.
+     * Synchronized, like {@link #start()}, because both methods read and write {@code isRunning}.
+     */
+    public synchronized void stop() {
+        if (isRunning) {
+            log.info("Shutting down leader decider on worker {}", workerId);
+            leaderDecider.shutdown();
+            log.info("Shutting down periodic shard sync task scheduler on worker {}", workerId);
+            shardSyncThreadPool.shutdown();
+            isRunning = false;
+        }
+    }
+
+    /*
+     * Runs the shard sync task when this worker is a leader. Every Throwable is caught and logged,
+     * because an exception escaping a fixed-delay task suppresses its subsequent executions.
+     */
+    private void runShardSync() {
+        try {
+            if (leaderDecider.isLeader(workerId)) {
+                log.info("WorkerId {} is a leader, running the shard sync task", workerId);
+                metricsEmittingShardSyncTask.call();
+            } else {
+                log.debug("WorkerId {} is not a leader, not running the shard sync task", workerId);
+            }
+        } catch (Throwable t) {
+            log.error("Error during runShardSync.", t);
+        }
+    }
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index eaeb5a1c..7f92c384 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -15,20 +15,7 @@ package software.amazon.kinesis.coordinator; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - import com.google.common.annotations.VisibleForTesting; - import io.reactivex.plugins.RxJavaPlugins; import lombok.AccessLevel; import lombok.Getter; @@ -39,6 +26,7 @@ import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; @@ -48,7 +36,6 @@ import software.amazon.kinesis.leases.ShardInfo; import
software.amazon.kinesis.leases.ShardPrioritization; import software.amazon.kinesis.leases.ShardSyncTask; import software.amazon.kinesis.leases.ShardSyncTaskManager; -import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator; import software.amazon.kinesis.leases.exceptions.LeasingException; import software.amazon.kinesis.lifecycle.LifecycleConfig; @@ -70,6 +57,19 @@ import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalConfig; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + /** * */ @@ -78,6 +78,7 @@ import software.amazon.kinesis.retrieval.RetrievalConfig; @Slf4j public class Scheduler implements Runnable { + private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; private SchedulerLog slog = new SchedulerLog(); private final CheckpointConfig checkpointConfig; @@ -101,6 +102,7 @@ public class Scheduler implements Runnable { // private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final LeaseCoordinator leaseCoordinator; private final ShardSyncTaskManager shardSyncTaskManager; + private final PeriodicShardSyncManager periodicShardSyncManager; private final ShardPrioritization shardPrioritization; private final boolean cleanupLeasesUponShardCompletion; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; @@ -119,6 +121,7 @@ public class Scheduler implements Runnable { private final AggregatorUtil aggregatorUtil; private final 
HierarchicalShardSyncer hierarchicalShardSyncer; private final long schedulerInitializationBackoffTimeMillis; + private LeaderDecider leaderDecider; // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. @@ -209,6 +212,10 @@ public class Scheduler implements Runnable { this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory() .createWorkerStateChangeListener(); } + if (leaderDecider == null) { + leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, + Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); + } this.initialPosition = retrievalConfig.initialPositionInStreamExtended(); this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis(); this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis(); @@ -222,6 +229,11 @@ public class Scheduler implements Runnable { this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); + ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition, + cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer, + metricsFactory); + this.periodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(), + leaderDecider, shardSyncTask, metricsFactory); } /** @@ -235,13 +247,17 @@ public class Scheduler implements Runnable { try { initialize(); - log.info("Initialization complete. Starting worker loop."); + log.info("Initialization complete. Scheduling periodicShardSync.."); + + periodicShardSyncManager.start(); + + log.info("Scheduled periodicShardSync tasks. Starting worker loop."); + } catch (RuntimeException e) { log.error("Unable to initialize after {} attempts. 
Shutting down.", maxInitializationAttempts, e); workerStateChangeListener.onAllInitializationAttemptsFailed(e); shutdown(); } - while (!shouldShutdown()) { runProcessLoop(); } @@ -516,6 +532,7 @@ public class Scheduler implements Runnable { // Lost leases will force Worker to begin shutdown process for all shard consumers in // Worker.run(). leaseCoordinator.stop(); + periodicShardSyncManager.stop(); workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index 802ee29b..682a6f9e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -14,14 +14,6 @@ */ package software.amazon.kinesis.leases; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import com.google.common.collect.Collections2; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; @@ -30,6 +22,12 @@ import lombok.ToString; import lombok.experimental.Accessors; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + /** * This class contains data pertaining to a Lease. Distributed systems may use leases to partition work across a * fleet of workers. Each unit of work (identified by a leaseKey) has a corresponding Lease. Every worker will contend