Changes for partial lease table hole detection, missing hash range detection and recovery

This commit is contained in:
Ashwin Giridharan 2020-05-29 09:10:14 -07:00
commit f2ba3bcd2f
19 changed files with 1111 additions and 99 deletions

View file

@ -32,6 +32,13 @@ public class HashKeyRangeForLease {
private final BigInteger startingHashKey; private final BigInteger startingHashKey;
private final BigInteger endingHashKey; private final BigInteger endingHashKey;
public HashKeyRangeForLease(BigInteger startingHashKey, BigInteger endingHashKey) {
Validate.isTrue(startingHashKey.compareTo(endingHashKey) < 0,
"StartingHashKey %s must be less than EndingHashKey %s ", startingHashKey, endingHashKey);
this.startingHashKey = startingHashKey;
this.endingHashKey = endingHashKey;
}
/** /**
* Serialize the startingHashKey for persisting in external storage * Serialize the startingHashKey for persisting in external storage
* *

View file

@ -14,24 +14,49 @@
*/ */
package software.amazon.kinesis.coordinator; package software.amazon.kinesis.coordinator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.Getter; import lombok.Getter;
import lombok.NonNull;
import lombok.Value;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.leases.UpdateField;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.lifecycle.TaskResult;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors;
import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRange;
/** /**
* The top level orchestrator for coordinating the periodic shard sync related * The top level orchestrator for coordinating the periodic shard sync related
@ -42,29 +67,44 @@ import java.util.function.Function;
@Slf4j @Slf4j
class PeriodicShardSyncManager { class PeriodicShardSyncManager {
private static final long INITIAL_DELAY = 60 * 1000L; private static final long INITIAL_DELAY = 60 * 1000L;
private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000L; private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
@VisibleForTesting
static final BigInteger MIN_HASH_KEY = BigInteger.ZERO;
@VisibleForTesting
static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE);
@VisibleForTesting
static final int CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY = 3;
private Map<StreamIdentifier, HashRangeHoleTracker> hashRangeHoleTrackerMap = new HashMap<>();
private final String workerId; private final String workerId;
private final LeaderDecider leaderDecider; private final LeaderDecider leaderDecider;
private final LeaseRefresher leaseRefresher;
private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap; private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap;
private final Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider; private final Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider;
private final ScheduledExecutorService shardSyncThreadPool; private final ScheduledExecutorService shardSyncThreadPool;
private final boolean isMultiStreamingMode;
private boolean isRunning; private boolean isRunning;
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map<StreamIdentifier, StreamConfig> currentStreamConfigMap, PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher,
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider) { Map<StreamIdentifier, StreamConfig> currentStreamConfigMap,
this(workerId, leaderDecider, currentStreamConfigMap, shardSyncTaskManagerProvider, Executors.newSingleThreadScheduledExecutor()); Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider, boolean isMultiStreamingMode) {
this(workerId, leaderDecider, leaseRefresher, currentStreamConfigMap, shardSyncTaskManagerProvider,
Executors.newSingleThreadScheduledExecutor(), isMultiStreamingMode);
} }
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map<StreamIdentifier, StreamConfig> currentStreamConfigMap, PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher,
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider, ScheduledExecutorService shardSyncThreadPool) { Map<StreamIdentifier, StreamConfig> currentStreamConfigMap,
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider,
ScheduledExecutorService shardSyncThreadPool, boolean isMultiStreamingMode) {
Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager."); Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager.");
Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager."); Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager.");
this.workerId = workerId; this.workerId = workerId;
this.leaderDecider = leaderDecider; this.leaderDecider = leaderDecider;
this.leaseRefresher = leaseRefresher;
this.currentStreamConfigMap = currentStreamConfigMap; this.currentStreamConfigMap = currentStreamConfigMap;
this.shardSyncTaskManagerProvider = shardSyncTaskManagerProvider; this.shardSyncTaskManagerProvider = shardSyncTaskManagerProvider;
this.shardSyncThreadPool = shardSyncThreadPool; this.shardSyncThreadPool = shardSyncThreadPool;
this.isMultiStreamingMode = isMultiStreamingMode;
} }
public synchronized TaskResult start() { public synchronized TaskResult start() {
@ -97,7 +137,7 @@ class PeriodicShardSyncManager {
log.info("Syncing Kinesis shard info for " + streamIdentifier); log.info("Syncing Kinesis shard info for " + streamIdentifier);
final StreamConfig streamConfig = streamConfigEntry.getValue(); final StreamConfig streamConfig = streamConfigEntry.getValue();
final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfig); final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfig);
final TaskResult taskResult = shardSyncTaskManager.executeShardSyncTask(); final TaskResult taskResult = shardSyncTaskManager.callShardSyncTask();
if (taskResult.getException() != null) { if (taskResult.getException() != null) {
throw taskResult.getException(); throw taskResult.getException();
} }
@ -116,24 +156,248 @@ class PeriodicShardSyncManager {
private void runShardSync() { private void runShardSync() {
if (leaderDecider.isLeader(workerId)) { if (leaderDecider.isLeader(workerId)) {
for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) { log.info(String.format("WorkerId %s is leader, running the periodic shard sync task", workerId));
final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfigEntry.getValue()); try {
if (!shardSyncTaskManager.syncShardAndLeaseInfo()) { // Construct the stream to leases map to be used in the lease sync
log.warn("Failed to submit shard sync task for stream {}. This could be due to the previous shard sync task not finished.", final Map<StreamIdentifier, List<Lease>> streamToLeasesMap = getStreamToLeasesMap(
shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); currentStreamConfigMap.keySet());
// For each of the stream, check if shard sync needs to be done based on the leases state.
for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(),
streamToLeasesMap.get(streamConfigEntry.getKey()));
if (shardSyncResponse.shouldDoShardSync()) {
log.info("Periodic shard syncer initiating shard sync for {} due to the reason - {} ",
streamConfigEntry.getKey(), shardSyncResponse.reasonForDecision());
final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider
.apply(streamConfigEntry.getValue());
if (!shardSyncTaskManager.submitShardSyncTask()) {
log.warn(
"Failed to submit shard sync task for stream {}. This could be due to the previous pending shard sync task.",
shardSyncTaskManager.shardDetector().streamIdentifier().streamName());
}
} else {
log.info("Skipping shard sync for {} due to the reason - {}", streamConfigEntry.getKey(),
shardSyncResponse.reasonForDecision());
}
} }
} catch (Exception e) {
log.error("Caught exception while running periodic shard syncer.", e);
} }
} else { } else {
log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); log.debug("WorkerId {} is not a leader, not running the shard sync task", workerId);
}
}
private Map<StreamIdentifier, List<Lease>> getStreamToLeasesMap(
final Set<StreamIdentifier> streamIdentifiersToFilter)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
final List<Lease> leases = leaseRefresher.listLeases();
if (!isMultiStreamingMode) {
Validate.isTrue(streamIdentifiersToFilter.size() == 1);
return Collections.singletonMap(streamIdentifiersToFilter.iterator().next(), leases);
} else {
final Map<StreamIdentifier, List<Lease>> streamToLeasesMap = new HashMap<>();
for (Lease lease : leases) {
StreamIdentifier streamIdentifier = StreamIdentifier
.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier());
if (streamIdentifiersToFilter.contains(streamIdentifier)) {
streamToLeasesMap.computeIfAbsent(streamIdentifier, s -> new ArrayList<>()).add(lease);
}
}
return streamToLeasesMap;
}
}
@VisibleForTesting
ShardSyncResponse checkForShardSync(StreamIdentifier streamIdentifier, List<Lease> leases) {
if (CollectionUtils.isNullOrEmpty(leases)) {
// If the leases is null or empty then we need to do shard sync
log.info("No leases found for {}. Will be triggering shard sync", streamIdentifier);
return new ShardSyncResponse(true, "No leases found for " + streamIdentifier);
}
// Check if there are any holes in the leases and return the first hole if present.
Optional<HashRangeHole> hashRangeHoleOpt = hasHoleInLeases(streamIdentifier, leases);
if (hashRangeHoleOpt.isPresent()) {
// If hole is present, check if the hole is detected consecutively in previous occurrences.
// If hole is determined with high confidence return true; return false otherwise
// We are using the high confidence factor to avoid shard sync on any holes during resharding and
// lease cleanups or any intermittent issues.
final HashRangeHoleTracker hashRangeHoleTracker = hashRangeHoleTrackerMap
.computeIfAbsent(streamIdentifier, s -> new HashRangeHoleTracker());
final boolean hasHoleWithHighConfidence = hashRangeHoleTracker
.hasHighConfidenceOfHoleWith(hashRangeHoleOpt.get());
return new ShardSyncResponse(hasHoleWithHighConfidence,
"Detected same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles()
+ " times. Shard sync will be initiated when threshold reaches "
+ CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY);
} else {
// If hole is not present, clear any previous tracking for this stream and return false;
hashRangeHoleTrackerMap.remove(streamIdentifier);
return new ShardSyncResponse(false, "Hash Ranges are complete for " + streamIdentifier);
}
}
@Value
@Accessors(fluent = true)
@VisibleForTesting
static class ShardSyncResponse {
private final boolean shouldDoShardSync;
private final String reasonForDecision;
}
@VisibleForTesting
Optional<HashRangeHole> hasHoleInLeases(StreamIdentifier streamIdentifier, List<Lease> leases) {
// Filter the leases with any checkpoint other than shard end.
List<Lease> activeLeases = leases.stream()
.filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()).collect(Collectors.toList());
List<Lease> activeLeasesWithHashRanges = fillWithHashRangesIfRequired(streamIdentifier, activeLeases);
return checkForHoleInHashKeyRanges(streamIdentifier, activeLeasesWithHashRanges);
}
// If leases are missing hashranges information, update the leases in-memory as well as in the lease storage
// by learning from kinesis shards.
private List<Lease> fillWithHashRangesIfRequired(StreamIdentifier streamIdentifier, List<Lease> activeLeases) {
List<Lease> activeLeasesWithNoHashRanges = activeLeases.stream()
.filter(lease -> lease.hashKeyRangeForLease() == null).collect(Collectors.toList());
Optional<Lease> minLeaseOpt = activeLeasesWithNoHashRanges.stream().min(Comparator.comparing(Lease::leaseKey));
if (minLeaseOpt.isPresent()) {
// TODO : use minLease for new ListShards with startingShardId
final Lease minLease = minLeaseOpt.get();
final ShardDetector shardDetector = shardSyncTaskManagerProvider
.apply(currentStreamConfigMap.get(streamIdentifier)).shardDetector();
final Map<String, Shard> kinesisShards = shardDetector.listShards().stream()
.collect(Collectors.toMap(Shard::shardId, shard -> shard));
return activeLeases.stream().map(lease -> {
if (lease.hashKeyRangeForLease() == null) {
final String shardId = lease instanceof MultiStreamLease ?
((MultiStreamLease) lease).shardId() :
lease.leaseKey();
final Shard shard = kinesisShards.get(shardId);
if(shard == null) {
return lease;
}
lease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange()));
try {
leaseRefresher.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE);
} catch (Exception e) {
log.warn(
"Unable to update hash range key information for lease {} of stream {}. This may result in explicit lease sync.",
lease.leaseKey(), streamIdentifier);
}
}
return lease;
}).filter(lease -> lease.hashKeyRangeForLease() != null).collect(Collectors.toList());
} else {
return activeLeases;
}
}
@VisibleForTesting
static Optional<HashRangeHole> checkForHoleInHashKeyRanges(StreamIdentifier streamIdentifier,
List<Lease> leasesWithHashKeyRanges) {
// Sort the hash ranges by starting hash key.
List<Lease> sortedLeasesWithHashKeyRanges = sortLeasesByHashRange(leasesWithHashKeyRanges);
if(sortedLeasesWithHashKeyRanges.isEmpty()) {
log.error("No leases with valid hashranges found for stream {}", streamIdentifier);
return Optional.of(new HashRangeHole());
}
// Validate for hashranges bounds.
if (!sortedLeasesWithHashKeyRanges.get(0).hashKeyRangeForLease().startingHashKey().equals(MIN_HASH_KEY) || !sortedLeasesWithHashKeyRanges
.get(sortedLeasesWithHashKeyRanges.size() - 1).hashKeyRangeForLease().endingHashKey().equals(MAX_HASH_KEY)) {
log.error("Incomplete hash range found for stream {} between {} and {}.", streamIdentifier,
sortedLeasesWithHashKeyRanges.get(0),
sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1));
return Optional.of(new HashRangeHole(sortedLeasesWithHashKeyRanges.get(0).hashKeyRangeForLease(),
sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1).hashKeyRangeForLease()));
}
// Check for any holes in the sorted hashrange intervals.
if (sortedLeasesWithHashKeyRanges.size() > 1) {
Lease leftMostLeaseToReportInCaseOfHole = sortedLeasesWithHashKeyRanges.get(0);
HashKeyRangeForLease leftLeaseHashRange = leftMostLeaseToReportInCaseOfHole.hashKeyRangeForLease();
for (int i = 1; i < sortedLeasesWithHashKeyRanges.size(); i++) {
final HashKeyRangeForLease rightLeaseHashRange = sortedLeasesWithHashKeyRanges.get(i).hashKeyRangeForLease();
final BigInteger rangeDiff = rightLeaseHashRange.startingHashKey().subtract(leftLeaseHashRange.endingHashKey());
// Case of overlapping leases when the rangediff is 0 or negative.
// signum() will be -1 for negative and 0 if value is 0.
// Merge the range for further tracking.
if (rangeDiff.signum() <= 0) {
leftLeaseHashRange = new HashKeyRangeForLease(leftLeaseHashRange.startingHashKey(),
leftLeaseHashRange.endingHashKey().max(rightLeaseHashRange.endingHashKey()));
} else {
// Case of non overlapping leases when rangediff is positive. signum() will be 1 for positive.
// If rangeDiff is 1, then it is a case of continuous hashrange. If not, it is a hole.
if (!rangeDiff.equals(BigInteger.ONE)) {
log.error("Incomplete hash range found for {} between {} and {}.", streamIdentifier,
leftMostLeaseToReportInCaseOfHole, sortedLeasesWithHashKeyRanges.get(i));
return Optional.of(new HashRangeHole(leftMostLeaseToReportInCaseOfHole.hashKeyRangeForLease(),
sortedLeasesWithHashKeyRanges.get(i).hashKeyRangeForLease()));
}
leftMostLeaseToReportInCaseOfHole = sortedLeasesWithHashKeyRanges.get(i);
leftLeaseHashRange = rightLeaseHashRange;
}
}
}
return Optional.empty();
}
@VisibleForTesting
static List<Lease> sortLeasesByHashRange(List<Lease> leasesWithHashKeyRanges) {
if (leasesWithHashKeyRanges.size() == 0 || leasesWithHashKeyRanges.size() == 1)
return leasesWithHashKeyRanges;
Collections.sort(leasesWithHashKeyRanges, new HashKeyRangeComparator());
return leasesWithHashKeyRanges;
}
@Value
private static class HashRangeHole {
HashRangeHole() {
hashRangeAtStartOfPossibleHole = hashRangeAtEndOfPossibleHole = null;
}
HashRangeHole(HashKeyRangeForLease hashRangeAtStartOfPossibleHole, HashKeyRangeForLease hashRangeAtEndOfPossibleHole) {
this.hashRangeAtStartOfPossibleHole = hashRangeAtStartOfPossibleHole;
this.hashRangeAtEndOfPossibleHole = hashRangeAtEndOfPossibleHole;
}
private final HashKeyRangeForLease hashRangeAtStartOfPossibleHole;
private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole;
}
private static class HashRangeHoleTracker {
private HashRangeHole hashRangeHole;
@Getter
private Integer numConsecutiveHoles;
public boolean hasHighConfidenceOfHoleWith(@NonNull HashRangeHole hashRangeHole) {
if (hashRangeHole.equals(this.hashRangeHole)) {
++this.numConsecutiveHoles;
} else {
this.hashRangeHole = hashRangeHole;
this.numConsecutiveHoles = 1;
}
return numConsecutiveHoles >= CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY;
} }
} }
/** /**
* Checks if the entire hash range is covered * Helper class to compare leases based on their hash range.
* @return true if covered, false otherwise
*/ */
public boolean hashRangeCovered() { private static class HashKeyRangeComparator implements Comparator<Lease>, Serializable {
// TODO: Implement method
return true; private static final long serialVersionUID = 1L;
@Override
public int compare(Lease lease, Lease otherLease) {
Validate.notNull(lease);
Validate.notNull(otherLease);
Validate.notNull(lease.hashKeyRangeForLease());
Validate.notNull(otherLease.hashKeyRangeForLease());
return ComparisonChain.start()
.compare(lease.hashKeyRangeForLease().startingHashKey(), otherLease.hashKeyRangeForLease().startingHashKey())
.compare(lease.hashKeyRangeForLease().endingHashKey(), otherLease.hashKeyRangeForLease().endingHashKey())
.result();
}
} }
} }

View file

@ -111,7 +111,6 @@ public class Scheduler implements Runnable {
private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L; private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L;
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L;
private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L; private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L;
private static final String MULTI_STREAM_TRACKER = "MultiStreamTracker"; private static final String MULTI_STREAM_TRACKER = "MultiStreamTracker";
private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count"; private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count";
@ -289,8 +288,8 @@ public class Scheduler implements Runnable {
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode); this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode);
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager( this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(
leaseManagementConfig.workerIdentifier(), leaderDecider, currentStreamConfigMap, leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap,
shardSyncTaskManagerProvider); shardSyncTaskManagerProvider, isMultiStreamMode);
} }
/** /**
@ -351,11 +350,8 @@ public class Scheduler implements Runnable {
} else { } else {
log.info("LeaseCoordinator is already running. No need to start it."); log.info("LeaseCoordinator is already running. No need to start it.");
} }
log.info("Scheduling periodicShardSync)"); log.info("Scheduling periodicShardSync");
// leaderElectedPeriodicShardSyncManager.start(shardSyncTasks); leaderElectedPeriodicShardSyncManager.start();
// TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged
// TODO: Determine if waitUntilHashRangeCovered() is needed.
//waitUntilHashRangeCovered();
streamSyncWatch.start(); streamSyncWatch.start();
isDone = true; isDone = true;
} catch (LeasingException e) { } catch (LeasingException e) {
@ -398,18 +394,6 @@ public class Scheduler implements Runnable {
return shouldInitiateLeaseSync; return shouldInitiateLeaseSync;
} }
private void waitUntilHashRangeCovered() throws InterruptedException {
// TODO: Currently this call is not in use. We may need to implement this method later. Created SIM to track the work: https://sim.amazon.com/issues/KinesisLTR-202
// TODO: For future implementation, streamToShardSyncTaskManagerMap might not contain the most up to date snapshot of active streams.
// Should use currentStreamConfigMap to determine the streams to check.
while (!leaderElectedPeriodicShardSyncManager.hashRangeCovered()) {
// wait until entire hash range is covered
log.info("Hash range is not covered yet. Checking again in {} ms", HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS);
Thread.sleep(HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS);
}
}
@VisibleForTesting @VisibleForTesting
void runProcessLoop() { void runProcessLoop() {
try { try {
@ -431,7 +415,7 @@ public class Scheduler implements Runnable {
final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt()); final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt());
final StreamConfig streamConfig = currentStreamConfigMap final StreamConfig streamConfig = currentStreamConfigMap
.getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier)); .getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier));
if (createOrGetShardSyncTaskManager(streamConfig).syncShardAndLeaseInfo()) { if (createOrGetShardSyncTaskManager(streamConfig).submitShardSyncTask()) {
log.info("{} : Found completed shard, initiated new ShardSyncTak for {} ", log.info("{} : Found completed shard, initiated new ShardSyncTak for {} ",
streamIdentifier.serialize(), completedShard.toString()); streamIdentifier.serialize(), completedShard.toString());
} }
@ -494,7 +478,7 @@ public class Scheduler implements Runnable {
if (!currentStreamConfigMap.containsKey(streamIdentifier)) { if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
log.info("Found new stream to process: " + streamIdentifier + ". Syncing shards of that stream."); log.info("Found new stream to process: " + streamIdentifier + ". Syncing shards of that stream.");
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(newStreamConfigMap.get(streamIdentifier)); ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(newStreamConfigMap.get(streamIdentifier));
shardSyncTaskManager.syncShardAndLeaseInfo(); shardSyncTaskManager.submitShardSyncTask();
currentStreamConfigMap.put(streamIdentifier, newStreamConfigMap.get(streamIdentifier)); currentStreamConfigMap.put(streamIdentifier, newStreamConfigMap.get(streamIdentifier));
streamsSynced.add(streamIdentifier); streamsSynced.add(streamIdentifier);
} else { } else {
@ -522,7 +506,7 @@ public class Scheduler implements Runnable {
+ ". Syncing shards of that stream."); + ". Syncing shards of that stream.");
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager( ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(
currentStreamConfigMap.get(streamIdentifier)); currentStreamConfigMap.get(streamIdentifier));
shardSyncTaskManager.syncShardAndLeaseInfo(); shardSyncTaskManager.submitShardSyncTask();
currentSetOfStreamsIter.remove(); currentSetOfStreamsIter.remove();
streamsSynced.add(streamIdentifier); streamsSynced.add(streamIdentifier);
} }
@ -865,8 +849,7 @@ public class Scheduler implements Runnable {
if (!firstItem) { if (!firstItem) {
builder.append(", "); builder.append(", ");
} }
builder.append(shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) builder.append(ShardInfo.getLeaseKey(shardInfo));
.orElse(shardInfo.shardId()));
firstItem = false; firstItem = false;
} }
slog.info("Current stream shard assignments: " + builder.toString()); slog.info("Current stream shard assignments: " + builder.toString());
@ -962,8 +945,7 @@ public class Scheduler implements Runnable {
ShardConsumer consumer = shardInfoShardConsumerMap.get(shard); ShardConsumer consumer = shardInfoShardConsumerMap.get(shard);
if (consumer.leaseLost()) { if (consumer.leaseLost()) {
shardInfoShardConsumerMap.remove(shard); shardInfoShardConsumerMap.remove(shard);
log.debug("Removed consumer for {} as lease has been lost", log.debug("Removed consumer for {} as lease has been lost", ShardInfo.getLeaseKey(shard));
shard.streamIdentifierSerOpt().map(s -> s + ":" + shard.shardId()).orElse(shard.shardId()));
} else { } else {
consumer.executeLifecycle(); consumer.executeLifecycle();
} }

View file

@ -45,7 +45,6 @@ import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.common.StreamIdentifier;
@ -341,7 +340,7 @@ public class HierarchicalShardSyncer {
"Stream " + streamName + " is not in ACTIVE OR UPDATING state - will retry getting the shard list."); "Stream " + streamName + " is not in ACTIVE OR UPDATING state - will retry getting the shard list.");
} }
if (hashRangeOfShardsIsComplete(shards)) { if (isHashRangeOfShardsComplete(shards)) {
return shards; return shards;
} }
@ -359,7 +358,7 @@ public class HierarchicalShardSyncer {
" is not in ACTIVE OR UPDATING state - will retry getting the shard list.")); " is not in ACTIVE OR UPDATING state - will retry getting the shard list."));
} }
private static boolean hashRangeOfShardsIsComplete(@NonNull List<Shard> shards) { private static boolean isHashRangeOfShardsComplete(@NonNull List<Shard> shards) {
if (shards.isEmpty()) { if (shards.isEmpty()) {
throw new IllegalStateException("No shards found when attempting to validate complete hash range."); throw new IllegalStateException("No shards found when attempting to validate complete hash range.");

View file

@ -163,7 +163,6 @@ public class Lease {
pendingCheckpointState(lease.pendingCheckpointState); pendingCheckpointState(lease.pendingCheckpointState);
parentShardIds(lease.parentShardIds); parentShardIds(lease.parentShardIds);
childShardIds(lease.childShardIds); childShardIds(lease.childShardIds);
hashKeyRange(lease.hashKeyRangeForLease);
} }
/** /**
@ -308,4 +307,6 @@ public class Lease {
public Lease copy() { public Lease copy() {
return new Lease(this); return new Lease(this);
} }
} }

View file

@ -105,7 +105,7 @@ public interface LeaseRefresher {
* @throws ProvisionedThroughputException if DynamoDB get fails due to lack of capacity * @throws ProvisionedThroughputException if DynamoDB get fails due to lack of capacity
* @throws DependencyException if DynamoDB get fails in an unexpected way * @throws DependencyException if DynamoDB get fails in an unexpected way
* *
* @return lease for the specified shardId, or null if one doesn't exist * @return lease for the specified leaseKey, or null if one doesn't exist
*/ */
Lease getLease(String leaseKey) throws DependencyException, InvalidStateException, ProvisionedThroughputException; Lease getLease(String leaseKey) throws DependencyException, InvalidStateException, ProvisionedThroughputException;
@ -191,6 +191,21 @@ public interface LeaseRefresher {
boolean updateLease(Lease lease) boolean updateLease(Lease lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException; throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
* Update application-specific fields of the given lease in DynamoDB. Does not update fields managed by the leasing
* library such as leaseCounter, leaseOwner, or leaseKey.
*
* @return true if update succeeded, false otherwise
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/
default void updateLeaseWithMetaInfo(Lease lease, UpdateField updateField)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
throw new UnsupportedOperationException("updateLeaseWithNoExpectation is not implemented");
}
/** /**
* Check (synchronously) if there are any leases in the lease table. * Check (synchronously) if there are any leases in the lease table.
* *

View file

@ -107,6 +107,15 @@ public interface LeaseSerializer {
*/ */
Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(Lease lease); Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(Lease lease);
/**
* @param lease
* @param updateField
* @return the attribute value map that updates application-specific data for a lease
*/
default Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(Lease lease, UpdateField updateField) {
throw new UnsupportedOperationException();
}
/** /**
* @return the key schema for creating a DynamoDB table to store leases * @return the key schema for creating a DynamoDB table to store leases
*/ */
@ -116,4 +125,5 @@ public interface LeaseSerializer {
* @return attribute definitions for creating a DynamoDB table to store leases * @return attribute definitions for creating a DynamoDB table to store leases
*/ */
Collection<AttributeDefinition> getAttributeDefinitions(); Collection<AttributeDefinition> getAttributeDefinitions();
} }

View file

@ -126,7 +126,11 @@ public class ShardSyncTaskManager {
this.lock = new ReentrantLock(); this.lock = new ReentrantLock();
} }
public TaskResult executeShardSyncTask() { /**
* Call a ShardSyncTask and return the Task Result.
* @return the Task Result.
*/
public TaskResult callShardSyncTask() {
final ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, final ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector,
leaseRefresher, leaseRefresher,
initialPositionInStream, initialPositionInStream,
@ -140,7 +144,11 @@ public class ShardSyncTaskManager {
return metricCollectingTask.call(); return metricCollectingTask.call();
} }
public boolean syncShardAndLeaseInfo() { /**
* Submit a ShardSyncTask and return if the submission is successful.
* @return if the casting is successful.
*/
public boolean submitShardSyncTask() {
try { try {
lock.lock(); lock.lock();
return checkAndSubmitNextTask(); return checkAndSubmitNextTask();
@ -197,7 +205,7 @@ public class ShardSyncTaskManager {
log.error("Caught exception running {} task: ", currentTask.taskType(), exception != null ? exception : taskResult.getException()); log.error("Caught exception running {} task: ", currentTask.taskType(), exception != null ? exception : taskResult.getException());
} }
// Acquire lock here. If shardSyncRequestPending is false in this completionStage and // Acquire lock here. If shardSyncRequestPending is false in this completionStage and
// syncShardAndLeaseInfo is invoked, before completion stage exits (future completes) // submitShardSyncTask is invoked, before completion stage exits (future completes)
// but right after the value of shardSyncRequestPending is checked, it will result in // but right after the value of shardSyncRequestPending is checked, it will result in
// shardSyncRequestPending being set to true, but no pending futures to trigger the next // shardSyncRequestPending being set to true, but no pending futures to trigger the next
// ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the // ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the

View file

@ -0,0 +1,26 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.leases;
/**
* These are the special fields that will be updated only once during the lifetime of the lease.
* Since these are meta information that will not affect lease ownership or data durability, we allow
* any elected leader or worker to set these fields directly without any conditional checks.
* Note that though HASH_KEY_RANGE will be available during lease initialization in newer versions, we keep this
* for backfilling while rolling forward to newer versions.
*/
public enum UpdateField {
CHILD_SHARDS, HASH_KEY_RANGE
}

View file

@ -35,6 +35,7 @@ import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseSerializer; import software.amazon.kinesis.leases.LeaseSerializer;
import software.amazon.kinesis.leases.UpdateField;
import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
@ -659,6 +660,27 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
return true; return true;
} }
@Override
public void updateLeaseWithMetaInfo(Lease lease, UpdateField updateField)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
log.debug("Updating lease without expectation {}", lease);
final AWSExceptionManager exceptionManager = createExceptionManager();
Map<String, AttributeValueUpdate> updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField);
UpdateItemRequest request = UpdateItemRequest.builder().tableName(table).key(serializer.getDynamoHashKey(lease))
.attributeUpdates(updates).build();
try {
try {
FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateItem(request), dynamoDbRequestTimeout);
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
} catch (InterruptedException e) {
throw new DependencyException(e);
}
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("update", lease.leaseKey(), e);
}
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */

View file

@ -36,6 +36,7 @@ import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.leases.DynamoUtils; import software.amazon.kinesis.leases.DynamoUtils;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseSerializer; import software.amazon.kinesis.leases.LeaseSerializer;
import software.amazon.kinesis.leases.UpdateField;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/** /**
@ -268,6 +269,28 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
return result; return result;
} }
@Override
public Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(Lease lease,
UpdateField updateField) {
Map<String, AttributeValueUpdate> result = new HashMap<>();
switch (updateField) {
case CHILD_SHARDS:
if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) {
result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds())));
}
break;
case HASH_KEY_RANGE:
if (lease.hashKeyRangeForLease() != null) {
result.put(STARTING_HASH_KEY, putUpdate(
DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey())));
result.put(ENDING_HASH_KEY, putUpdate(
DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey())));
}
break;
}
return result;
}
@Override @Override
public Collection<KeySchemaElement> getKeySchema() { public Collection<KeySchemaElement> getKeySchema() {
List<KeySchemaElement> keySchema = new ArrayList<>(); List<KeySchemaElement> keySchema = new ArrayList<>();

View file

@ -25,8 +25,6 @@ import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.function.Function;
/** /**
* Task to block until processing of all data records in the parent shard(s) is completed. * Task to block until processing of all data records in the parent shard(s) is completed.
* We check if we have checkpoint(s) for the parent shard(s). * We check if we have checkpoint(s) for the parent shard(s).
@ -56,8 +54,7 @@ public class BlockOnParentShardTask implements ConsumerTask {
@Override @Override
public TaskResult call() { public TaskResult call() {
Exception exception = null; Exception exception = null;
final String shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) final String shardInfoId = ShardInfo.getLeaseKey(shardInfo);
.orElse(shardInfo.shardId());
try { try {
boolean blockedOnParentShard = false; boolean blockedOnParentShard = false;
for (String shardId : shardInfo.parentShardIds()) { for (String shardId : shardInfo.parentShardIds()) {

View file

@ -76,8 +76,7 @@ public class ProcessTask implements ConsumerTask {
@NonNull AggregatorUtil aggregatorUtil, @NonNull AggregatorUtil aggregatorUtil,
@NonNull MetricsFactory metricsFactory) { @NonNull MetricsFactory metricsFactory) {
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) this.shardInfoId = ShardInfo.getLeaseKey(shardInfo);
.orElse(shardInfo.shardId());
this.shardRecordProcessor = shardRecordProcessor; this.shardRecordProcessor = shardRecordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.backoffTimeMillis = backoffTimeMillis; this.backoffTimeMillis = backoffTimeMillis;

View file

@ -24,6 +24,7 @@ import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.RetryableRetrievalException;
@ -70,8 +71,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.shardConsumer = shardConsumer; this.shardConsumer = shardConsumer;
this.readTimeoutsToIgnoreBeforeWarning = readTimeoutsToIgnoreBeforeWarning; this.readTimeoutsToIgnoreBeforeWarning = readTimeoutsToIgnoreBeforeWarning;
this.shardInfoId = shardConsumer.shardInfo().streamIdentifierSerOpt() this.shardInfoId = ShardInfo.getLeaseKey(shardConsumer.shardInfo());
.map(s -> s + ":" + shardConsumer.shardInfo().shardId()).orElse(shardConsumer.shardInfo().shardId());
} }

View file

@ -15,15 +15,10 @@
package software.amazon.kinesis.lifecycle; package software.amazon.kinesis.lifecycle;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.function.Function;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.services.kinesis.model.ChildShard; import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
@ -47,8 +42,10 @@ import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -89,8 +86,8 @@ public class ShutdownTask implements ConsumerTask {
private final List<ChildShard> childShards; private final List<ChildShard> childShards;
private static final Function<ShardInfo, String> leaseKeyProvider = shardInfo -> shardInfo private static final Function<ShardInfo, String> leaseKeyProvider = shardInfo -> ShardInfo.getLeaseKey(shardInfo);
.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()).orElse(shardInfo.shardId());
/* /*
* Invokes ShardRecordProcessor shutdown() API. * Invokes ShardRecordProcessor shutdown() API.
* (non-Javadoc) * (non-Javadoc)
@ -118,7 +115,7 @@ public class ShutdownTask implements ConsumerTask {
// This scenario could happen when customer deletes the stream while leaving the KCL application running. // This scenario could happen when customer deletes the stream while leaving the KCL application running.
if (!CollectionUtils.isNullOrEmpty(childShards)) { if (!CollectionUtils.isNullOrEmpty(childShards)) {
createLeasesForChildShardsIfNotExist(); createLeasesForChildShardsIfNotExist();
updateLeasesForChildShards(); updateLeaseWithChildShards();
} else { } else {
log.warn("Shard {} no longer exists. Shutting down consumer with SHARD_END reason without creating leases for child shards.", leaseKeyProvider.apply(shardInfo)); log.warn("Shard {} no longer exists. Shutting down consumer with SHARD_END reason without creating leases for child shards.", leaseKeyProvider.apply(shardInfo));
} }
@ -192,13 +189,14 @@ public class ShutdownTask implements ConsumerTask {
} }
} }
private void updateLeasesForChildShards() private void updateLeaseWithChildShards()
throws DependencyException, InvalidStateException, ProvisionedThroughputException { throws DependencyException, InvalidStateException, ProvisionedThroughputException {
final Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); final Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
Set<String> childShardIds = childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet()); Set<String> childShardIds = childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet());
final Lease updatedLease = currentLease.copy(); final Lease updatedLease = currentLease.copy();
updatedLease.childShardIds(childShardIds); updatedLease.childShardIds(childShardIds);
// TODO : Make changes to use the new leaserefresher#updateLease(Lease lease, UpdateField updateField)
final boolean updateResult = leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, leaseKeyProvider.apply(shardInfo)); final boolean updateResult = leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, leaseKeyProvider.apply(shardInfo));
if (!updateResult) { if (!updateResult) {
throw new InvalidStateException("Failed to update parent lease with child shard information for shard " + shardInfo.shardId()); throw new InvalidStateException("Failed to update parent lease with child shard information for shard " + shardInfo.shardId());
@ -221,26 +219,4 @@ public class ShutdownTask implements ConsumerTask {
return reason; return reason;
} }
private boolean isShardInContextParentOfAny(List<Shard> shards) {
for(Shard shard : shards) {
if (isChildShardOfShardInContext(shard)) {
return true;
}
}
return false;
}
private boolean isChildShardOfShardInContext(Shard shard) {
return (StringUtils.equals(shard.parentShardId(), shardInfo.shardId())
|| StringUtils.equals(shard.adjacentParentShardId(), shardInfo.shardId()));
}
private void dropLease() {
Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
leaseCoordinator.dropLease(currentLease);
if(currentLease != null) {
log.warn("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey());
}
}
} }

View file

@ -141,6 +141,11 @@ public class ExtendedSequenceNumber implements Comparable<ExtendedSequenceNumber
return subSequenceNumber; return subSequenceNumber;
} }
public boolean isShardEnd() {
return sequenceNumber.equals(SentinelCheckpoint.SHARD_END.toString());
}
@Override @Override
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();

View file

@ -0,0 +1,588 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static software.amazon.kinesis.common.HashKeyRangeForLease.deserialize;
import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY;
import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.MAX_HASH_KEY;
import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.MIN_HASH_KEY;
@RunWith(MockitoJUnitRunner.class)
public class PeriodicShardSyncManagerTest {
private StreamIdentifier streamIdentifier;
private PeriodicShardSyncManager periodicShardSyncManager;
@Mock
private LeaderDecider leaderDecider;
@Mock
private LeaseRefresher leaseRefresher;
@Mock
Map<StreamIdentifier, StreamConfig> currentStreamConfigMap;
@Mock
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider;
@Before
public void setup() {
streamIdentifier = StreamIdentifier.multiStreamInstance("123:stream:456");
periodicShardSyncManager = new PeriodicShardSyncManager("worker", leaderDecider, leaseRefresher, currentStreamConfigMap,
shardSyncTaskManagerProvider, true);
}
@Test
public void testForFailureWhenHashRangesAreIncomplete() {
List<Lease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23"));
add(deserialize("25", MAX_HASH_KEY.toString())); // Missing interval here
}}.stream().map(hashKeyRangeForLease -> {
Lease lease = new MultiStreamLease();
lease.hashKeyRange(hashKeyRangeForLease);
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
Assert.assertTrue(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(streamIdentifier, hashRanges).isPresent());
}
@Test
public void testForSuccessWhenHashRangesAreComplete() {
List<Lease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23"));
add(deserialize("24", MAX_HASH_KEY.toString()));
}}.stream().map(hashKeyRangeForLease -> {
Lease lease = new MultiStreamLease();
lease.hashKeyRange(hashKeyRangeForLease);
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
Assert.assertFalse(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(streamIdentifier, hashRanges).isPresent());
}
@Test
public void testForSuccessWhenUnSortedHashRangesAreComplete() {
List<Lease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("4", "23"));
add(deserialize("2", "3"));
add(deserialize("0", "1"));
add(deserialize("24", MAX_HASH_KEY.toString()));
add(deserialize("6", "23"));
}}.stream().map(hashKeyRangeForLease -> {
Lease lease = new MultiStreamLease();
lease.hashKeyRange(hashKeyRangeForLease);
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
Assert.assertFalse(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(streamIdentifier, hashRanges).isPresent());
}
@Test
public void testForSuccessWhenHashRangesAreCompleteForOverlappingLeasesAtEnd() {
List<Lease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23"));
add(deserialize("24", MAX_HASH_KEY.toString()));
add(deserialize("24", "45"));
}}.stream().map(hashKeyRangeForLease -> {
Lease lease = new MultiStreamLease();
lease.hashKeyRange(hashKeyRangeForLease);
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
Assert.assertFalse(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(streamIdentifier, hashRanges).isPresent());
}
@Test
public void testIfShardSyncIsInitiatedWhenNoLeasesArePassed() {
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, null).shouldDoShardSync());
}
@Test
public void testIfShardSyncIsInitiatedWhenEmptyLeasesArePassed() {
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, new ArrayList<>()).shouldDoShardSync());
}
@Test
public void testIfShardSyncIsNotInitiatedWhenConfidenceFactorIsNotReached() {
List<Lease> multiStreamLeases = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23")); // Hole between 23 and 25
add(deserialize("25", MAX_HASH_KEY.toString()));
}}.stream().map(hashKeyRangeForLease -> {
MultiStreamLease lease = new MultiStreamLease();
lease.hashKeyRange(hashKeyRangeForLease);
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
}
@Test
public void testIfShardSyncIsInitiatedWhenConfidenceFactorIsReached() {
List<Lease> multiStreamLeases = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23")); // Hole between 23 and 25
add(deserialize("25", MAX_HASH_KEY.toString()));
}}.stream().map(hashKeyRangeForLease -> {
MultiStreamLease lease = new MultiStreamLease();
lease.hashKeyRange(hashKeyRangeForLease);
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
}
@Test
public void testIfShardSyncIsInitiatedWhenHoleIsDueToShardEnd() {
List<Lease> multiStreamLeases = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23")); // introducing hole here through SHARD_END checkpoint
add(deserialize("6", "23"));
add(deserialize("24", MAX_HASH_KEY.toString()));
}}.stream().map(hashKeyRangeForLease -> {
MultiStreamLease lease = new MultiStreamLease();
lease.hashKeyRange(hashKeyRangeForLease);
if(lease.hashKeyRangeForLease().startingHashKey().toString().equals("4")) {
lease.checkpoint(ExtendedSequenceNumber.SHARD_END);
} else {
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
}
return lease;
}).collect(Collectors.toList());
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
}
@Test
public void testIfShardSyncIsInitiatedWhenNoLeasesAreUsedDueToShardEnd() {
List<Lease> multiStreamLeases = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23"));
add(deserialize("24", MAX_HASH_KEY.toString()));
}}.stream().map(hashKeyRangeForLease -> {
MultiStreamLease lease = new MultiStreamLease();
lease.hashKeyRange(hashKeyRangeForLease);
lease.checkpoint(ExtendedSequenceNumber.SHARD_END);
return lease;
}).collect(Collectors.toList());
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
}
@Test
public void testIfShardSyncIsNotInitiatedWhenHoleShifts() {
List<Lease> multiStreamLeases = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23")); // Hole between 23 and 25
add(deserialize("25", MAX_HASH_KEY.toString()));
}}.stream().map(hashKeyRangeForLease -> {
MultiStreamLease lease = new MultiStreamLease();
lease.hashKeyRange(hashKeyRangeForLease);
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
List<Lease> multiStreamLeases2 = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3")); // Hole between 3 and 5
add(deserialize("5", "23"));
add(deserialize("6", "23"));
add(deserialize("24", MAX_HASH_KEY.toString()));
}}.stream().map(hashKeyRangeForLease -> {
MultiStreamLease lease = new MultiStreamLease();
lease.hashKeyRange(hashKeyRangeForLease);
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
// Resetting the holes
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync()));
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync());
}
@Test
public void testIfShardSyncIsNotInitiatedWhenHoleShiftsMoreThanOnce() {
List<Lease> multiStreamLeases = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23")); // Hole between 23 and 25
add(deserialize("25", MAX_HASH_KEY.toString()));
}}.stream().map(hashKeyRangeForLease -> {
MultiStreamLease lease = new MultiStreamLease();
lease.hashKeyRange(hashKeyRangeForLease);
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
List<Lease> multiStreamLeases2 = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3")); // Hole between 3 and 5
add(deserialize("5", "23"));
add(deserialize("6", "23"));
add(deserialize("24", MAX_HASH_KEY.toString()));
}}.stream().map(hashKeyRangeForLease -> {
MultiStreamLease lease = new MultiStreamLease();
lease.hashKeyRange(hashKeyRangeForLease);
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
// Resetting the holes
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync()));
// Resetting the holes
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
}
@Test
public void testIfMissingHashRangeInformationIsFilledBeforeEvaluatingForShardSync() {
ShardSyncTaskManager shardSyncTaskManager = mock(ShardSyncTaskManager.class);
ShardDetector shardDetector = mock(ShardDetector.class);
when(shardSyncTaskManagerProvider.apply(any())).thenReturn(shardSyncTaskManager);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
final int[] shardCounter = { 0 };
List<HashKeyRangeForLease> hashKeyRangeForLeases = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
add(deserialize("4", "20"));
add(deserialize("21", "23"));
add(deserialize("24", MAX_HASH_KEY.toString()));
}};
List<Shard> kinesisShards = hashKeyRangeForLeases.stream()
.map(hashKeyRangeForLease -> Shard.builder().shardId("shard-" + (++shardCounter[0])).hashKeyRange(
HashKeyRange.builder().startingHashKey(hashKeyRangeForLease.serializedStartingHashKey())
.endingHashKey(hashKeyRangeForLease.serializedEndingHashKey()).build()).build())
.collect(Collectors.toList());
when(shardDetector.listShards()).thenReturn(kinesisShards);
final int[] leaseCounter = { 0 };
List<Lease> multiStreamLeases = hashKeyRangeForLeases.stream().map(hashKeyRangeForLease -> {
MultiStreamLease lease = new MultiStreamLease();
lease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), "shard-"+(++leaseCounter[0])));
lease.shardId("shard-"+(leaseCounter[0]));
// Setting the hashrange only for last two leases
if(leaseCounter[0] >= 3) {
lease.hashKeyRange(hashKeyRangeForLease);
}
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
// Assert that shard sync should never trigger
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
Assert.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
// Assert that all the leases now has hashRanges set.
for(Lease lease : multiStreamLeases) {
Assert.assertNotNull(lease.hashKeyRangeForLease());
}
}
@Test
public void testIfMissingHashRangeInformationIsFilledBeforeEvaluatingForShardSyncInHoleScenario() {
ShardSyncTaskManager shardSyncTaskManager = mock(ShardSyncTaskManager.class);
ShardDetector shardDetector = mock(ShardDetector.class);
when(shardSyncTaskManagerProvider.apply(any())).thenReturn(shardSyncTaskManager);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
final int[] shardCounter = { 0 };
List<HashKeyRangeForLease> hashKeyRangeForLeases = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
add(deserialize("5", "20")); // Hole between 3 and 5
add(deserialize("21", "23"));
add(deserialize("24", MAX_HASH_KEY.toString()));
}};
List<Shard> kinesisShards = hashKeyRangeForLeases.stream()
.map(hashKeyRangeForLease -> Shard.builder().shardId("shard-" + (++shardCounter[0])).hashKeyRange(
HashKeyRange.builder().startingHashKey(hashKeyRangeForLease.serializedStartingHashKey())
.endingHashKey(hashKeyRangeForLease.serializedEndingHashKey()).build()).build())
.collect(Collectors.toList());
when(shardDetector.listShards()).thenReturn(kinesisShards);
final int[] leaseCounter = { 0 };
List<Lease> multiStreamLeases = hashKeyRangeForLeases.stream().map(hashKeyRangeForLease -> {
MultiStreamLease lease = new MultiStreamLease();
lease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), "shard-"+(++leaseCounter[0])));
lease.shardId("shard-"+(leaseCounter[0]));
// Setting the hashrange only for last two leases
if(leaseCounter[0] >= 3) {
lease.hashKeyRange(hashKeyRangeForLease);
}
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
// Assert that shard sync should never trigger
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
// Assert that all the leases now has hashRanges set.
for(Lease lease : multiStreamLeases) {
Assert.assertNotNull(lease.hashKeyRangeForLease());
}
}
@Test
public void testFor1000DifferentValidSplitHierarchyTreeTheHashRangesAreAlwaysComplete() {
for(int i=0; i < 1000; i++) {
int maxInitialLeaseCount = 100;
List<Lease> leases = generateInitialLeases(maxInitialLeaseCount);
reshard(leases, 5, ReshardType.SPLIT, maxInitialLeaseCount, false);
Collections.shuffle(leases);
// System.out.println(
// leases.stream().map(l -> l.checkpoint().sequenceNumber() + ":" + l.hashKeyRangeForLease()).collect(Collectors.toList()));
Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent());
}
}
@Test
public void testFor1000DifferentValidMergeHierarchyTreeTheHashRangesAreAlwaysComplete() {
for (int i = 0; i < 1000; i++) {
int maxInitialLeaseCount = 100;
List<Lease> leases = generateInitialLeases(maxInitialLeaseCount);
reshard(leases, 5, ReshardType.MERGE, maxInitialLeaseCount, false);
Collections.shuffle(leases);
Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent());
}
}
@Test
public void testFor1000DifferentValidReshardHierarchyTreeTheHashRangesAreAlwaysComplete() {
for (int i = 0; i < 1000; i++) {
int maxInitialLeaseCount = 100;
List<Lease> leases = generateInitialLeases(maxInitialLeaseCount);
reshard(leases, 5, ReshardType.ANY, maxInitialLeaseCount, false);
Collections.shuffle(leases);
Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent());
}
}
@Test
public void testFor1000DifferentValidMergeHierarchyTreeWithSomeInProgressParentsTheHashRangesAreAlwaysComplete() {
for (int i = 0; i < 1000; i++) {
int maxInitialLeaseCount = 100;
List<Lease> leases = generateInitialLeases(maxInitialLeaseCount);
reshard(leases, 5, ReshardType.MERGE, maxInitialLeaseCount, true);
Collections.shuffle(leases);
Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent());
}
}
@Test
public void testFor1000DifferentValidReshardHierarchyTreeWithSomeInProgressParentsTheHashRangesAreAlwaysComplete() {
for (int i = 0; i < 1000; i++) {
int maxInitialLeaseCount = 100;
List<Lease> leases = generateInitialLeases(maxInitialLeaseCount);
reshard(leases, 5, ReshardType.ANY, maxInitialLeaseCount, true);
Collections.shuffle(leases);
Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent());
}
}
private List<Lease> generateInitialLeases(int initialShardCount) {
long hashRangeInternalMax = 10000000;
List<Lease> initialLeases = new ArrayList<>();
long leaseStartKey = 0;
for (int i = 1; i <= initialShardCount; i++) {
final Lease lease = new Lease();
long leaseEndKey;
if (i != initialShardCount) {
leaseEndKey = (hashRangeInternalMax / initialShardCount) * i;
lease.hashKeyRange(HashKeyRangeForLease.deserialize(leaseStartKey + "", leaseEndKey + ""));
} else {
leaseEndKey = 0;
lease.hashKeyRange(HashKeyRangeForLease.deserialize(leaseStartKey + "", MAX_HASH_KEY.toString()));
}
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
lease.leaseKey("shard-" + i);
initialLeases.add(lease);
leaseStartKey = leaseEndKey + 1;
}
return initialLeases;
}
private void reshard(List<Lease> initialLeases, int depth, ReshardType reshardType, int leaseCounter,
boolean shouldKeepSomeParentsInProgress) {
for (int i = 0; i < depth; i++) {
if (reshardType == ReshardType.SPLIT) {
leaseCounter = split(initialLeases, leaseCounter);
} else if (reshardType == ReshardType.MERGE) {
leaseCounter = merge(initialLeases, leaseCounter, shouldKeepSomeParentsInProgress);
} else {
if (isHeads()) {
leaseCounter = split(initialLeases, leaseCounter);
} else {
leaseCounter = merge(initialLeases, leaseCounter, shouldKeepSomeParentsInProgress);
}
}
}
}
private int merge(List<Lease> initialLeases, int leaseCounter, boolean shouldKeepSomeParentsInProgress) {
List<Lease> leasesEligibleForMerge = initialLeases.stream().filter(l -> CollectionUtils.isNullOrEmpty(l.childShardIds()))
.collect(Collectors.toList());
// System.out.println("Leases to merge : " + leasesEligibleForMerge);
int leasesToMerge = (int) ((leasesEligibleForMerge.size() - 1) / 2.0 * Math.random());
for (int i = 0; i < leasesToMerge; i += 2) {
Lease parent1 = leasesEligibleForMerge.get(i);
Lease parent2 = leasesEligibleForMerge.get(i + 1);
if(parent2.hashKeyRangeForLease().startingHashKey().subtract(parent1.hashKeyRangeForLease().endingHashKey()).equals(BigInteger.ONE))
{
parent1.checkpoint(ExtendedSequenceNumber.SHARD_END);
if (!shouldKeepSomeParentsInProgress || (shouldKeepSomeParentsInProgress && isOneFromDiceRoll())) {
// System.out.println("Deciding to keep parent in progress : " + parent2);
parent2.checkpoint(ExtendedSequenceNumber.SHARD_END);
}
Lease child = new Lease();
child.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
child.leaseKey("shard-" + (++leaseCounter));
// System.out.println("Parent " + parent1 + " and " + parent2 + " merges into " + child);
child.hashKeyRange(new HashKeyRangeForLease(parent1.hashKeyRangeForLease().startingHashKey(),
parent2.hashKeyRangeForLease().endingHashKey()));
parent1.childShardIds(Collections.singletonList(child.leaseKey()));
parent2.childShardIds(Collections.singletonList(child.leaseKey()));
child.parentShardIds(Sets.newHashSet(parent1.leaseKey(), parent2.leaseKey()));
initialLeases.add(child);
}
}
return leaseCounter;
}
private int split(List<Lease> initialLeases, int leaseCounter) {
List<Lease> leasesEligibleForSplit = initialLeases.stream().filter(l -> CollectionUtils.isNullOrEmpty(l.childShardIds()))
.collect(Collectors.toList());
// System.out.println("Leases to split : " + leasesEligibleForSplit);
int leasesToSplit = (int) (leasesEligibleForSplit.size() * Math.random());
for (int i = 0; i < leasesToSplit; i++) {
Lease parent = leasesEligibleForSplit.get(i);
parent.checkpoint(ExtendedSequenceNumber.SHARD_END);
Lease child1 = new Lease();
child1.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
child1.hashKeyRange(new HashKeyRangeForLease(parent.hashKeyRangeForLease().startingHashKey(),
parent.hashKeyRangeForLease().startingHashKey().add(parent.hashKeyRangeForLease().endingHashKey())
.divide(new BigInteger("2"))));
child1.leaseKey("shard-" + (++leaseCounter));
Lease child2 = new Lease();
child2.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
child2.hashKeyRange(new HashKeyRangeForLease(
parent.hashKeyRangeForLease().startingHashKey().add(parent.hashKeyRangeForLease().endingHashKey())
.divide(new BigInteger("2")).add(new BigInteger("1")),
parent.hashKeyRangeForLease().endingHashKey()));
child2.leaseKey("shard-" + (++leaseCounter));
child1.parentShardIds(Sets.newHashSet(parent.leaseKey()));
child2.parentShardIds(Sets.newHashSet(parent.leaseKey()));
parent.childShardIds(Lists.newArrayList(child1.leaseKey(), child2.leaseKey()));
// System.out.println("Parent " + parent + " splits into " + child1 + " and " + child2);
initialLeases.add(child1);
initialLeases.add(child2);
}
return leaseCounter;
}
private boolean isHeads() {
return Math.random() <= 0.5;
}
private boolean isOneFromDiceRoll() {
return Math.random() <= 0.16;
}
private enum ReshardType {
SPLIT,
MERGE,
ANY
}
}

View file

@ -190,7 +190,7 @@ public class SchedulerTest {
}); });
when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher); when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null)); when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null));
when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher); when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher);
when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class)); when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class));
@ -1036,7 +1036,7 @@ public class SchedulerTest {
shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector); shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
when(shardDetector.streamIdentifier()).thenReturn(streamConfig.streamIdentifier()); when(shardDetector.streamIdentifier()).thenReturn(streamConfig.streamIdentifier());
when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null)); when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null));
if(shardSyncFirstAttemptFailure) { if(shardSyncFirstAttemptFailure) {
when(shardDetector.listShards()) when(shardDetector.listShards())
.thenThrow(new RuntimeException("Service Exception")) .thenThrow(new RuntimeException("Service Exception"))

View file

@ -22,6 +22,7 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -40,6 +41,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
public class BlockOnParentShardTaskTest { public class BlockOnParentShardTaskTest {
private final long backoffTimeInMillis = 50L; private final long backoffTimeInMillis = 50L;
private final String shardId = "shardId-97"; private final String shardId = "shardId-97";
private final String streamId = "123:stream:146";
private final String concurrencyToken = "testToken"; private final String concurrencyToken = "testToken";
private final List<String> emptyParentShardIds = new ArrayList<String>(); private final List<String> emptyParentShardIds = new ArrayList<String>();
private ShardInfo shardInfo; private ShardInfo shardInfo;
@ -73,7 +75,7 @@ public class BlockOnParentShardTaskTest {
* @throws DependencyException * @throws DependencyException
*/ */
@Test @Test
public final void testCallWhenParentsHaveFinished() public final void testCallShouldNotThrowBlockedOnParentWhenParentsHaveFinished()
throws DependencyException, InvalidStateException, ProvisionedThroughputException { throws DependencyException, InvalidStateException, ProvisionedThroughputException {
ShardInfo shardInfo = null; ShardInfo shardInfo = null;
@ -107,6 +109,50 @@ public class BlockOnParentShardTaskTest {
assertNull(result.getException()); assertNull(result.getException());
} }
/**
* Test call() when there are 1-2 parent shards that have been fully processed.
* @throws ProvisionedThroughputException
* @throws InvalidStateException
* @throws DependencyException
*/
@Test
public final void testCallShouldNotThrowBlockedOnParentWhenParentsHaveFinishedMultiStream()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
ShardInfo shardInfo = null;
BlockOnParentShardTask task = null;
String parent1LeaseKey = streamId + ":" + "shardId-1";
String parent2LeaseKey = streamId + ":" + "shardId-2";
String parent1ShardId = "shardId-1";
String parent2ShardId = "shardId-2";
List<String> parentShardIds = new ArrayList<>();
TaskResult result = null;
Lease parent1Lease = new Lease();
parent1Lease.checkpoint(ExtendedSequenceNumber.SHARD_END);
Lease parent2Lease = new Lease();
parent2Lease.checkpoint(ExtendedSequenceNumber.SHARD_END);
LeaseRefresher leaseRefresher = mock(LeaseRefresher.class);
when(leaseRefresher.getLease(parent1LeaseKey)).thenReturn(parent1Lease);
when(leaseRefresher.getLease(parent2LeaseKey)).thenReturn(parent2Lease);
// test single parent
parentShardIds.add(parent1ShardId);
shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON,
streamId);
task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis);
result = task.call();
assertNull(result.getException());
// test two parents
parentShardIds.add(parent2ShardId);
shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, streamId);
task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis);
result = task.call();
assertNull(result.getException());
}
/** /**
* Test call() when there are 1-2 parent shards that have NOT been fully processed. * Test call() when there are 1-2 parent shards that have NOT been fully processed.
* @throws ProvisionedThroughputException * @throws ProvisionedThroughputException
@ -115,7 +161,7 @@ public class BlockOnParentShardTaskTest {
*/ */
@Test @Test
public final void testCallWhenParentsHaveNotFinished() public final void testCallWhenParentsHaveNotFinished()
throws DependencyException, InvalidStateException, ProvisionedThroughputException { throws DependencyException, InvalidStateException, ProvisionedThroughputException {
ShardInfo shardInfo = null; ShardInfo shardInfo = null;
BlockOnParentShardTask task = null; BlockOnParentShardTask task = null;
@ -149,6 +195,50 @@ public class BlockOnParentShardTaskTest {
assertNotNull(result.getException()); assertNotNull(result.getException());
} }
/**
* Test call() when there are 1-2 parent shards that have NOT been fully processed.
* @throws ProvisionedThroughputException
* @throws InvalidStateException
* @throws DependencyException
*/
@Test
public final void testCallWhenParentsHaveNotFinishedMultiStream()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
ShardInfo shardInfo = null;
BlockOnParentShardTask task = null;
String parent1LeaseKey = streamId + ":" + "shardId-1";
String parent2LeaseKey = streamId + ":" + "shardId-2";
String parent1ShardId = "shardId-1";
String parent2ShardId = "shardId-2";
List<String> parentShardIds = new ArrayList<>();
TaskResult result = null;
Lease parent1Lease = new Lease();
parent1Lease.checkpoint(ExtendedSequenceNumber.LATEST);
Lease parent2Lease = new Lease();
// mock a sequence number checkpoint
parent2Lease.checkpoint(new ExtendedSequenceNumber("98182584034"));
LeaseRefresher leaseRefresher = mock(LeaseRefresher.class);
when(leaseRefresher.getLease(parent1LeaseKey)).thenReturn(parent1Lease);
when(leaseRefresher.getLease(parent2LeaseKey)).thenReturn(parent2Lease);
// test single parent
parentShardIds.add(parent1ShardId);
shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, streamId);
task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis);
result = task.call();
assertNotNull(result.getException());
// test two parents
parentShardIds.add(parent2ShardId);
shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, streamId);
task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis);
result = task.call();
assertNotNull(result.getException());
}
/** /**
* Test call() with 1 parent shard before and after it is completely processed. * Test call() with 1 parent shard before and after it is completely processed.
* @throws ProvisionedThroughputException * @throws ProvisionedThroughputException