This commit is contained in:
jegoshj 2020-06-29 14:40:52 +02:00 committed by GitHub
commit b059fde302
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 559 additions and 30 deletions

View file

@ -0,0 +1,152 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
/**
* An implementation of the {@code LeaderDecider} to elect leader(s) based on workerId.
* Leases are shuffled using a predetermined constant seed so that lease ordering is
* preserved across workers.
* This reduces the probability of choosing the leader workers co-located on the same
* host in case workerId starts with a common string (e.g. IP Address).
* Hence if a host has 3 workers, IPADDRESS_Worker1, IPADDRESS_Worker2, and IPADDRESS_Worker3,
* we don't end up choosing all 3 for shard sync as a result of natural ordering of Strings.
* This ensures redundancy for shard-sync during host failures.
*/
@Slf4j
class DeterministicShuffleShardSyncLeaderDecider
        implements LeaderDecider {
    // Fixed seed so that the shuffle order is preserved across workers
    static final int DETERMINISTIC_SHUFFLE_SEED = 1947;

    private static final long ELECTION_INITIAL_DELAY_MILLIS = 60 * 1000;
    private static final long ELECTION_SCHEDULING_INTERVAL_MILLIS = 5 * 60 * 1000;
    private static final int AWAIT_TERMINATION_MILLIS = 5000;

    // Guards reads/writes of {@link #leaders} so readers never observe a partially-published set.
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final LeaseRefresher leaseRefresher;
    private final int numPeriodicShardSyncWorkers;
    private final ScheduledExecutorService leaderElectionThreadPool;

    // Current set of elected leader workerIds; null until the first election completes.
    private volatile Set<String> leaders;

    /**
     * @param leaseRefresher LeaseManager instance used to fetch leases.
     * @param leaderElectionThreadPool Thread-pool to be used for leaderElection.
     * @param numPeriodicShardSyncWorkers Number of leaders that will be elected to perform periodic shard syncs.
     */
    DeterministicShuffleShardSyncLeaderDecider(LeaseRefresher leaseRefresher, ScheduledExecutorService leaderElectionThreadPool,
            int numPeriodicShardSyncWorkers) {
        this.leaseRefresher = leaseRefresher;
        this.leaderElectionThreadPool = leaderElectionThreadPool;
        this.numPeriodicShardSyncWorkers = numPeriodicShardSyncWorkers;
    }

    /*
     * Shuffles the leases deterministically and elects numPeriodicShardSyncWorkers number of workers
     * as leaders (workers that will perform shard sync).
     */
    private void electLeaders() {
        try {
            log.debug("Started leader election at: " + Instant.now());
            List<Lease> leases = leaseRefresher.listLeases();
            List<String> uniqueHosts = leases.stream().map(Lease::leaseOwner)
                    .filter(owner -> owner != null).distinct().sorted().collect(Collectors.toList());
            Collections.shuffle(uniqueHosts, new Random(DETERMINISTIC_SHUFFLE_SEED));
            int numShardSyncWorkers = Math.min(uniqueHosts.size(), numPeriodicShardSyncWorkers);
            Set<String> electedLeaders = new HashSet<>(uniqueHosts.subList(0, numShardSyncWorkers));
            // Acquire the write lock only around publication of the new leader set. In case the value
            // is currently being read, we wait for reading to complete before updating the variable.
            // NOTE: the lock must be acquired immediately before its own try/finally; the previous
            // structure locked inside the outer try, so an exception thrown by listLeases() before
            // lock() caused the finally block to unlock() a lock this thread did not hold, raising
            // IllegalMonitorStateException.
            readWriteLock.writeLock().lock();
            try {
                leaders = electedLeaders;
            } finally {
                readWriteLock.writeLock().unlock();
            }
            log.info("Elected leaders: " + String.join(", ", electedLeaders));
            log.debug("Completed leader election at: " + Instant.now());
        } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
            log.error("Exception occurred while trying to fetch all leases for leader election", e);
        } catch (Throwable t) {
            log.error("Unknown exception during leader election.", t);
        }
    }

    /**
     * A worker is considered a leader if no election has happened yet (leaders is null/empty)
     * or its workerId is in the elected set. Callers must hold the read lock.
     */
    private boolean isWorkerLeaderForShardSync(String workerId) {
        return CollectionUtils.isNullOrEmpty(leaders) || leaders.contains(workerId);
    }

    @Override
    public synchronized Boolean isLeader(String workerId) {
        // if no leaders yet, synchronously get leaders. This will happen at first Shard Sync.
        if (executeConditionCheckWithReadLock(() -> CollectionUtils.isNullOrEmpty(leaders))) {
            electLeaders();
            // start a scheduled executor that will periodically update leaders.
            // The first run will be after a minute.
            // We don't need jitter since it is scheduled with a fixed delay and time taken to scan leases
            // will be different at different times and on different hosts/workers.
            leaderElectionThreadPool.scheduleWithFixedDelay(this::electLeaders, ELECTION_INITIAL_DELAY_MILLIS,
                    ELECTION_SCHEDULING_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
        }
        return executeConditionCheckWithReadLock(() -> isWorkerLeaderForShardSync(workerId));
    }

    @Override
    public synchronized void shutdown() {
        try {
            leaderElectionThreadPool.shutdown();
            if (leaderElectionThreadPool.awaitTermination(AWAIT_TERMINATION_MILLIS, TimeUnit.MILLISECONDS)) {
                log.info("Successfully stopped leader election on the worker");
            } else {
                leaderElectionThreadPool.shutdownNow();
                log.info(String.format("Stopped leader election thread after awaiting termination for %d milliseconds",
                        AWAIT_TERMINATION_MILLIS));
            }
        } catch (InterruptedException e) {
            log.debug("Encountered InterruptedException while awaiting leader election threadPool termination");
            // Preserve the interrupt status so callers higher up the stack can observe it.
            Thread.currentThread().interrupt();
        }
    }

    // Execute condition checks using shared variables under a read-write lock.
    private boolean executeConditionCheckWithReadLock(BooleanSupplier action) {
        readWriteLock.readLock().lock();
        try {
            return action.getAsBoolean();
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
}

View file

@ -0,0 +1,39 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
/**
 * Used in conjunction with periodic shard sync.
 * Implement this interface to allow KCL to decide if the current worker should execute shard sync.
 * When periodic shard sync is enabled, PeriodicShardSyncManager periodically checks if the current
 * worker is one of the leaders designated to execute shard-sync and then acts accordingly.
 */
public interface LeaderDecider {
    /**
     * Method invoked to check the given workerId corresponds to one of the workers
     * designated to execute shard-syncs periodically.
     *
     * @param workerId ID of the worker
     * @return True if the worker with ID workerId can execute shard-sync. False otherwise.
     */
    Boolean isLeader(String workerId);
    /**
     * Can be invoked, if needed, to shutdown any clients/thread-pools
     * being used in the LeaderDecider implementation.
     */
    void shutdown();
}

View file

@ -0,0 +1,128 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.ShardSyncTask;
import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.metrics.MetricsFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * The top level orchestrator for coordinating the periodic shard sync related
 * activities.
 */
@Getter
@EqualsAndHashCode
@Slf4j
class PeriodicShardSyncManager {
    private static final long INITIAL_DELAY = 60 * 1000L;
    private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000L;

    private final String workerId;
    private final LeaderDecider leaderDecider;
    private final ConsumerTask metricsEmittingShardSyncTask;
    private final ScheduledExecutorService shardSyncThreadPool;
    // Guarded by synchronized(this): true while the periodic sync task is scheduled.
    private boolean isRunning;

    PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, MetricsFactory metricsFactory) {
        this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory);
    }

    PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, ScheduledExecutorService shardSyncThreadPool, MetricsFactory metricsFactory) {
        Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager.");
        Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager.");
        Validate.notNull(shardSyncTask, "ShardSyncTask is required to initialize PeriodicShardSyncManager.");
        this.workerId = workerId;
        this.leaderDecider = leaderDecider;
        this.metricsEmittingShardSyncTask = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory);
        this.shardSyncThreadPool = shardSyncThreadPool;
    }

    /**
     * Schedules the periodic shard sync task with a fixed delay. Idempotent: a second
     * call while already running is a no-op.
     *
     * @return a TaskResult with no exception (scheduling itself cannot fail here)
     */
    public synchronized TaskResult start() {
        if (!isRunning) {
            final Runnable periodicShardSyncer = () -> {
                try {
                    runShardSync();
                } catch (Throwable t) {
                    // Never let an exception escape the scheduled task; it would cancel future runs.
                    log.error("Error during runShardSync.", t);
                }
            };
            shardSyncThreadPool.scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS,
                    TimeUnit.MILLISECONDS);
            isRunning = true;
        }
        return new TaskResult(null);
    }

    /**
     * Runs shardSync once
     * Does not schedule periodic shardSync
     * @return the result of the task
     */
    public synchronized TaskResult syncShardsOnce() {
        Exception lastException = null;
        try {
            if (!isRunning) {
                runShardSync();
            }
        } catch (Exception e) {
            lastException = e;
        }
        return new TaskResult(lastException);
    }

    /**
     * Stops the leader decider and the periodic sync scheduler. Synchronized so that
     * isRunning is read and written under the same lock used by start()/syncShardsOnce().
     */
    public synchronized void stop() {
        if (isRunning) {
            log.info("Shutting down leader decider on worker {}", workerId);
            leaderDecider.shutdown();
            log.info("Shutting down periodic shard sync task scheduler on worker {}", workerId);
            shardSyncThreadPool.shutdown();
            isRunning = false;
        }
    }

    // Executes the shard sync task if (and only if) this worker is currently a leader.
    private void runShardSync() {
        if (leaderDecider.isLeader(workerId)) {
            log.info("WorkerId {} is a leader, running the shard sync task", workerId);
            final TaskResult taskResult = metricsEmittingShardSyncTask.call();
            if (taskResult != null && taskResult.getException() != null) {
                throw new KinesisClientLibIOException("Failed to sync shards", taskResult.getException());
            }
        } else {
            log.debug("WorkerId {} is not a leader, not running the shard sync task", workerId);
        }
    }

    /**
     * Checks if the entire hash range is covered
     * @return true if covered, false otherwise
     */
    public boolean hashRangeCovered() {
        // TODO: Implement method
        return true;
    }
}

View file

@ -15,20 +15,7 @@
package software.amazon.kinesis.coordinator; package software.amazon.kinesis.coordinator;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.plugins.RxJavaPlugins;
import lombok.AccessLevel; import lombok.AccessLevel;
import lombok.Getter; import lombok.Getter;
@ -39,6 +26,7 @@ import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseManagementConfig;
@ -48,9 +36,11 @@ import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardPrioritization; import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSyncTask; import software.amazon.kinesis.leases.ShardSyncTask;
import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasingException; import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.ShardConsumer; import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShardConsumerArgument; import software.amazon.kinesis.lifecycle.ShardConsumerArgument;
@ -59,7 +49,6 @@ import software.amazon.kinesis.lifecycle.ShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownReason; import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.CloudWatchMetricsFactory; import software.amazon.kinesis.metrics.CloudWatchMetricsFactory;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.Checkpointer;
@ -70,6 +59,20 @@ import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.retrieval.RetrievalConfig;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/** /**
* *
*/ */
@ -78,6 +81,11 @@ import software.amazon.kinesis.retrieval.RetrievalConfig;
@Slf4j @Slf4j
public class Scheduler implements Runnable { public class Scheduler implements Runnable {
private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1;
private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 5 * 1000L;
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L;
private SchedulerLog slog = new SchedulerLog(); private SchedulerLog slog = new SchedulerLog();
private final CheckpointConfig checkpointConfig; private final CheckpointConfig checkpointConfig;
@ -101,6 +109,7 @@ public class Scheduler implements Runnable {
// private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; // private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final LeaseCoordinator leaseCoordinator; private final LeaseCoordinator leaseCoordinator;
private final ShardSyncTaskManager shardSyncTaskManager; private final ShardSyncTaskManager shardSyncTaskManager;
private final PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager;
private final ShardPrioritization shardPrioritization; private final ShardPrioritization shardPrioritization;
private final boolean cleanupLeasesUponShardCompletion; private final boolean cleanupLeasesUponShardCompletion;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
@ -119,6 +128,7 @@ public class Scheduler implements Runnable {
private final AggregatorUtil aggregatorUtil; private final AggregatorUtil aggregatorUtil;
private final HierarchicalShardSyncer hierarchicalShardSyncer; private final HierarchicalShardSyncer hierarchicalShardSyncer;
private final long schedulerInitializationBackoffTimeMillis; private final long schedulerInitializationBackoffTimeMillis;
private final LeaderDecider leaderDecider;
// Holds consumers for shards the worker is currently tracking. Key is shard // Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer. // info, value is ShardConsumer.
@ -209,6 +219,8 @@ public class Scheduler implements Runnable {
this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory() this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory()
.createWorkerStateChangeListener(); .createWorkerStateChangeListener();
} }
this.leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher,
Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
this.initialPosition = retrievalConfig.initialPositionInStreamExtended(); this.initialPosition = retrievalConfig.initialPositionInStreamExtended();
this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis(); this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis();
this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis(); this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis();
@ -222,6 +234,7 @@ public class Scheduler implements Runnable {
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(); this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer();
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
this.leaderElectedPeriodicShardSyncManager = buildPeriodicShardSyncManager();
} }
/** /**
@ -241,7 +254,6 @@ public class Scheduler implements Runnable {
workerStateChangeListener.onAllInitializationAttemptsFailed(e); workerStateChangeListener.onAllInitializationAttemptsFailed(e);
shutdown(); shutdown();
} }
while (!shouldShutdown()) { while (!shouldShutdown()) {
runProcessLoop(); runProcessLoop();
} }
@ -266,11 +278,9 @@ public class Scheduler implements Runnable {
TaskResult result = null; TaskResult result = null;
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
waitUntilLeaseTableIsReady();
log.info("Syncing Kinesis shard info"); log.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition, result = leaderElectedPeriodicShardSyncManager.syncShardsOnce();
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer,
metricsFactory);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else { } else {
log.info("Skipping shard sync per configuration setting (and lease table is not empty)"); log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
} }
@ -282,6 +292,10 @@ public class Scheduler implements Runnable {
} else { } else {
log.info("LeaseCoordinator is already running. No need to start it."); log.info("LeaseCoordinator is already running. No need to start it.");
} }
log.info("Scheduling periodicShardSync)");
// leaderElectedPeriodicShardSyncManager.start();
// TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged
waitUntilHashRangeCovered();
isDone = true; isDone = true;
} else { } else {
lastException = result.getException(); lastException = result.getException();
@ -296,6 +310,7 @@ public class Scheduler implements Runnable {
if (!isDone) { if (!isDone) {
try { try {
Thread.sleep(schedulerInitializationBackoffTimeMillis); Thread.sleep(schedulerInitializationBackoffTimeMillis);
leaderElectedPeriodicShardSyncManager.stop();
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.debug("Sleep interrupted while initializing worker."); log.debug("Sleep interrupted while initializing worker.");
} }
@ -309,6 +324,29 @@ public class Scheduler implements Runnable {
} }
} }
@VisibleForTesting
void waitUntilLeaseTableIsReady() throws InterruptedException,
DependencyException, ProvisionedThroughputException, InvalidStateException {
long waitTime = ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
long waitUntil = System.currentTimeMillis() + waitTime;
while (System.currentTimeMillis() < waitUntil && leaseRefresher.isLeaseTableEmpty()) {
// check every 3 seconds if lease table is still empty,
// to minimize contention between all workers bootstrapping at the same time
log.info("Lease table is still empty. Checking again in {} ms", LEASE_TABLE_CHECK_FREQUENCY_MILLIS);
Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS);
}
}
private void waitUntilHashRangeCovered() throws InterruptedException {
while (!leaderElectedPeriodicShardSyncManager.hashRangeCovered()) {
// wait until entire hash range is covered
log.info("Hash range is not covered yet. Checking again in {} ms", HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS);
Thread.sleep(HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS);
}
}
@VisibleForTesting @VisibleForTesting
void runProcessLoop() { void runProcessLoop() {
try { try {
@ -516,6 +554,7 @@ public class Scheduler implements Runnable {
// Lost leases will force Worker to begin shutdown process for all shard consumers in // Lost leases will force Worker to begin shutdown process for all shard consumers in
// Worker.run(). // Worker.run().
leaseCoordinator.stop(); leaseCoordinator.stop();
leaderElectedPeriodicShardSyncManager.stop();
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN); workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
} }
} }
@ -613,12 +652,20 @@ public class Scheduler implements Runnable {
hierarchicalShardSyncer, hierarchicalShardSyncer,
metricsFactory); metricsFactory);
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(), return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(),
argument, lifecycleConfig.taskExecutionListener(),lifecycleConfig.readTimeoutsToIgnoreBeforeWarning()); argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
}
private PeriodicShardSyncManager buildPeriodicShardSyncManager() {
final ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer,
metricsFactory);
return new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(),
leaderDecider, shardSyncTask, metricsFactory);
} }
/** /**
* NOTE: This method is internal/private to the Worker class. It has package access solely for testing. * NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
* * <p>
* This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been * This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been
* instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example * instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example
* shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo * shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo

View file

@ -14,14 +14,6 @@
*/ */
package software.amazon.kinesis.leases; package software.amazon.kinesis.leases;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Collections2;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.Getter; import lombok.Getter;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
@ -30,6 +22,12 @@ import lombok.ToString;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/** /**
* This class contains data pertaining to a Lease. Distributed systems may use leases to partition work across a * This class contains data pertaining to a Lease. Distributed systems may use leases to partition work across a
* fleet of workers. Each unit of work (identified by a leaseKey) has a corresponding Lease. Every worker will contend * fleet of workers. Each unit of work (identified by a leaseKey) has a corresponding Lease. Every worker will contend

View file

@ -0,0 +1,125 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
import static software.amazon.kinesis.coordinator.DeterministicShuffleShardSyncLeaderDecider.DETERMINISTIC_SHUFFLE_SEED;
@RunWith(MockitoJUnitRunner.class)
public class DeterministicShuffleShardSyncLeaderDeciderTest {
    private static final String LEASE_KEY = "lease_key";
    private static final String LEASE_OWNER = "lease_owner";
    private static final String WORKER_ID = "worker-id";

    private DeterministicShuffleShardSyncLeaderDecider leaderDecider;

    @Mock
    private LeaseRefresher leaseRefresher;
    @Mock
    private ScheduledExecutorService scheduledExecutorService;

    private int numShardSyncWorkers;

    @Before
    public void setup() {
        numShardSyncWorkers = 1;
        leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, scheduledExecutorService, numShardSyncWorkers);
    }

    @Test
    public void testLeaderElectionWithNullLeases() {
        boolean isLeader = leaderDecider.isLeader(WORKER_ID);
        assertTrue("IsLeader should return true if leaders is null", isLeader);
    }

    @Test
    public void testLeaderElectionWithEmptyLeases() throws Exception {
        when(leaseRefresher.listLeases()).thenReturn(new ArrayList<>());
        boolean isLeader = leaderDecider.isLeader(WORKER_ID);
        assertTrue("IsLeader should return true if no leases are returned", isLeader);
    }

    @Test
    public void testElectedLeadersAsPerExpectedShufflingOrder()
            throws Exception {
        List<Lease> leases = getLeases(5, false /* duplicateLeaseOwner */, true /* activeLeases */);
        when(leaseRefresher.listLeases()).thenReturn(leases);
        Set<String> expectedLeaders = getExpectedLeaders(leases);
        for (String leader : expectedLeaders) {
            assertTrue(leaderDecider.isLeader(leader));
        }
        for (Lease lease : leases) {
            if (!expectedLeaders.contains(lease.leaseOwner())) {
                assertFalse(leaderDecider.isLeader(lease.leaseOwner()));
            }
        }
    }

    @Test
    public void testElectedLeadersAsPerExpectedShufflingOrderWhenUniqueWorkersLessThanMaxLeaders() throws Exception {
        this.numShardSyncWorkers = 5; // More than number of unique lease owners
        leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, scheduledExecutorService, numShardSyncWorkers);
        List<Lease> leases = getLeases(3, false /* duplicateLeaseOwner */, true /* activeLeases */);
        // BUGFIX: without this stub the mock returns null, election fails silently inside
        // electLeaders(), leaders stays null, and the assertions below pass vacuously.
        when(leaseRefresher.listLeases()).thenReturn(leases);
        Set<String> expectedLeaders = getExpectedLeaders(leases);
        // All lease owners should be present in expected leaders set, and they should all be leaders.
        for (Lease lease : leases) {
            assertTrue(leaderDecider.isLeader(lease.leaseOwner()));
            assertTrue(expectedLeaders.contains(lease.leaseOwner()));
        }
    }

    // Builds {@code count} leases; owners are distinct unless duplicateLeaseOwner is set,
    // and checkpoints are LATEST (active) or SHARD_END depending on activeLeases.
    private List<Lease> getLeases(int count, boolean duplicateLeaseOwner, boolean activeLeases) {
        List<Lease> leases = new ArrayList<>();
        Random random = new Random();
        for (int i = 0; i < count; i++) {
            Lease lease = new Lease();
            lease.leaseKey(LEASE_KEY + i);
            lease.checkpoint(activeLeases ? ExtendedSequenceNumber.LATEST : ExtendedSequenceNumber.SHARD_END);
            lease.leaseCounter(random.nextLong());
            lease.leaseOwner(LEASE_OWNER + (duplicateLeaseOwner ? "" : i));
            leases.add(lease);
        }
        return leases;
    }

    // Mirrors the production shuffle so the test computes the same leader set independently.
    private Set<String> getExpectedLeaders(List<Lease> leases) {
        List<String> uniqueHosts = leases.stream().filter(lease -> lease.leaseOwner() != null)
                .map(Lease::leaseOwner).distinct().sorted().collect(Collectors.toList());
        Collections.shuffle(uniqueHosts, new Random(DETERMINISTIC_SHUFFLE_SEED));
        int numWorkers = Math.min(uniqueHosts.size(), this.numShardSyncWorkers);
        return new HashSet<>(uniqueHosts.subList(0, numWorkers));
    }
}

View file

@ -89,6 +89,9 @@ public class SchedulerTest {
private final String applicationName = "applicationName"; private final String applicationName = "applicationName";
private final String streamName = "streamName"; private final String streamName = "streamName";
private final String namespace = "testNamespace"; private final String namespace = "testNamespace";
private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 5 * 1000L;
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
private Scheduler scheduler; private Scheduler scheduler;
private ShardRecordProcessorFactory shardRecordProcessorFactory; private ShardRecordProcessorFactory shardRecordProcessorFactory;
@ -265,6 +268,43 @@ public class SchedulerTest {
verify(shardDetector, times(maxInitializationAttempts)).listShards(); verify(shardDetector, times(maxInitializationAttempts)).listShards();
} }
@Test
public final void testInitializationWaitsWhenLeaseTableIsEmpty() throws Exception {
final int maxInitializationAttempts = 1;
coordinatorConfig.maxInitializationAttempts(maxInitializationAttempts);
coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist(false);
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
doNothing().when(leaseCoordinator).initialize();
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
long startTime = System.currentTimeMillis();
scheduler.waitUntilLeaseTableIsReady();
long endTime = System.currentTimeMillis();
assertTrue(endTime - startTime > MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
assertTrue(endTime - startTime < (MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS + LEASE_TABLE_CHECK_FREQUENCY_MILLIS));
}
@Test
public final void testInitializationDoesntWaitWhenLeaseTableIsNotEmpty() throws Exception {
final int maxInitializationAttempts = 1;
coordinatorConfig.maxInitializationAttempts(maxInitializationAttempts);
coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist(false);
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
doNothing().when(leaseCoordinator).initialize();
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false);
long startTime = System.currentTimeMillis();
scheduler.waitUntilLeaseTableIsReady();
long endTime = System.currentTimeMillis();
assertTrue(endTime - startTime < MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
}
@Test @Test
public final void testSchedulerShutdown() { public final void testSchedulerShutdown() {
scheduler.shutdown(); scheduler.shutdown();