Introduce PeriodicShardSync with leader election

This commit is contained in:
Jegosh John 2020-02-25 10:41:34 -08:00
parent ab572a9378
commit 62e13ff3a1
5 changed files with 324 additions and 24 deletions

View file

@ -0,0 +1,152 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
/**
* An implementation of the {@code LeaderDecider} to elect leader(s) based on workerId.
* Leases are shuffled using a predetermined constant seed so that lease ordering is
* preserved across workers.
* This reduces the probability of choosing the leader workers co-located on the same
* host in case workerId starts with a common string (e.g. IP Address).
* Hence if a host has 3 workers, IPADDRESS_Worker1, IPADDRESS_Worker2, and IPADDRESS_Worker3,
* we don't end up choosing all 3 for shard sync as a result of natural ordering of Strings.
* This ensures redundancy for shard-sync during host failures.
*/
@Slf4j
class DeterministicShuffleShardSyncLeaderDecider
implements LeaderDecider {
// Fixed seed so that the shuffle order is preserved across workers
static final int DETERMINISTIC_SHUFFLE_SEED = 1947;
private static final long ELECTION_INITIAL_DELAY_MILLIS = 60 * 1000;
private static final long ELECTION_SCHEDULING_INTERVAL_MILLIS = 5 * 60 * 1000;
private static final int AWAIT_TERMINATION_MILLIS = 5000;
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final LeaseRefresher leaseRefresher;
private final int numPeriodicShardSyncWorkers;
private final ScheduledExecutorService leaderElectionThreadPool;
private volatile Set<String> leaders;
/**
* @param leaseRefresher LeaseManager instance used to fetch leases.
* @param leaderElectionThreadPool Thread-pool to be used for leaderElection.
* @param numPeriodicShardSyncWorkers Number of leaders that will be elected to perform periodic shard syncs.
*/
DeterministicShuffleShardSyncLeaderDecider(LeaseRefresher leaseRefresher, ScheduledExecutorService leaderElectionThreadPool,
int numPeriodicShardSyncWorkers) {
this.leaseRefresher = leaseRefresher;
this.leaderElectionThreadPool = leaderElectionThreadPool;
this.numPeriodicShardSyncWorkers = numPeriodicShardSyncWorkers;
}
/*
* Shuffles the leases deterministically and elects numPeriodicShardSyncWorkers number of workers
* as leaders (workers that will perform shard sync).
*/
private void electLeaders() {
try {
log.debug("Started leader election at: " + Instant.now());
List<Lease> leases = leaseRefresher.listLeases();
List<String> uniqueHosts = leases.stream().map(Lease::leaseOwner)
.filter(owner -> owner != null).distinct().sorted().collect(Collectors.toList());
Collections.shuffle(uniqueHosts, new Random(DETERMINISTIC_SHUFFLE_SEED));
int numShardSyncWorkers = Math.min(uniqueHosts.size(), numPeriodicShardSyncWorkers);
// In case value is currently being read, we wait for reading to complete before updating the variable.
// This is to prevent any ConcurrentModificationException exceptions.
readWriteLock.writeLock().lock();
leaders = new HashSet<>(uniqueHosts.subList(0, numShardSyncWorkers));
log.info("Elected leaders: " + String.join(", ", leaders));
log.debug("Completed leader election at: " + Instant.now());
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
log.error("Exception occurred while trying to fetch all leases for leader election", e);
} catch (Throwable t) {
log.error("Unknown exception during leader election.", t);
} finally {
readWriteLock.writeLock().unlock();
}
}
private boolean isWorkerLeaderForShardSync(String workerId) {
return CollectionUtils.isNullOrEmpty(leaders) || leaders.contains(workerId);
}
@Override
public synchronized Boolean isLeader(String workerId) {
// if no leaders yet, synchronously get leaders. This will happen at first Shard Sync.
if (executeConditionCheckWithReadLock(() -> CollectionUtils.isNullOrEmpty(leaders))) {
electLeaders();
// start a scheduled executor that will periodically update leaders.
// The first run will be after a minute.
// We don't need jitter since it is scheduled with a fixed delay and time taken to scan leases
// will be different at different times and on different hosts/workers.
leaderElectionThreadPool.scheduleWithFixedDelay(this::electLeaders, ELECTION_INITIAL_DELAY_MILLIS,
ELECTION_SCHEDULING_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
}
return executeConditionCheckWithReadLock(() -> isWorkerLeaderForShardSync(workerId));
}
@Override
public synchronized void shutdown() {
try {
leaderElectionThreadPool.shutdown();
if (leaderElectionThreadPool.awaitTermination(AWAIT_TERMINATION_MILLIS, TimeUnit.MILLISECONDS)) {
log.info("Successfully stopped leader election on the worker");
} else {
leaderElectionThreadPool.shutdownNow();
log.info(String.format("Stopped leader election thread after awaiting termination for %d milliseconds",
AWAIT_TERMINATION_MILLIS));
}
} catch (InterruptedException e) {
log.debug("Encountered InterruptedException while awaiting leader election threadPool termination");
}
}
// Execute condition checks using shared variables under a read-write lock.
private boolean executeConditionCheckWithReadLock(BooleanSupplier action) {
try {
readWriteLock.readLock().lock();
return action.getAsBoolean();
} finally {
readWriteLock.readLock().unlock();
}
}
}

View file

@ -0,0 +1,39 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
/**
* Used in conjunction with periodic shard sync.
* Implement this interface to allow KCL to decide if the current worker should execute shard sync.
* When periodic shard sync is enabled, PeriodicShardSyncManager periodically checks if the current
* worker is one of the leaders designated to execute shard-sync and then acts accordingly.
*/
public interface LeaderDecider {
/**
* Method invoked to check the given workerId corresponds to one of the workers
* designated to execute shard-syncs periodically.
*
* @param workerId ID of the worker
* @return True if the worker with ID workerId can execute shard-sync. False otherwise.
*/
Boolean isLeader(String workerId);
/**
* Can be invoked, if needed, to shutdown any clients/thread-pools
* being used in the LeaderDecider implementation.
*/
void shutdown();
}

View file

@ -0,0 +1,94 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate;
import software.amazon.kinesis.leases.ShardSyncTask;
import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.metrics.MetricsFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* The top level orchestrator for coordinating the periodic shard sync related
* activities.
*/
@Getter
@EqualsAndHashCode
@Slf4j
class PeriodicShardSyncManager {
private static final long INITIAL_DELAY = 0;
private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000;
private final String workerId;
private final LeaderDecider leaderDecider;
private final ConsumerTask metricsEmittingShardSyncTask;
private final ScheduledExecutorService shardSyncThreadPool;
private boolean isRunning;
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, MetricsFactory metricsFactory) {
this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory);
}
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, ScheduledExecutorService shardSyncThreadPool, MetricsFactory metricsFactory) {
Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager.");
Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager.");
Validate.notNull(shardSyncTask, "ShardSyncTask is required to initialize PeriodicShardSyncManager.");
this.workerId = workerId;
this.leaderDecider = leaderDecider;
this.metricsEmittingShardSyncTask = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory);
this.shardSyncThreadPool = shardSyncThreadPool;
}
public synchronized TaskResult start() {
if (!isRunning) {
shardSyncThreadPool
.scheduleWithFixedDelay(this::runShardSync, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS,
TimeUnit.MILLISECONDS);
isRunning = true;
}
return new TaskResult(null);
}
public void stop() {
if (isRunning) {
log.info(String.format("Shutting down leader decider on worker %s", workerId));
leaderDecider.shutdown();
log.info(String.format("Shutting down periodic shard sync task scheduler on worker %s", workerId));
shardSyncThreadPool.shutdown();
isRunning = false;
}
}
private void runShardSync() {
try {
if (leaderDecider.isLeader(workerId)) {
log.info(String.format("WorkerId %s is a leader, running the shard sync task", workerId));
metricsEmittingShardSyncTask.call();
} else {
log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId));
}
} catch (Throwable t) {
log.error("Error during runShardSync.", t);
}
}
}

View file

@ -15,20 +15,7 @@
package software.amazon.kinesis.coordinator;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import io.reactivex.plugins.RxJavaPlugins;
import lombok.AccessLevel;
import lombok.Getter;
@ -39,6 +26,7 @@ import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig;
@ -48,7 +36,6 @@ import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSyncTask;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
@ -70,6 +57,19 @@ import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
*
*/
@ -78,6 +78,7 @@ import software.amazon.kinesis.retrieval.RetrievalConfig;
@Slf4j
public class Scheduler implements Runnable {
private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1;
private SchedulerLog slog = new SchedulerLog();
private final CheckpointConfig checkpointConfig;
@ -101,6 +102,7 @@ public class Scheduler implements Runnable {
// private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final LeaseCoordinator leaseCoordinator;
private final ShardSyncTaskManager shardSyncTaskManager;
private final PeriodicShardSyncManager periodicShardSyncManager;
private final ShardPrioritization shardPrioritization;
private final boolean cleanupLeasesUponShardCompletion;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
@ -119,6 +121,7 @@ public class Scheduler implements Runnable {
private final AggregatorUtil aggregatorUtil;
private final HierarchicalShardSyncer hierarchicalShardSyncer;
private final long schedulerInitializationBackoffTimeMillis;
private LeaderDecider leaderDecider;
// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
@ -209,6 +212,10 @@ public class Scheduler implements Runnable {
this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory()
.createWorkerStateChangeListener();
}
if (leaderDecider == null) {
leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher,
Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
}
this.initialPosition = retrievalConfig.initialPositionInStreamExtended();
this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis();
this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis();
@ -222,6 +229,11 @@ public class Scheduler implements Runnable {
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer();
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer,
metricsFactory);
this.periodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(),
leaderDecider, shardSyncTask, metricsFactory);
}
/**
@ -235,13 +247,17 @@ public class Scheduler implements Runnable {
try {
initialize();
log.info("Initialization complete. Starting worker loop.");
log.info("Initialization complete. Scheduling periodicShardSync..");
periodicShardSyncManager.start();
log.info("Scheduled periodicShardSync tasks. Starting worker loop.");
} catch (RuntimeException e) {
log.error("Unable to initialize after {} attempts. Shutting down.", maxInitializationAttempts, e);
workerStateChangeListener.onAllInitializationAttemptsFailed(e);
shutdown();
}
while (!shouldShutdown()) {
runProcessLoop();
}
@ -516,6 +532,7 @@ public class Scheduler implements Runnable {
// Lost leases will force Worker to begin shutdown process for all shard consumers in
// Worker.run().
leaseCoordinator.stop();
periodicShardSyncManager.stop();
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
}
}

View file

@ -14,14 +14,6 @@
*/
package software.amazon.kinesis.leases;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Collections2;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@ -30,6 +22,12 @@ import lombok.ToString;
import lombok.experimental.Accessors;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* This class contains data pertaining to a Lease. Distributed systems may use leases to partition work across a
* fleet of workers. Each unit of work (identified by a leaseKey) has a corresponding Lease. Every worker will contend