Merge pull request #1 from ychunxue/periodic_sync

Implementation of Periodic shard sync manager that supports multi streams
This commit is contained in:
ychunxue 2020-03-25 13:36:12 -07:00 committed by GitHub
commit f03a0fc561
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 644 additions and 44 deletions

View file

@ -0,0 +1,152 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
/**
* An implementation of the {@code LeaderDecider} to elect leader(s) based on workerId.
* Leases are shuffled using a predetermined constant seed so that lease ordering is
* preserved across workers.
* This reduces the probability of choosing the leader workers co-located on the same
* host in case workerId starts with a common string (e.g. IP Address).
* Hence if a host has 3 workers, IPADDRESS_Worker1, IPADDRESS_Worker2, and IPADDRESS_Worker3,
* we don't end up choosing all 3 for shard sync as a result of natural ordering of Strings.
* This ensures redundancy for shard-sync during host failures.
*/
@Slf4j
class DeterministicShuffleShardSyncLeaderDecider
        implements LeaderDecider {
    // Fixed seed so that the shuffle order is preserved across workers
    static final int DETERMINISTIC_SHUFFLE_SEED = 1947;
    private static final long ELECTION_INITIAL_DELAY_MILLIS = 60 * 1000;
    private static final long ELECTION_SCHEDULING_INTERVAL_MILLIS = 5 * 60 * 1000;
    private static final int AWAIT_TERMINATION_MILLIS = 5000;

    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final LeaseRefresher leaseRefresher;
    private final int numPeriodicShardSyncWorkers;
    private final ScheduledExecutorService leaderElectionThreadPool;
    private volatile Set<String> leaders;
    // True once the periodic election task has been handed to the thread pool.
    // Only touched from the synchronized isLeader method.
    private boolean periodicElectionScheduled;

    /**
     * @param leaseRefresher LeaseManager instance used to fetch leases.
     * @param leaderElectionThreadPool Thread-pool to be used for leaderElection.
     * @param numPeriodicShardSyncWorkers Number of leaders that will be elected to perform periodic shard syncs.
     */
    DeterministicShuffleShardSyncLeaderDecider(LeaseRefresher leaseRefresher, ScheduledExecutorService leaderElectionThreadPool,
            int numPeriodicShardSyncWorkers) {
        this.leaseRefresher = leaseRefresher;
        this.leaderElectionThreadPool = leaderElectionThreadPool;
        this.numPeriodicShardSyncWorkers = numPeriodicShardSyncWorkers;
    }

    /*
     * Shuffles the lease owners deterministically and elects numPeriodicShardSyncWorkers number of workers
     * as leaders (workers that will perform shard sync).
     *
     * Failures are logged and leave the previously elected leaders untouched. The write lock is only
     * taken around the publication of the new leader set, and it is released only if it was acquired,
     * so a failure while listing leases can no longer surface as an IllegalMonitorStateException.
     */
    private void electLeaders() {
        try {
            log.debug("Started leader election at: " + Instant.now());
            List<Lease> leases = leaseRefresher.listLeases();
            List<String> uniqueHosts = leases.stream().map(Lease::leaseOwner)
                    .filter(owner -> owner != null).distinct().sorted().collect(Collectors.toList());
            Collections.shuffle(uniqueHosts, new Random(DETERMINISTIC_SHUFFLE_SEED));
            int numShardSyncWorkers = Math.min(uniqueHosts.size(), numPeriodicShardSyncWorkers);
            Set<String> electedLeaders = new HashSet<>(uniqueHosts.subList(0, numShardSyncWorkers));
            // In case value is currently being read, we wait for reading to complete before updating the variable.
            readWriteLock.writeLock().lock();
            try {
                leaders = electedLeaders;
            } finally {
                readWriteLock.writeLock().unlock();
            }
            log.info("Elected leaders: " + String.join(", ", electedLeaders));
            log.debug("Completed leader election at: " + Instant.now());
        } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
            log.error("Exception occurred while trying to fetch all leases for leader election", e);
        } catch (Throwable t) {
            log.error("Unknown exception during leader election.", t);
        }
    }

    // When no leaders are known (null or empty set) every worker is treated as a leader.
    private boolean isWorkerLeaderForShardSync(String workerId) {
        return CollectionUtils.isNullOrEmpty(leaders) || leaders.contains(workerId);
    }

    /**
     * Checks whether the given worker is one of the elected shard-sync leaders.
     * If no leaders are known yet, an election is run synchronously first and the
     * periodic re-election task is scheduled (once per instance).
     *
     * @param workerId ID of the worker
     * @return true if the worker is an elected leader, or if no leaders could be determined
     */
    @Override
    public synchronized Boolean isLeader(String workerId) {
        // if no leaders yet, synchronously get leaders. This will happen at first Shard Sync.
        if (executeConditionCheckWithReadLock(() -> CollectionUtils.isNullOrEmpty(leaders))) {
            electLeaders();
            // start a scheduled executor that will periodically update leaders.
            // The first run will be after a minute.
            // We don't need jitter since it is scheduled with a fixed delay and time taken to scan leases
            // will be different at different times and on different hosts/workers.
            // Schedule only once: repeated calls while the leader set stays empty must not pile up
            // additional periodic tasks on the thread pool.
            if (!periodicElectionScheduled) {
                leaderElectionThreadPool.scheduleWithFixedDelay(this::electLeaders, ELECTION_INITIAL_DELAY_MILLIS,
                        ELECTION_SCHEDULING_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
                periodicElectionScheduled = true;
            }
        }
        return executeConditionCheckWithReadLock(() -> isWorkerLeaderForShardSync(workerId));
    }

    /**
     * Shuts down the leader election thread pool, waiting up to AWAIT_TERMINATION_MILLIS
     * for it to terminate before forcing it with shutdownNow().
     */
    @Override
    public synchronized void shutdown() {
        try {
            leaderElectionThreadPool.shutdown();
            if (leaderElectionThreadPool.awaitTermination(AWAIT_TERMINATION_MILLIS, TimeUnit.MILLISECONDS)) {
                log.info("Successfully stopped leader election on the worker");
            } else {
                leaderElectionThreadPool.shutdownNow();
                log.info(String.format("Stopped leader election thread after awaiting termination for %d milliseconds",
                        AWAIT_TERMINATION_MILLIS));
            }
        } catch (InterruptedException e) {
            log.debug("Encountered InterruptedException while awaiting leader election threadPool termination");
            // Preserve the interrupt so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
        }
    }

    // Execute condition checks using shared variables under a read-write lock.
    private boolean executeConditionCheckWithReadLock(BooleanSupplier action) {
        // Acquire before entering try so that unlock() only runs when the lock is actually held.
        readWriteLock.readLock().lock();
        try {
            return action.getAsBoolean();
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
}

View file

@ -0,0 +1,39 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
/**
* Used in conjunction with periodic shard sync.
* Implement this interface to allow KCL to decide if the current worker should execute shard sync.
* When periodic shard sync is enabled, PeriodicShardSyncManager periodically checks if the current
* worker is one of the leaders designated to execute shard-sync and then acts accordingly.
*/
public interface LeaderDecider {
/**
* Checks whether the given workerId corresponds to one of the workers
* designated to execute shard-syncs periodically.
*
* @param workerId ID of the worker
* @return True if the worker with ID workerId can execute shard-sync. False otherwise.
*         Note that the declared return type is the boxed {@code Boolean}.
*/
Boolean isLeader(String workerId);
/**
* Can be invoked, if needed, to shut down any clients/thread-pools
* being used in the LeaderDecider implementation.
*/
void shutdown();
}

View file

@ -0,0 +1,129 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* The top level orchestrator for coordinating the periodic shard sync related
* activities.
*/
@Getter
@EqualsAndHashCode
@Slf4j
class PeriodicShardSyncManager {
    private static final long INITIAL_DELAY = 60 * 1000L;
    private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000L;

    private final String workerId;
    private final LeaderDecider leaderDecider;
    private final Map<StreamIdentifier, ShardSyncTaskManager> streamToShardSyncTaskManagerMap;
    private final ScheduledExecutorService shardSyncThreadPool;
    private boolean isRunning;

    /**
     * Creates a manager backed by a freshly created single-threaded scheduled executor.
     */
    PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map<StreamIdentifier, ShardSyncTaskManager> streamToShardSyncTaskManagerMap) {
        this(workerId, leaderDecider, streamToShardSyncTaskManagerMap, Executors.newSingleThreadScheduledExecutor());
    }

    /**
     * Creates a manager that schedules its periodic work on the supplied executor.
     * The worker id must not be blank and the leader decider must not be null.
     */
    PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map<StreamIdentifier, ShardSyncTaskManager> streamToShardSyncTaskManagerMap,
            ScheduledExecutorService shardSyncThreadPool) {
        Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager.");
        Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager.");
        this.workerId = workerId;
        this.leaderDecider = leaderDecider;
        this.streamToShardSyncTaskManagerMap = streamToShardSyncTaskManagerMap;
        this.shardSyncThreadPool = shardSyncThreadPool;
    }

    /**
     * Schedules the periodic shard sync with a fixed delay, unless it is already running.
     *
     * @return a TaskResult constructed with a null exception
     */
    public synchronized TaskResult start() {
        if (isRunning) {
            return new TaskResult(null);
        }
        shardSyncThreadPool.scheduleWithFixedDelay(this::runShardSyncAndLogFailures, INITIAL_DELAY,
                PERIODIC_SHARD_SYNC_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
        isRunning = true;
        return new TaskResult(null);
    }

    // Periodic task body: any Throwable is logged rather than propagated to the executor.
    private void runShardSyncAndLogFailures() {
        try {
            runShardSync();
        } catch (Throwable t) {
            log.error("Error during runShardSync.", t);
        }
    }

    /**
     * Runs shardSync once for every registered stream.
     * Does not schedule periodic shardSync.
     *
     * @throws Exception the exception carried by the first task result that reports one;
     *         remaining streams are not synced in that case
     */
    public synchronized void syncShardsOnce() throws Exception {
        for (ShardSyncTaskManager taskManager : streamToShardSyncTaskManagerMap.values()) {
            final Exception failure = taskManager.executeShardSyncTask().getException();
            if (failure != null) {
                throw failure;
            }
        }
    }

    /**
     * Shuts down the leader decider and the shard sync thread pool if the manager is running.
     */
    public void stop() {
        if (!isRunning) {
            return;
        }
        log.info(String.format("Shutting down leader decider on worker %s", workerId));
        leaderDecider.shutdown();
        log.info(String.format("Shutting down periodic shard sync task scheduler on worker %s", workerId));
        shardSyncThreadPool.shutdown();
        isRunning = false;
    }

    // Submits a shard sync for each stream, but only when this worker is currently a leader.
    private void runShardSync() {
        if (!leaderDecider.isLeader(workerId)) {
            log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId));
            return;
        }
        for (ShardSyncTaskManager taskManager : streamToShardSyncTaskManagerMap.values()) {
            final boolean submitted = taskManager.syncShardAndLeaseInfo();
            if (!submitted) {
                log.warn("Failed to submit shard sync task for stream {}. This could be due to the previous shard sync task not finished.",
                        taskManager.shardDetector().streamIdentifier().streamName());
            }
        }
    }

    /**
     * Checks if the entire hash range is covered
     * @return true if covered, false otherwise (currently always true, see TODO)
     */
    public boolean hashRangeCovered() {
        // TODO: Implement method
        return true;
    }
}

View file

@ -34,7 +34,6 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import io.reactivex.plugins.RxJavaPlugins;
import lombok.AccessLevel;
import lombok.Getter;
@ -45,6 +44,8 @@ import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.Lease;
@ -55,22 +56,21 @@ import software.amazon.kinesis.leases.LeaseSerializer;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSyncTask;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShardConsumerArgument;
import software.amazon.kinesis.lifecycle.ShardConsumerShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.CloudWatchMetricsFactory;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.processor.Checkpointer;
@ -81,6 +81,9 @@ import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
/**
*
*/
@ -89,6 +92,11 @@ import software.amazon.kinesis.retrieval.RetrievalConfig;
@Slf4j
public class Scheduler implements Runnable {
private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1;
private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L;
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L;
private SchedulerLog slog = new SchedulerLog();
private final CheckpointConfig checkpointConfig;
@ -113,6 +121,7 @@ public class Scheduler implements Runnable {
private final LeaseCoordinator leaseCoordinator;
private final Function<StreamIdentifier, ShardSyncTaskManager> shardSyncTaskManagerProvider;
private final Map<StreamIdentifier, ShardSyncTaskManager> streamToShardSyncTaskManagerMap = new HashMap<>();
private final PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager;
private final ShardPrioritization shardPrioritization;
private final boolean cleanupLeasesUponShardCompletion;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
@ -131,6 +140,7 @@ public class Scheduler implements Runnable {
private final AggregatorUtil aggregatorUtil;
private final HierarchicalShardSyncer hierarchicalShardSyncer;
private final long schedulerInitializationBackoffTimeMillis;
private final LeaderDecider leaderDecider;
// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
@ -236,6 +246,8 @@ public class Scheduler implements Runnable {
this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory()
.createWorkerStateChangeListener();
}
this.leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher,
Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis();
this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis();
// this.retryGetRecordsInSeconds = this.retrievalConfig.retryGetRecordsInSeconds();
@ -248,6 +260,8 @@ public class Scheduler implements Runnable {
// TODO : Halo : Check if this needs to be per stream.
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode);
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(),
leaderDecider, streamToShardSyncTaskManagerMap);
}
/**
@ -267,7 +281,6 @@ public class Scheduler implements Runnable {
workerStateChangeListener.onAllInitializationAttemptsFailed(e);
shutdown();
}
while (!shouldShutdown()) {
runProcessLoop();
}
@ -290,24 +303,17 @@ public class Scheduler implements Runnable {
log.info("Initializing LeaseCoordinator");
leaseCoordinator.initialize();
TaskResult result;
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
// TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing
// TODO: for already synced streams
for(Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
final StreamIdentifier streamIdentifier = streamConfigEntry.getKey();
log.info("Syncing Kinesis shard info for " + streamIdentifier);
final StreamConfig streamConfig = streamConfigEntry.getValue();
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamIdentifier),
leaseRefresher, streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L,
hierarchicalShardSyncer, metricsFactory);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
// Throwing the exception, to prevent further syncs for other stream.
if (result.getException() != null) {
log.error("Caught exception when sync'ing info for " + streamIdentifier, result.getException());
throw result.getException();
if (shouldInitiateLeaseSync()) {
log.info("Worker {} is initiating the lease sync.", leaseManagementConfig.workerIdentifier());
for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
final StreamIdentifier streamIdentifier = streamConfigEntry.getKey();
createOrGetShardSyncTaskManager(streamIdentifier);
log.info("Creating shard sync task for " + streamIdentifier);
}
leaderElectedPeriodicShardSyncManager.syncShardsOnce();
}
} else {
log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
@ -321,6 +327,11 @@ public class Scheduler implements Runnable {
} else {
log.info("LeaseCoordinator is already running. No need to start it.");
}
log.info("Scheduling periodicShardSync)");
// leaderElectedPeriodicShardSyncManager.start(shardSyncTasks);
// TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged
// TODO: Determine if waitUntilHashRangeCovered() is needed.
//waitUntilHashRangeCovered();
isDone = true;
} catch (LeasingException e) {
log.error("Caught exception when initializing LeaseCoordinator", e);
@ -332,6 +343,7 @@ public class Scheduler implements Runnable {
if (!isDone) {
try {
Thread.sleep(schedulerInitializationBackoffTimeMillis);
leaderElectedPeriodicShardSyncManager.stop();
} catch (InterruptedException e) {
log.debug("Sleep interrupted while initializing worker.");
}
@ -345,6 +357,34 @@ public class Scheduler implements Runnable {
}
}
/**
 * Decides whether this worker should kick off the initial lease sync.
 * Waits for a random duration between the configured minimum and maximum, polling the lease table
 * at a fixed frequency, to minimize contention between all workers bootstrapping at the same time.
 *
 * @return false as soon as the lease table is observed to be non-empty; true if it was still
 *         empty on every check made before the wait elapsed
 */
@VisibleForTesting
boolean shouldInitiateLeaseSync() throws InterruptedException,
        DependencyException, ProvisionedThroughputException, InvalidStateException {
    final long randomizedWaitMillis = ThreadLocalRandom.current()
            .nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
    final long deadline = System.currentTimeMillis() + randomizedWaitMillis;
    while (System.currentTimeMillis() < deadline) {
        if (!leaseRefresher.isLeaseTableEmpty()) {
            // Leases already exist, so this worker does not need to start the sync.
            return false;
        }
        // check every 3 seconds if lease table is still empty,
        // to minimize contention between all workers bootstrapping at the same time
        log.info("Lease table is still empty. Checking again in {} ms", LEASE_TABLE_CHECK_FREQUENCY_MILLIS);
        Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS);
    }
    return true;
}
/**
 * Blocks, polling at a fixed frequency, until the periodic shard sync manager reports
 * that the hash range is covered.
 */
private void waitUntilHashRangeCovered() throws InterruptedException {
    // TODO: Currently this call is not in use. We may need to implement this method later. Created SIM to track the work: https://sim.amazon.com/issues/KinesisLTR-202
    // TODO: For future implementation, streamToShardSyncTaskManagerMap might not contain the most up to date snapshot of active streams.
    // Should use currentStreamConfigMap to determine the streams to check.
    while (true) {
        if (leaderElectedPeriodicShardSyncManager.hashRangeCovered()) {
            return;
        }
        // wait until entire hash range is covered
        log.info("Hash range is not covered yet. Checking again in {} ms", HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS);
        Thread.sleep(HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS);
    }
}
@VisibleForTesting
void runProcessLoop() {
try {
@ -555,6 +595,7 @@ public class Scheduler implements Runnable {
// Lost leases will force Worker to begin shutdown process for all shard consumers in
// Worker.run().
leaseCoordinator.stop();
leaderElectedPeriodicShardSyncManager.stop();
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
}
}
@ -662,12 +703,12 @@ public class Scheduler implements Runnable {
hierarchicalShardSyncer,
metricsFactory);
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(),
argument, lifecycleConfig.taskExecutionListener(),lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
}
/**
* NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
*
* <p>
* This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been
* instantiated with parentShardIds in a different order (and the rest of the fields being equal). For example
* shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo

View file

@ -14,14 +14,6 @@
*/
package software.amazon.kinesis.leases;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Collections2;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@ -30,6 +22,12 @@ import lombok.ToString;
import lombok.experimental.Accessors;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* This class contains data pertaining to a Lease. Distributed systems may use leases to partition work across a
* fleet of workers. Each unit of work (identified by a leaseKey) has a corresponding Lease. Every worker will contend

View file

@ -23,6 +23,7 @@ import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
@ -62,6 +63,7 @@ public class ShardSyncTask implements ConsumerTask {
public TaskResult call() {
Exception exception = null;
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHARD_SYNC_TASK_OPERATION);
boolean shardSyncSuccess = true;
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
@ -72,7 +74,12 @@ public class ShardSyncTask implements ConsumerTask {
} catch (Exception e) {
log.error("Caught exception while sync'ing Kinesis shards and leases", e);
exception = e;
shardSyncSuccess = false;
} finally {
// NOTE: This metric is reflecting if a shard sync task succeeds. Customer can use this metric to monitor if
// their application encounter any shard sync failures. This metric can help to detect potential shard stuck issues
// that are due to shard sync failures.
MetricsUtil.addSuccess(scope, "SyncShards", shardSyncSuccess, MetricsLevel.DETAILED);
MetricsUtil.endScope(scope);
}

View file

@ -14,9 +14,12 @@
*/
package software.amazon.kinesis.leases;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -53,6 +56,10 @@ public class ShardSyncTaskManager {
private final HierarchicalShardSyncer hierarchicalShardSyncer;
@NonNull
private final MetricsFactory metricsFactory;
private ConsumerTask currentTask;
private CompletableFuture<TaskResult> future;
private AtomicBoolean shardSyncRequestPending;
private final ReentrantLock lock;
/**
* Constructor.
@ -82,6 +89,8 @@ public class ShardSyncTaskManager {
this.executorService = executorService;
this.hierarchicalShardSyncer = new HierarchicalShardSyncer();
this.metricsFactory = metricsFactory;
this.shardSyncRequestPending = new AtomicBoolean(false);
this.lock = new ReentrantLock();
}
/**
@ -110,16 +119,33 @@ public class ShardSyncTaskManager {
this.executorService = executorService;
this.hierarchicalShardSyncer = hierarchicalShardSyncer;
this.metricsFactory = metricsFactory;
this.shardSyncRequestPending = new AtomicBoolean(false);
this.lock = new ReentrantLock();
}
private ConsumerTask currentTask;
private Future<TaskResult> future;
public synchronized boolean syncShardAndLeaseInfo() {
return checkAndSubmitNextTask();
/**
 * Builds a new ShardSyncTask from this manager's configuration, wraps it in a
 * MetricsCollectingTaskDecorator and runs it on the calling thread.
 *
 * @return the result returned by the decorated task's call()
 */
public TaskResult executeShardSyncTask() {
    final ConsumerTask decoratedSyncTask = new MetricsCollectingTaskDecorator(
            new ShardSyncTask(shardDetector,
                    leaseRefresher,
                    initialPositionInStream,
                    cleanupLeasesUponShardCompletion,
                    ignoreUnexpectedChildShards,
                    shardSyncIdleTimeMillis,
                    hierarchicalShardSyncer,
                    metricsFactory),
            metricsFactory);
    return decoratedSyncTask.call();
}
private synchronized boolean checkAndSubmitNextTask() {
/**
 * Requests a shard sync while holding this manager's lock.
 *
 * @return the value returned by checkAndSubmitNextTask()
 */
public boolean syncShardAndLeaseInfo() {
    // Acquire outside the try block so that unlock() in finally only runs when the lock is held.
    lock.lock();
    try {
        return checkAndSubmitNextTask();
    } finally {
        lock.unlock();
    }
}
private boolean checkAndSubmitNextTask() {
boolean submittedNewTask = false;
if ((future == null) || future.isCancelled() || future.isDone()) {
if ((future != null) && future.isDone()) {
@ -145,18 +171,45 @@ public class ShardSyncTaskManager {
hierarchicalShardSyncer,
metricsFactory),
metricsFactory);
future = executorService.submit(currentTask);
future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService)
.whenComplete((taskResult, exception) -> handlePendingShardSyncs(exception, taskResult));
submittedNewTask = true;
if (log.isDebugEnabled()) {
log.debug("Submitted new {} task.", currentTask.taskType());
}
} else {
if (log.isDebugEnabled()) {
log.debug("Previous {} task still pending. Not submitting new task.", currentTask.taskType());
log.debug("Previous {} task still pending. Not submitting new task. "
+ "Enqueued a request that will be executed when the current request completes.", currentTask.taskType());
}
shardSyncRequestPending.compareAndSet(false /*expected*/, true /*update*/);
}
return submittedNewTask;
}
/**
 * Completion callback for a shard sync task: logs any failure and, if another sync was
 * requested while the task was in flight, submits the next task.
 *
 * @param exception failure passed by the completion stage, or null
 * @param taskResult result of the task; only consulted for its exception when {@code exception} is null
 */
private void handlePendingShardSyncs(Throwable exception, TaskResult taskResult) {
    if (exception != null || taskResult.getException() != null) {
        log.error("Caught exception running {} task: ", currentTask.taskType(), exception != null ? exception : taskResult.getException());
    }
    // Acquire lock here. If shardSyncRequestPending is false in this completionStage and
    // syncShardAndLeaseInfo is invoked, before completion stage exits (future completes)
    // but right after the value of shardSyncRequestPending is checked, it will result in
    // shardSyncRequestPending being set to true, but no pending futures to trigger the next
    // ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the
    // previous task is in this completion stage, checkAndSubmitNextTask is not invoked
    // until this completionStage exits.
    // The lock is taken before the try block so that unlock() only runs when the lock is held.
    lock.lock();
    try {
        if (shardSyncRequestPending.get()) {
            shardSyncRequestPending.set(false);
            // reset future to null, so next call creates a new one
            // without trying to get results from the old future.
            future = null;
            checkAndSubmitNextTask();
        }
    } finally {
        lock.unlock();
    }
}
}

View file

@ -0,0 +1,136 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.booleanThat;
import static org.mockito.Mockito.when;
import static software.amazon.kinesis.coordinator.DeterministicShuffleShardSyncLeaderDecider.DETERMINISTIC_SHUFFLE_SEED;
/**
 * Unit tests for {@link DeterministicShuffleShardSyncLeaderDecider}.
 *
 * <p>The decider is built around a mocked {@link LeaseRefresher}; each test controls what
 * {@code listLeases()} returns and then checks the answers of {@code isLeader(workerId)}.
 * The expected leader set is recomputed independently by {@link #getExpectedLeaders(List)}
 * using the same {@code DETERMINISTIC_SHUFFLE_SEED} the production class exposes.
 */
@RunWith(MockitoJUnitRunner.class)
public class DeterministicShuffleShardSyncLeaderDeciderTest {
    private static final String LEASE_KEY = "lease_key";
    private static final String LEASE_OWNER = "lease_owner";
    private static final String WORKER_ID = "worker-id";

    private DeterministicShuffleShardSyncLeaderDecider leaderDecider;

    @Mock
    private LeaseRefresher leaseRefresher;

    @Mock
    private ScheduledExecutorService scheduledExecutorService;

    // Leader count handed to the decider; also read by getExpectedLeaders().
    private int numShardSyncWorkers;

    /** Builds a decider limited to a single leader before every test. */
    @Before
    public void setup() {
        numShardSyncWorkers = 1;
        leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, scheduledExecutorService, numShardSyncWorkers);
    }

    /** {@code listLeases()} is left unstubbed here, so the mock's default answer is used. */
    @Test
    public void testLeaderElectionWithNullLeases() {
        boolean isLeader = leaderDecider.isLeader(WORKER_ID);
        assertTrue("IsLeader should return true if leaders is null", isLeader);
    }

    /** An empty lease list must still let the asking worker act as leader. */
    @Test
    public void testLeaderElectionWithEmptyLeases() throws Exception {
        when(leaseRefresher.listLeases()).thenReturn(new ArrayList<>());
        boolean isLeader = leaderDecider.isLeader(WORKER_ID);
        assertTrue("IsLeader should return true if no leases are returned", isLeader);
    }

    /** Leases exist but none of them has an owner set. */
    @Test
    public void testleaderElectionWithEmptyOwnerLeases() throws Exception {
        List<Lease> leases = getLeases(5, true, true, true);
        when(leaseRefresher.listLeases()).thenReturn(leases);
        boolean isLeader = leaderDecider.isLeader(WORKER_ID);
        assertTrue("IsLeader should return true if leases have no owner", isLeader);
    }

    /**
     * With five distinct owners and one allowed leader, only the owners picked by the
     * seeded shuffle may be reported as leaders; every other owner must be rejected.
     */
    @Test
    public void testElectedLeadersAsPerExpectedShufflingOrder()
            throws Exception {
        List<Lease> leases = getLeases(5, false /*emptyLeaseOwner */, false /* duplicateLeaseOwner */, true /* activeLeases */);
        when(leaseRefresher.listLeases()).thenReturn(leases);
        Set<String> expectedLeaders = getExpectedLeaders(leases);
        for (String leader : expectedLeaders) {
            assertTrue(leaderDecider.isLeader(leader));
        }
        for (Lease lease : leases) {
            if (!expectedLeaders.contains(lease.leaseOwner())) {
                assertFalse(leaderDecider.isLeader(lease.leaseOwner()));
            }
        }
    }

    /**
     * When the configured leader count (5) exceeds the number of distinct owners (3),
     * every owner is expected to be a leader.
     */
    @Test
    public void testElectedLeadersAsPerExpectedShufflingOrderWhenUniqueWorkersLessThanMaxLeaders() throws Exception {
        this.numShardSyncWorkers = 5; // More than number of unique lease owners
        leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, scheduledExecutorService, numShardSyncWorkers);
        List<Lease> leases = getLeases(3, false /*emptyLeaseOwner */, false /* duplicateLeaseOwner */, true /* activeLeases */);
        // Hand the generated leases to the decider; without this stub the mock never returns
        // them and the assertions below would not exercise the election at all.
        when(leaseRefresher.listLeases()).thenReturn(leases);
        Set<String> expectedLeaders = getExpectedLeaders(leases);
        // All lease owners should be present in expected leaders set, and they should all be leaders.
        for (Lease lease : leases) {
            assertTrue(leaderDecider.isLeader(lease.leaseOwner()));
            assertTrue(expectedLeaders.contains(lease.leaseOwner()));
        }
    }

    /**
     * Creates {@code count} leases keyed {@code lease_key0..lease_key(count-1)}.
     *
     * @param count               number of leases to create
     * @param emptyLeaseOwner     when true, no owner is set on any lease
     * @param duplicateLeaseOwner when true, every lease gets the same owner; otherwise the
     *                            loop index is appended to make each owner distinct
     * @param activeLeases        when true the checkpoint is {@code LATEST}, otherwise {@code SHARD_END}
     * @return the generated leases, in creation order
     */
    private List<Lease> getLeases(int count, boolean emptyLeaseOwner, boolean duplicateLeaseOwner, boolean activeLeases) {
        List<Lease> leases = new ArrayList<>();
        // One generator for the whole batch; the counter values are arbitrary.
        Random counterSource = new Random();
        for (int i = 0; i < count; i++) {
            Lease lease = new Lease();
            lease.leaseKey(LEASE_KEY + i);
            lease.checkpoint(activeLeases ? ExtendedSequenceNumber.LATEST : ExtendedSequenceNumber.SHARD_END);
            lease.leaseCounter(counterSource.nextLong());
            if (!emptyLeaseOwner) {
                lease.leaseOwner(LEASE_OWNER + (duplicateLeaseOwner ? "" : i));
            }
            leases.add(lease);
        }
        return leases;
    }

    /**
     * Computes the leader set the test expects: distinct non-null owners, sorted, shuffled
     * with {@code DETERMINISTIC_SHUFFLE_SEED}, then truncated to at most
     * {@link #numShardSyncWorkers} entries.
     *
     * @param leases leases whose owners are the candidate workers
     * @return the expected leaders
     */
    private Set<String> getExpectedLeaders(List<Lease> leases) {
        List<String> uniqueHosts = leases.stream().filter(lease -> lease.leaseOwner() != null)
                .map(Lease::leaseOwner).distinct().sorted().collect(Collectors.toList());
        Collections.shuffle(uniqueHosts, new Random(DETERMINISTIC_SHUFFLE_SEED));
        int numWorkers = Math.min(uniqueHosts.size(), this.numShardSyncWorkers);
        return new HashSet<>(uniqueHosts.subList(0, numWorkers));
    }
}

View file

@ -78,6 +78,7 @@ import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
@ -105,6 +106,9 @@ public class SchedulerTest {
private final String applicationName = "applicationName";
private final String streamName = "streamName";
private final String namespace = "testNamespace";
private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 5 * 1000L;
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
private Scheduler scheduler;
private ShardRecordProcessorFactory shardRecordProcessorFactory;
@ -172,6 +176,7 @@ public class SchedulerTest {
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList);
when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null));
when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher);
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
@ -275,14 +280,14 @@ public class SchedulerTest {
/**
 * Runs the scheduler with initialization dependencies stubbed to throw, then verifies the
 * failing calls were made {@code coordinatorConfig.maxInitializationAttempts()} times.
 */
// NOTE(review): this span looks like a flattened diff hunk. The shardDetector.listShards()
// stub/verify pair and the dynamoDBLeaseRefresher.isLeaseTableEmpty() stub/verify pair appear
// to be the before/after sides of the same change -- confirm which pair is live.
@Test
public final void testInitializationFailureWithRetries() throws Exception {
doNothing().when(leaseCoordinator).initialize();
// Both calls below are stubbed to fail with an unchecked exception.
when(shardDetector.listShards()).thenThrow(new RuntimeException());
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenThrow(new RuntimeException());
// Rebuild the lease-management config and scheduler so they use the test factory.
leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName,
workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(false, true));
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
scheduler.run();
// Each stubbed call must have been invoked once per configured initialization attempt.
verify(shardDetector, times(coordinatorConfig.maxInitializationAttempts())).listShards();
verify(dynamoDBLeaseRefresher, times(coordinatorConfig.maxInitializationAttempts())).isLeaseTableEmpty();
}
@Test
@ -295,18 +300,20 @@ public class SchedulerTest {
metricsConfig, processorConfig, retrievalConfig);
doNothing().when(leaseCoordinator).initialize();
when(shardDetector.listShards()).thenThrow(new RuntimeException());
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenThrow(new RuntimeException());
scheduler.run();
// verify initialization was retried for maxInitializationAttempts times
verify(shardDetector, times(maxInitializationAttempts)).listShards();
verify(dynamoDBLeaseRefresher, times(maxInitializationAttempts)).isLeaseTableEmpty();
}
@Test
public final void testMultiStreamInitialization() throws ProvisionedThroughputException, DependencyException {
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient,
workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(true, true));
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
scheduler.initialize();
@ -319,7 +326,7 @@ public class SchedulerTest {
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient,
workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(true, false));
workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(true, true));
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
scheduler.initialize();
@ -373,6 +380,43 @@ public class SchedulerTest {
}
/**
 * With the lease table reported as empty, {@code shouldInitiateLeaseSync()} must take longer
 * than {@code MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS} and less than
 * {@code MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS + LEASE_TABLE_CHECK_FREQUENCY_MILLIS}.
 */
@Test
public final void testInitializationWaitsWhenLeaseTableIsEmpty() throws Exception {
    final int maxInitializationAttempts = 1;
    coordinatorConfig.maxInitializationAttempts(maxInitializationAttempts);
    coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist(false);
    scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
            metricsConfig, processorConfig, retrievalConfig);

    doNothing().when(leaseCoordinator).initialize();
    when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);

    // System.nanoTime() is monotonic; wall-clock adjustments cannot distort the measured interval.
    final long startNanos = System.nanoTime();
    scheduler.shouldInitiateLeaseSync();
    final long elapsedNanos = System.nanoTime() - startNanos;

    final long nanosPerMilli = 1_000_000L;
    final long lowerBoundNanos = MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS * nanosPerMilli;
    final long upperBoundNanos =
            (MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS + LEASE_TABLE_CHECK_FREQUENCY_MILLIS) * nanosPerMilli;

    assertTrue("Returned too early after " + (elapsedNanos / nanosPerMilli) + " ms",
            elapsedNanos > lowerBoundNanos);
    assertTrue("Returned too late after " + (elapsedNanos / nanosPerMilli) + " ms",
            elapsedNanos < upperBoundNanos);
}
/**
 * With the lease table reported as non-empty, {@code shouldInitiateLeaseSync()} must return in
 * less than {@code MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS}.
 */
@Test
public final void testInitializationDoesntWaitWhenLeaseTableIsNotEmpty() throws Exception {
    final int maxInitializationAttempts = 1;
    coordinatorConfig.maxInitializationAttempts(maxInitializationAttempts);
    coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist(false);
    scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
            metricsConfig, processorConfig, retrievalConfig);

    doNothing().when(leaseCoordinator).initialize();
    when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false);

    // System.nanoTime() is monotonic; wall-clock adjustments cannot distort the measured interval.
    final long startNanos = System.nanoTime();
    scheduler.shouldInitiateLeaseSync();
    final long elapsedNanos = System.nanoTime() - startNanos;

    final long nanosPerMilli = 1_000_000L;
    assertTrue("Unexpected wait of " + (elapsedNanos / nanosPerMilli) + " ms",
            elapsedNanos < MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS * nanosPerMilli);
}
@Test
public final void testSchedulerShutdown() {
scheduler.shutdown();
@ -641,6 +685,7 @@ public class SchedulerTest {
shardSyncTaskManagerMap.put(streamConfig.streamIdentifier(), shardSyncTaskManager);
shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null));
if(shardSyncFirstAttemptFailure) {
when(shardDetector.listShards())
.thenThrow(new RuntimeException("Service Exception"))