Add periodic hash range auditor, hole detection, and recovery (#43)

* Add periodic hash range auditor, hole detection, and recovery

* Add unit tests for hash range hole recovery

* Fix max hash range bug

* Address PR feedback

* Fix DDB hash key persistence bug
Micah Jaffe 2020-06-22 08:45:46 -07:00 committed by GitHub
parent 6b474b7390
commit 3a88a60a4e
15 changed files with 1191 additions and 32 deletions


@ -14,39 +14,98 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.UpdateField;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.util.CollectionUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.Value;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.Validate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.fromHashKeyRange;
/**
* The top level orchestrator for coordinating the periodic shard sync related
* activities.
* The top level orchestrator for coordinating the periodic shard sync related activities. If the configured
* {@link ShardSyncStrategyType} is PERIODIC, this class will be the main shard sync orchestrator. For non-PERIODIC
* strategies, this class will serve as an internal auditor that periodically checks if the full hash range is covered
* by currently held leases, and initiates a recovery shard sync if not.
*/
@Getter
@EqualsAndHashCode
class PeriodicShardSyncManager {
private static final Log LOG = LogFactory.getLog(PeriodicShardSyncManager.class);
private static final long INITIAL_DELAY = 0;
private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 1000;
/** DEFAULT interval is used for PERIODIC {@link ShardSyncStrategyType}. */
private static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 1000L;
/** AUDITOR interval is used for non-PERIODIC {@link ShardSyncStrategyType} auditor processes. */
private static final long AUDITOR_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
/** Parameters for validating hash range completeness when running in auditor mode. */
@VisibleForTesting
static final BigInteger MIN_HASH_KEY = BigInteger.ZERO;
@VisibleForTesting
static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE);
@VisibleForTesting
static final int CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY = 3;
private final HashRangeHoleTracker hashRangeHoleTracker = new HashRangeHoleTracker();
private final String workerId;
private final LeaderDecider leaderDecider;
private final ITask metricsEmittingShardSyncTask;
private final ScheduledExecutorService shardSyncThreadPool;
private final ILeaseManager<KinesisClientLease> leaseManager;
private final IKinesisProxy kinesisProxy;
private final boolean isAuditorMode;
private final long periodicShardSyncIntervalMillis;
private boolean isRunning;
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, IMetricsFactory metricsFactory) {
this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory);
PeriodicShardSyncManager(String workerId,
LeaderDecider leaderDecider,
ShardSyncTask shardSyncTask,
IMetricsFactory metricsFactory,
ILeaseManager<KinesisClientLease> leaseManager,
IKinesisProxy kinesisProxy,
boolean isAuditorMode) {
this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory,
leaseManager, kinesisProxy, isAuditorMode);
}
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, ScheduledExecutorService shardSyncThreadPool, IMetricsFactory metricsFactory) {
PeriodicShardSyncManager(String workerId,
LeaderDecider leaderDecider,
ShardSyncTask shardSyncTask,
ScheduledExecutorService shardSyncThreadPool,
IMetricsFactory metricsFactory,
ILeaseManager<KinesisClientLease> leaseManager,
IKinesisProxy kinesisProxy,
boolean isAuditorMode) {
Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager.");
Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager.");
Validate.notNull(shardSyncTask, "ShardSyncTask is required to initialize PeriodicShardSyncManager.");
@ -54,6 +113,16 @@ class PeriodicShardSyncManager {
this.leaderDecider = leaderDecider;
this.metricsEmittingShardSyncTask = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory);
this.shardSyncThreadPool = shardSyncThreadPool;
this.leaseManager = leaseManager;
this.kinesisProxy = kinesisProxy;
this.isAuditorMode = isAuditorMode;
if (isAuditorMode) {
Validate.notNull(this.leaseManager, "LeaseManager is required for non-PERIODIC shard sync strategies.");
Validate.notNull(this.kinesisProxy, "KinesisProxy is required for non-PERIODIC shard sync strategies.");
this.periodicShardSyncIntervalMillis = AUDITOR_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS;
} else {
this.periodicShardSyncIntervalMillis = DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS;
}
}
public synchronized TaskResult start() {
@ -67,7 +136,7 @@ class PeriodicShardSyncManager {
};
shardSyncThreadPool
.scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS,
.scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, periodicShardSyncIntervalMillis,
TimeUnit.MILLISECONDS);
isRunning = true;
}
@ -95,11 +164,227 @@ class PeriodicShardSyncManager {
private void runShardSync() {
if (leaderDecider.isLeader(workerId)) {
LOG.debug(String.format("WorkerId %s is a leader, running the shard sync task", workerId));
metricsEmittingShardSyncTask.call();
LOG.debug("WorkerId " + workerId + " is a leader, running the shard sync task");
try {
final ShardSyncResponse shardSyncResponse = checkForShardSync();
if (shardSyncResponse.shouldDoShardSync()) {
LOG.info("Periodic shard syncer initiating shard sync due to the reason - " +
shardSyncResponse.reasonForDecision());
metricsEmittingShardSyncTask.call();
} else {
LOG.info("Skipping shard sync due to the reason - " + shardSyncResponse.reasonForDecision());
}
} catch (Exception e) {
LOG.error("Caught exception while running periodic shard syncer.", e);
}
} else {
LOG.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId));
LOG.debug("WorkerId " + workerId + " is not a leader, not running the shard sync task");
}
}
@VisibleForTesting
ShardSyncResponse checkForShardSync() throws DependencyException, InvalidStateException,
ProvisionedThroughputException {
if (!isAuditorMode) {
// If we are running with PERIODIC shard sync strategy, we should sync every time.
return new ShardSyncResponse(true, "Syncing every time with PERIODIC shard sync strategy.");
}
// Get current leases from DynamoDB.
final List<KinesisClientLease> currentLeases = leaseManager.listLeases();
if (CollectionUtils.isNullOrEmpty(currentLeases)) {
// If the current leases are null or empty, then we need to initiate a shard sync.
LOG.info("No leases found. Will trigger a shard sync.");
return new ShardSyncResponse(true, "No leases found.");
}
// Check if there are any holes in the hash range covered by current leases. Return the first hole if present.
Optional<HashRangeHole> hashRangeHoleOpt = hasHoleInLeases(currentLeases);
if (hashRangeHoleOpt.isPresent()) {
// If hole is present, check if the hole is detected consecutively in previous occurrences. If hole is
// determined with high confidence, return true; return false otherwise. We use the high confidence factor
// to avoid shard sync on any holes during resharding and lease cleanups, or other intermittent issues.
final boolean hasHoleWithHighConfidence =
hashRangeHoleTracker.hashHighConfidenceOfHoleWith(hashRangeHoleOpt.get());
return new ShardSyncResponse(hasHoleWithHighConfidence,
"Detected the same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles() + " times. " +
"Will initiate shard sync after reaching threshold: " + CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY);
} else {
// If hole is not present, clear any previous hole tracking and return false.
hashRangeHoleTracker.reset();
return new ShardSyncResponse(false, "Hash range is complete.");
}
}
@VisibleForTesting
Optional<HashRangeHole> hasHoleInLeases(List<KinesisClientLease> leases) {
// Filter out any leases with checkpoints other than SHARD_END
final List<KinesisClientLease> activeLeases = leases.stream()
.filter(lease -> lease.getCheckpoint() != null && !lease.getCheckpoint().isShardEnd())
.collect(Collectors.toList());
final List<KinesisClientLease> activeLeasesWithHashRanges = fillWithHashRangesIfRequired(activeLeases);
return checkForHoleInHashKeyRanges(activeLeasesWithHashRanges);
}
private List<KinesisClientLease> fillWithHashRangesIfRequired(List<KinesisClientLease> activeLeases) {
final List<KinesisClientLease> activeLeasesWithNoHashRanges = activeLeases.stream()
.filter(lease -> lease.getHashKeyRange() == null).collect(Collectors.toList());
if (activeLeasesWithNoHashRanges.isEmpty()) {
return activeLeases;
}
// Fetch shards from Kinesis to fill in the in-memory hash ranges
final Map<String, Shard> kinesisShards = kinesisProxy.getShardList().stream()
.collect(Collectors.toMap(Shard::getShardId, shard -> shard));
return activeLeases.stream().map(lease -> {
if (lease.getHashKeyRange() == null) {
final String shardId = lease.getLeaseKey();
final Shard shard = kinesisShards.get(shardId);
if (shard == null) {
return lease;
}
lease.setHashKeyRange(fromHashKeyRange(shard.getHashKeyRange()));
try {
leaseManager.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE);
} catch (Exception e) {
LOG.warn("Unable to update hash range information for lease " + lease.getLeaseKey() +
". This may result in explicit lease sync.");
}
}
return lease;
}).filter(lease -> lease.getHashKeyRange() != null).collect(Collectors.toList());
}
@VisibleForTesting
static Optional<HashRangeHole> checkForHoleInHashKeyRanges(List<KinesisClientLease> leasesWithHashKeyRanges) {
// Sort the hash ranges by starting hash key
final List<KinesisClientLease> sortedLeasesWithHashKeyRanges = sortLeasesByHashRange(leasesWithHashKeyRanges);
if (sortedLeasesWithHashKeyRanges.isEmpty()) {
LOG.error("No leases with valid hash ranges found.");
return Optional.of(new HashRangeHole());
}
// Validate the hash range bounds
final KinesisClientLease minHashKeyLease = sortedLeasesWithHashKeyRanges.get(0);
final KinesisClientLease maxHashKeyLease =
sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1);
if (!minHashKeyLease.getHashKeyRange().startingHashKey().equals(MIN_HASH_KEY) ||
!maxHashKeyLease.getHashKeyRange().endingHashKey().equals(MAX_HASH_KEY)) {
LOG.error("Incomplete hash range found between " + minHashKeyLease + " and " + maxHashKeyLease);
return Optional.of(new HashRangeHole(minHashKeyLease.getHashKeyRange(), maxHashKeyLease.getHashKeyRange()));
}
// Check for any holes in the sorted hash range intervals
if (sortedLeasesWithHashKeyRanges.size() > 1) {
KinesisClientLease leftmostLeaseToReportInCaseOfHole = minHashKeyLease;
HashKeyRangeForLease leftLeaseHashRange = leftmostLeaseToReportInCaseOfHole.getHashKeyRange();
for (int i = 1; i < sortedLeasesWithHashKeyRanges.size(); i++) {
final KinesisClientLease rightLease = sortedLeasesWithHashKeyRanges.get(i);
final HashKeyRangeForLease rightLeaseHashRange = rightLease.getHashKeyRange();
final BigInteger rangeDiff =
rightLeaseHashRange.startingHashKey().subtract(leftLeaseHashRange.endingHashKey());
// We have overlapping leases when rangeDiff is 0 or negative.
// signum() will be -1 for negative and 0 if value is 0.
// Merge the ranges for further tracking.
if (rangeDiff.signum() <= 0) {
leftLeaseHashRange = new HashKeyRangeForLease(leftLeaseHashRange.startingHashKey(),
leftLeaseHashRange.endingHashKey().max(rightLeaseHashRange.endingHashKey()));
} else {
// We have non-overlapping leases when rangeDiff is positive. signum() will be 1 in this case.
// If rangeDiff is 1, then it is a continuous hash range. If not, there is a hole.
if (!rangeDiff.equals(BigInteger.ONE)) {
LOG.error("Incomplete hash range found between " + leftmostLeaseToReportInCaseOfHole +
" and " + rightLease);
return Optional.of(new HashRangeHole(leftmostLeaseToReportInCaseOfHole.getHashKeyRange(),
rightLease.getHashKeyRange()));
}
leftmostLeaseToReportInCaseOfHole = rightLease;
leftLeaseHashRange = rightLeaseHashRange;
}
}
}
return Optional.empty();
}
@VisibleForTesting
static List<KinesisClientLease> sortLeasesByHashRange(List<KinesisClientLease> leasesWithHashKeyRanges) {
if (leasesWithHashKeyRanges.size() == 0 || leasesWithHashKeyRanges.size() == 1) {
return leasesWithHashKeyRanges;
}
Collections.sort(leasesWithHashKeyRanges, new HashKeyRangeComparator());
return leasesWithHashKeyRanges;
}
@Value
@Accessors(fluent = true)
@VisibleForTesting
static class ShardSyncResponse {
private final boolean shouldDoShardSync;
private final String reasonForDecision;
}
@Value
private static class HashRangeHole {
private final HashKeyRangeForLease hashRangeAtStartOfPossibleHole;
private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole;
HashRangeHole() {
hashRangeAtStartOfPossibleHole = hashRangeAtEndOfPossibleHole = null;
}
HashRangeHole(HashKeyRangeForLease hashRangeAtStartOfPossibleHole,
HashKeyRangeForLease hashRangeAtEndOfPossibleHole) {
this.hashRangeAtStartOfPossibleHole = hashRangeAtStartOfPossibleHole;
this.hashRangeAtEndOfPossibleHole = hashRangeAtEndOfPossibleHole;
}
}
private static class HashRangeHoleTracker {
private HashRangeHole hashRangeHole;
@Getter
private Integer numConsecutiveHoles;
public boolean hashHighConfidenceOfHoleWith(@NonNull HashRangeHole hashRangeHole) {
if (hashRangeHole.equals(this.hashRangeHole)) {
++this.numConsecutiveHoles;
} else {
this.hashRangeHole = hashRangeHole;
this.numConsecutiveHoles = 1;
}
return numConsecutiveHoles >= CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY;
}
public void reset() {
this.hashRangeHole = null;
this.numConsecutiveHoles = 0;
}
}
private static class HashKeyRangeComparator implements Comparator<KinesisClientLease>, Serializable {
private static final long serialVersionUID = 1L;
@Override
public int compare(KinesisClientLease lease, KinesisClientLease otherLease) {
Validate.notNull(lease);
Validate.notNull(otherLease);
Validate.notNull(lease.getHashKeyRange());
Validate.notNull(otherLease.getHashKeyRange());
return ComparisonChain.start()
.compare(lease.getHashKeyRange().startingHashKey(), otherLease.getHashKeyRange().startingHashKey())
.compare(lease.getHashKeyRange().endingHashKey(), otherLease.getHashKeyRange().endingHashKey())
.result();
}
}
}
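
To make the audit rule above concrete, here is a minimal, self-contained sketch (the class and method names HashRangeHoleSketch and hasHole are hypothetical, not part of this commit) of the interval check that checkForHoleInHashKeyRanges performs: sort the ranges by starting hash key, merge overlapping coverage, and report a hole whenever the next starting key is more than one above the highest ending key seen so far, or the combined coverage does not span [MIN_HASH_KEY, MAX_HASH_KEY].

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the hash range completeness check; the production logic lives in
// PeriodicShardSyncManager.checkForHoleInHashKeyRanges and also reports which leases border the hole.
public class HashRangeHoleSketch {

    static final BigInteger MIN = BigInteger.ZERO;
    static final BigInteger MAX = BigInteger.valueOf(2).pow(128).subtract(BigInteger.ONE);

    // Each element is a {startingHashKey, endingHashKey} pair. Returns true if [MIN, MAX] is not fully covered.
    static boolean hasHole(List<BigInteger[]> ranges) {
        if (ranges.isEmpty()) {
            return true;
        }
        ranges.sort(Comparator.comparing((BigInteger[] r) -> r[0]));
        if (!ranges.get(0)[0].equals(MIN)) {
            return true; // coverage does not begin at the minimum hash key
        }
        BigInteger coveredUpTo = ranges.get(0)[1];
        for (int i = 1; i < ranges.size(); i++) {
            BigInteger start = ranges.get(i)[0];
            if (start.subtract(coveredUpTo).compareTo(BigInteger.ONE) > 0) {
                return true; // gap between coveredUpTo and start
            }
            coveredUpTo = coveredUpTo.max(ranges.get(i)[1]); // merge overlapping or adjacent coverage
        }
        return !coveredUpTo.equals(MAX); // coverage must reach the maximum hash key
    }

    public static void main(String[] args) {
        List<BigInteger[]> ranges = new ArrayList<>();
        ranges.add(new BigInteger[] { MIN, BigInteger.valueOf(3) });
        ranges.add(new BigInteger[] { BigInteger.valueOf(4), BigInteger.valueOf(23) });
        ranges.add(new BigInteger[] { BigInteger.valueOf(25), MAX });
        System.out.println(hasHole(ranges)); // true: nothing covers 24
    }
}

The production check additionally skips leases whose checkpoint is SHARD_END, since those shards have been fully consumed and their ranges are expected to be covered by child shard leases.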


@ -577,8 +577,9 @@ public class Worker implements Runnable {
/**
* Create shard sync strategy and corresponding {@link LeaderDecider} based on provided configs. PERIODIC
* {@link ShardSyncStrategyType} honors custom leaderDeciders for leader election strategy. All other
* {@link ShardSyncStrategyType}s permit only a default single-leader strategy.
* {@link ShardSyncStrategyType} honors custom leaderDeciders for leader election strategy, and does not permit
* skipping shard syncs if the hash range is complete. All other {@link ShardSyncStrategyType}s permit only a
* default single-leader strategy, and will skip shard syncs unless a hole in the hash range is detected.
*/
private void createShardSyncStrategy(ShardSyncStrategyType strategyType,
LeaderDecider leaderDecider,
@ -587,7 +588,7 @@ public class Worker implements Runnable {
case PERIODIC:
this.leaderDecider = getOrCreateLeaderDecider(leaderDecider);
this.leaderElectedPeriodicShardSyncManager =
getOrCreatePeriodicShardSyncManager(periodicShardSyncManager);
getOrCreatePeriodicShardSyncManager(periodicShardSyncManager, false);
this.shardSyncStrategy = createPeriodicShardSyncStrategy();
break;
case SHARD_END:
@ -598,7 +599,7 @@ public class Worker implements Runnable {
}
this.leaderDecider = getOrCreateLeaderDecider(null);
this.leaderElectedPeriodicShardSyncManager =
getOrCreatePeriodicShardSyncManager(periodicShardSyncManager);
getOrCreatePeriodicShardSyncManager(periodicShardSyncManager, true);
this.shardSyncStrategy = createShardEndShardSyncStrategy();
}
@ -1255,9 +1256,10 @@ public class Worker implements Runnable {
Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
}
private PeriodicShardSyncManager getOrCreatePeriodicShardSyncManager(PeriodicShardSyncManager periodicShardSyncManager) {
// TODO: Configure periodicShardSyncManager with either mandatory shard sync (PERIODIC) or hash range
// validation based shard sync (SHARD_END) based on configured shard sync strategy
/** A non-null PeriodicShardSyncManager can only be provided from unit tests. Any application code will create the
* PeriodicShardSyncManager for the first time here. */
private PeriodicShardSyncManager getOrCreatePeriodicShardSyncManager(PeriodicShardSyncManager periodicShardSyncManager,
boolean isAuditorMode) {
if (periodicShardSyncManager != null) {
return periodicShardSyncManager;
}
@ -1272,7 +1274,10 @@ public class Worker implements Runnable {
SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC,
shardSyncer,
null),
metricsFactory);
metricsFactory,
leaseCoordinator.getLeaseManager(),
streamConfig.getStreamProxy(),
isAuditorMode);
}
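
The "will skip shard syncs unless a hole in the hash range is detected" behavior described above is also debounced. Below is a hypothetical, standalone sketch of that rule (the names HoleConfidenceSketch, observeHole, and THRESHOLD are illustrative; the real implementation is HashRangeHoleTracker inside PeriodicShardSyncManager with CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY = 3): recovery only triggers after the same hole is observed on consecutive audit passes, and any different observation resets the count.

import java.util.Objects;

// Hypothetical debounce sketch, not the library class: trigger recovery only after the same
// hole has been seen on THRESHOLD consecutive audit passes.
public class HoleConfidenceSketch {
    private static final int THRESHOLD = 3;

    private String lastHole;        // identifies the observed gap, e.g. "(23, 25)"
    private int consecutiveCount;

    // Returns true once the same hole has been reported THRESHOLD times in a row.
    public boolean observeHole(String hole) {
        if (Objects.equals(hole, lastHole)) {
            consecutiveCount++;
        } else {
            lastHole = hole;
            consecutiveCount = 1;
        }
        return consecutiveCount >= THRESHOLD;
    }

    // Called when the hash range is complete; clears any partial evidence.
    public void reset() {
        lastHole = null;
        consecutiveCount = 0;
    }

    public static void main(String[] args) {
        HoleConfidenceSketch tracker = new HoleConfidenceSketch();
        System.out.println(tracker.observeHole("(23, 25)")); // false, first sighting
        System.out.println(tracker.observeHole("(23, 25)")); // false, second sighting
        System.out.println(tracker.observeHole("(3, 5)"));   // false, a different hole resets the count
        System.out.println(tracker.observeHole("(3, 5)"));   // false
        System.out.println(tracker.observeHole("(3, 5)"));   // true, third consecutive sighting
    }
}

This is why the unit tests further down call checkForShardSync CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY times before expecting the auditor-mode manager to request a recovery sync.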
/**


@ -141,6 +141,10 @@ public class ExtendedSequenceNumber implements Comparable<ExtendedSequenceNumber
return subSequenceNumber;
}
public boolean isShardEnd() {
return sequenceNumber.equals(SentinelCheckpoint.SHARD_END.toString());
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();


@ -0,0 +1,79 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.leases.impl;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import lombok.NonNull;
import lombok.Value;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.Validate;
import java.math.BigInteger;
@Value
@Accessors(fluent = true)
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
/**
* Lease POJO to hold the starting and ending hash keys of a Kinesis shard's hash key range.
*/
public class HashKeyRangeForLease {
private final BigInteger startingHashKey;
private final BigInteger endingHashKey;
/**
* Serialize the startingHashKey for persisting in external storage
*
* @return Serialized startingHashKey
*/
public String serializedStartingHashKey() {
return startingHashKey.toString();
}
/**
* Serialize the endingHashKey for persisting in external storage
*
* @return Serialized endingHashKey
*/
public String serializedEndingHashKey() {
return endingHashKey.toString();
}
/**
* Deserialize from serialized hashKeyRange string from external storage.
*
* @param startingHashKeyStr
* @param endingHashKeyStr
* @return HashKeyRangeForLease
*/
public static HashKeyRangeForLease deserialize(@NonNull String startingHashKeyStr, @NonNull String endingHashKeyStr) {
final BigInteger startingHashKey = new BigInteger(startingHashKeyStr);
final BigInteger endingHashKey = new BigInteger(endingHashKeyStr);
Validate.isTrue(startingHashKey.compareTo(endingHashKey) < 0,
"StartingHashKey %s must be less than EndingHashKey %s ", startingHashKeyStr, endingHashKeyStr);
return new HashKeyRangeForLease(startingHashKey, endingHashKey);
}
/**
* Construct HashKeyRangeForLease from Kinesis HashKeyRange
*
* @param hashKeyRange
* @return HashKeyRangeForLease
*/
public static HashKeyRangeForLease fromHashKeyRange(HashKeyRange hashKeyRange) {
return deserialize(hashKeyRange.getStartingHashKey(), hashKeyRange.getEndingHashKey());
}
}
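
A short usage sketch (class name and values illustrative) of the round trip this class supports: a range is built from a shard's HashKeyRange, persisted as decimal strings, and later reconstructed with deserialize, which validates that the starting key is strictly below the ending key.

import com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease;
import com.amazonaws.services.kinesis.model.HashKeyRange;

public class HashKeyRangeForLeaseExample {
    public static void main(String[] args) {
        // Build from a Kinesis shard's hash key range (values illustrative).
        HashKeyRange shardRange = new HashKeyRange()
                .withStartingHashKey("0")
                .withEndingHashKey("340282366920938463463374607431768211455"); // 2^128 - 1
        HashKeyRangeForLease leaseRange = HashKeyRangeForLease.fromHashKeyRange(shardRange);

        // Serialize for persistence in the lease table...
        String start = leaseRange.serializedStartingHashKey();
        String end = leaseRange.serializedEndingHashKey();

        // ...and reconstruct later; deserialize enforces startingHashKey < endingHashKey.
        HashKeyRangeForLease restored = HashKeyRangeForLease.deserialize(start, end);
        System.out.println(restored.startingHashKey() + " - " + restored.endingHashKey());
    }
}

Note that fromHashKeyRange simply delegates to deserialize, so both construction paths share the same validation.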


@ -30,6 +30,8 @@ public class KinesisClientLease extends Lease {
private ExtendedSequenceNumber pendingCheckpoint;
private Long ownerSwitchesSinceCheckpoint = 0L;
private Set<String> parentShardIds = new HashSet<String>();
private Set<String> childShardIds = new HashSet<String>();
private HashKeyRangeForLease hashKeyRangeForLease;
public KinesisClientLease() {
@ -41,17 +43,22 @@ public class KinesisClientLease extends Lease {
this.pendingCheckpoint = other.getPendingCheckpoint();
this.ownerSwitchesSinceCheckpoint = other.getOwnerSwitchesSinceCheckpoint();
this.parentShardIds.addAll(other.getParentShardIds());
this.childShardIds = other.getChildShardIds();
this.hashKeyRangeForLease = other.getHashKeyRange();
}
KinesisClientLease(String leaseKey, String leaseOwner, Long leaseCounter, UUID concurrencyToken,
Long lastCounterIncrementNanos, ExtendedSequenceNumber checkpoint, ExtendedSequenceNumber pendingCheckpoint,
Long ownerSwitchesSinceCheckpoint, Set<String> parentShardIds) {
Long ownerSwitchesSinceCheckpoint, Set<String> parentShardIds, Set<String> childShardIds,
HashKeyRangeForLease hashKeyRangeForLease) {
super(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos);
this.checkpoint = checkpoint;
this.pendingCheckpoint = pendingCheckpoint;
this.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint;
this.parentShardIds.addAll(parentShardIds);
this.childShardIds.addAll(childShardIds);
this.hashKeyRangeForLease = hashKeyRangeForLease;
}
/**
@ -100,6 +107,20 @@ public class KinesisClientLease extends Lease {
return new HashSet<String>(parentShardIds);
}
/**
* @return shardIds that are the children of this lease. Used for resharding.
*/
public Set<String> getChildShardIds() {
return new HashSet<String>(childShardIds);
}
/**
* @return hash key range that this lease covers.
*/
public HashKeyRangeForLease getHashKeyRange() {
return hashKeyRangeForLease;
}
/**
* Sets checkpoint.
*
@ -142,6 +163,29 @@ public class KinesisClientLease extends Lease {
this.parentShardIds.clear();
this.parentShardIds.addAll(parentShardIds);
}
/**
* Sets childShardIds.
*
* @param childShardIds may not be null
*/
public void setChildShardIds(Collection<String> childShardIds) {
verifyNotNull(childShardIds, "childShardIds should not be null");
this.childShardIds.clear();
this.childShardIds.addAll(childShardIds);
}
/**
* Sets hashKeyRangeForLease.
*
* @param hashKeyRangeForLease may not be null
*/
public void setHashKeyRange(HashKeyRangeForLease hashKeyRangeForLease) {
verifyNotNull(hashKeyRangeForLease, "hashKeyRangeForLease should not be null");
this.hashKeyRangeForLease = hashKeyRangeForLease;
}
private void verifyNotNull(Object object, String message) {
if (object == null) {


@ -15,6 +15,7 @@
package com.amazonaws.services.kinesis.leases.impl;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import com.amazonaws.services.dynamodbv2.model.AttributeAction;
@ -26,8 +27,11 @@ import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseSerializer;
import com.amazonaws.services.kinesis.leases.util.DynamoUtils;
import com.amazonaws.util.CollectionUtils;
import com.google.common.base.Strings;
import static com.amazonaws.services.kinesis.leases.impl.UpdateField.HASH_KEY_RANGE;
/**
* An implementation of ILeaseSerializer for KinesisClientLease objects.
*/
@ -39,6 +43,9 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
private static final String PENDING_CHECKPOINT_SEQUENCE_KEY = "pendingCheckpoint";
private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber";
public final String PARENT_SHARD_ID_KEY = "parentShardId";
private static final String CHILD_SHARD_IDS_KEY = "childShardIds";
private static final String STARTING_HASH_KEY = "startingHashKey";
private static final String ENDING_HASH_KEY = "endingHashKey";
private final LeaseSerializer baseSerializer = new LeaseSerializer(KinesisClientLease.class);
@ -155,6 +162,28 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
return result;
}
@Override
public Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(KinesisClientLease lease,
UpdateField updateField) {
Map<String, AttributeValueUpdate> result = new HashMap<>();
switch (updateField) {
case CHILD_SHARDS:
// TODO: Implement update fields for child shards
break;
case HASH_KEY_RANGE:
if (lease.getHashKeyRange() != null) {
result.put(STARTING_HASH_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(
lease.getHashKeyRange().serializedStartingHashKey()), AttributeAction.PUT));
result.put(ENDING_HASH_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(
lease.getHashKeyRange().serializedEndingHashKey()), AttributeAction.PUT));
}
break;
}
return result;
}
@Override
public Collection<KeySchemaElement> getKeySchema() {
return baseSerializer.getKeySchema();


@ -590,6 +590,35 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
return true;
}
/**
* {@inheritDoc}
*/
@Override
public void updateLeaseWithMetaInfo(T lease, UpdateField updateField)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
verifyNotNull(lease, "lease cannot be null");
verifyNotNull(updateField, "updateField cannot be null");
if (LOG.isDebugEnabled()) {
LOG.debug("Updating lease " + lease + " for field " + updateField);
}
UpdateItemRequest request = new UpdateItemRequest();
request.setTableName(table);
request.setKey(serializer.getDynamoHashKey(lease));
request.setExpected(serializer.getDynamoLeaseCounterExpectation(lease));
Map<String, AttributeValueUpdate> updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField);
updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease));
request.setAttributeUpdates(updates);
try {
dynamoDBClient.updateItem(request);
} catch (AmazonClientException e) {
throw convertAndRethrowExceptions("update", lease.getLeaseKey(), e);
}
}
/*
* This method contains boilerplate exception handling - it throws or returns something to be thrown. The
* inconsistency there exists to satisfy the compiler when this method is used at the end of non-void methods.


@ -177,6 +177,12 @@ public class LeaseSerializer implements ILeaseSerializer<Lease> {
return new HashMap<String, AttributeValueUpdate>();
}
@Override
public Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(Lease lease, UpdateField updateField) {
// There is no application-specific data in Lease - just return a map that increments the counter.
return new HashMap<String, AttributeValueUpdate>();
}
@Override
public Collection<KeySchemaElement> getKeySchema() {
List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();


@ -0,0 +1,26 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.leases.impl;
/**
* These are the special fields that will be updated only once during the lifetime of the lease.
* Since this is meta information that does not affect lease ownership or data durability, we allow
* any elected leader or worker to set these fields directly without any conditional checks.
* Note that although HASH_KEY_RANGE will be populated during lease initialization in newer versions, we keep this
* field for backfilling existing leases while rolling forward to newer versions.
*/
public enum UpdateField {
CHILD_SHARDS, HASH_KEY_RANGE
}
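
For illustration, a hedged sketch of the backfill path that uses HASH_KEY_RANGE (the commit wires the equivalent logic into PeriodicShardSyncManager.fillWithHashRangesIfRequired; the helper class and method name below are hypothetical): a lease created before this change has no hash range persisted, so the range is looked up from the owning shard and written back via updateLeaseWithMetaInfo.

import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.UpdateField;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.model.Shard;

import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.fromHashKeyRange;

// Hypothetical helper showing the backfill of a single lease's hash range.
public class HashRangeBackfillSketch {
    static void backfillHashRange(KinesisClientLease lease,
                                  IKinesisProxy kinesisProxy,
                                  ILeaseManager<KinesisClientLease> leaseManager) throws Exception {
        if (lease.getHashKeyRange() != null) {
            return; // newer leases already carry the range
        }
        // The lease key is the shard id, so the owning shard can be looked up directly.
        Shard shard = kinesisProxy.getShardList().stream()
                .filter(s -> s.getShardId().equals(lease.getLeaseKey()))
                .findFirst()
                .orElse(null);
        if (shard == null) {
            return; // shard no longer listed; a later shard sync reconciles this lease
        }
        lease.setHashKeyRange(fromHashKeyRange(shard.getHashKeyRange()));
        // HASH_KEY_RANGE writes only the startingHashKey/endingHashKey attributes of the lease row.
        leaseManager.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE);
    }
}

In the commit itself the DynamoDB write is best-effort: a failure is only logged, and leases whose shard can no longer be found are excluded from the completeness check for that pass.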


@ -20,6 +20,7 @@ import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.Lease;
import com.amazonaws.services.kinesis.leases.impl.UpdateField;
/**
* Supports basic CRUD operations for Leases.
@ -180,6 +181,19 @@ public interface ILeaseManager<T extends Lease> {
public boolean updateLease(T lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
* Update application-specific fields of the given lease in DynamoDB. Does not update fields managed by the leasing
* library such as leaseCounter, leaseOwner, or leaseKey.
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/
default void updateLeaseWithMetaInfo(T lease, UpdateField updateField)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
throw new UnsupportedOperationException("updateLeaseWithMetaInfo is not implemented.");
}
/**
* Check (synchronously) if there are any leases in the lease table.
*


@ -23,6 +23,7 @@ import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.kinesis.leases.impl.Lease;
import com.amazonaws.services.kinesis.leases.impl.UpdateField;
/**
* Utility class that manages the mapping of Lease objects/operations to records in DynamoDB.
@ -104,6 +105,15 @@ public interface ILeaseSerializer<T extends Lease> {
*/
public Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(T lease);
/**
* @param lease
* @param updateField
* @return the attribute value map that updates application-specific data for a lease
*/
default Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(T lease, UpdateField updateField) {
throw new UnsupportedOperationException();
}
/**
* @return the key schema for creating a DynamoDB table to store leases
*/


@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.Arrays;
import java.util.List;
import com.amazonaws.services.kinesis.leases.impl.UpdateField;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -53,6 +54,7 @@ class ExceptionThrowingLeaseManager implements ILeaseManager<KinesisClientLease>
DELETELEASE(9),
DELETEALL(10),
UPDATELEASE(11),
UPDATELEASEWITHMETAINFO(12),
NONE(Integer.MIN_VALUE);
private Integer index;
@ -197,6 +199,14 @@ class ExceptionThrowingLeaseManager implements ILeaseManager<KinesisClientLease>
return leaseManager.updateLease(lease);
}
@Override
public void updateLeaseWithMetaInfo(KinesisClientLease lease, UpdateField updateField)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
throwExceptions("updateLeaseWithMetaInfo", ExceptionThrowingLeaseManagerMethods.UPDATELEASEWITHMETAINFO);
leaseManager.updateLeaseWithMetaInfo(lease, updateField);
}
@Override
public KinesisClientLease getLease(String shardId)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {


@ -0,0 +1,613 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.util.CollectionUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.MAX_HASH_KEY;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.MIN_HASH_KEY;
import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.deserialize;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class PeriodicShardSyncManagerTest {
private static final String WORKER_ID = "workerId";
/** Manager for PERIODIC shard sync strategy */
private PeriodicShardSyncManager periodicShardSyncManager;
/** Manager for SHARD_END shard sync strategy */
private PeriodicShardSyncManager auditorPeriodicShardSyncManager;
@Mock
private LeaderDecider leaderDecider;
@Mock
private ShardSyncTask shardSyncTask;
@Mock
private ILeaseManager<KinesisClientLease> leaseManager;
@Mock
private IKinesisProxy kinesisProxy;
private IMetricsFactory metricsFactory = new NullMetricsFactory();
@Before
public void setup() {
periodicShardSyncManager = new PeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask,
metricsFactory, leaseManager, kinesisProxy, false);
auditorPeriodicShardSyncManager = new PeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask,
metricsFactory, leaseManager, kinesisProxy, true);
}
@Test
public void testForFailureWhenHashRangesAreIncomplete() {
List<KinesisClientLease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23"));
add(deserialize("25", MAX_HASH_KEY.toString())); // Missing interval here
}}.stream().map(hashKeyRangeForLease -> {
KinesisClientLease lease = new KinesisClientLease();
lease.setHashKeyRange(hashKeyRangeForLease);
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
Assert.assertTrue(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(hashRanges).isPresent());
}
@Test
public void testForSuccessWhenHashRangesAreComplete() {
List<KinesisClientLease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23"));
add(deserialize("24", MAX_HASH_KEY.toString()));
}}.stream().map(hashKeyRangeForLease -> {
KinesisClientLease lease = new KinesisClientLease();
lease.setHashKeyRange(hashKeyRangeForLease);
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
Assert.assertFalse(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(hashRanges).isPresent());
}
@Test
public void testForSuccessWhenUnsortedHashRangesAreComplete() {
List<KinesisClientLease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("4", "23"));
add(deserialize("2", "3"));
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("24", MAX_HASH_KEY.toString()));
add(deserialize("6", "23"));
}}.stream().map(hashKeyRangeForLease -> {
KinesisClientLease lease = new KinesisClientLease();
lease.setHashKeyRange(hashKeyRangeForLease);
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
Assert.assertFalse(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(hashRanges).isPresent());
}
@Test
public void testForSuccessWhenHashRangesAreCompleteForOverlappingLeasesAtEnd() {
List<KinesisClientLease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23"));
add(deserialize("24", MAX_HASH_KEY.toString()));
add(deserialize("24", "45"));
}}.stream().map(hashKeyRangeForLease -> {
KinesisClientLease lease = new KinesisClientLease();
lease.setHashKeyRange(hashKeyRangeForLease);
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
Assert.assertFalse(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(hashRanges).isPresent());
}
@Test
public void testIfShardSyncIsInitiatedWhenNoLeasesArePassed() throws Exception {
when(leaseManager.listLeases()).thenReturn(null);
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertTrue(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
}
@Test
public void testIfShardSyncIsInitiatedWhenEmptyLeasesArePassed() throws Exception {
when(leaseManager.listLeases()).thenReturn(Collections.emptyList());
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertTrue(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
}
@Test
public void testIfShardSyncIsInitiatedWhenConfidenceFactorIsNotReached() throws Exception {
List<KinesisClientLease> leases = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23")); // Hole between 23 and 25
add(deserialize("25", MAX_HASH_KEY.toString()));
}}.stream().map(hashKeyRangeForLease -> {
KinesisClientLease lease = new KinesisClientLease();
lease.setHashKeyRange(hashKeyRangeForLease);
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
when(leaseManager.listLeases()).thenReturn(leases);
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
}
}
@Test
public void testIfShardSyncIsInitiatedWhenConfidenceFactorIsReached() throws Exception {
List<KinesisClientLease> leases = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23")); // Hole between 23 and 25
add(deserialize("25", MAX_HASH_KEY.toString()));
}}.stream().map(hashKeyRangeForLease -> {
KinesisClientLease lease = new KinesisClientLease();
lease.setHashKeyRange(hashKeyRangeForLease);
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
when(leaseManager.listLeases()).thenReturn(leases);
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
}
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertTrue(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
}
@Test
public void testIfShardSyncIsInitiatedWhenHoleIsDueToShardEnd() throws Exception {
List<KinesisClientLease> leases = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
add(deserialize("6", "23")); // Introducing hole here through SHARD_END checkpoint
add(deserialize("4", "23"));
add(deserialize("24", MAX_HASH_KEY.toString()));
}}.stream().map(hashKeyRangeForLease -> {
KinesisClientLease lease = new KinesisClientLease();
lease.setHashKeyRange(hashKeyRangeForLease);
if (lease.getHashKeyRange().startingHashKey().toString().equals("4")) {
lease.setCheckpoint(ExtendedSequenceNumber.SHARD_END);
} else {
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
}
return lease;
}).collect(Collectors.toList());
when(leaseManager.listLeases()).thenReturn(leases);
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
}
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertTrue(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
}
@Test
public void testIfShardSyncIsInitiatedWhenNoLeasesAreUsedDueToShardEnd() throws Exception {
List<KinesisClientLease> leases = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23"));
add(deserialize("24", MAX_HASH_KEY.toString()));
}}.stream().map(hashKeyRangeForLease -> {
KinesisClientLease lease = new KinesisClientLease();
lease.setHashKeyRange(hashKeyRangeForLease);
lease.setCheckpoint(ExtendedSequenceNumber.SHARD_END);
return lease;
}).collect(Collectors.toList());
when(leaseManager.listLeases()).thenReturn(leases);
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
}
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertTrue(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
}
@Test
public void testIfShardSyncIsInitiatedWhenHoleShifts() throws Exception {
List<KinesisClientLease> leases1 = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23")); // Hole between 23 and 25
add(deserialize("25", MAX_HASH_KEY.toString()));
}}.stream().map(hashKeyRangeForLease -> {
KinesisClientLease lease = new KinesisClientLease();
lease.setHashKeyRange(hashKeyRangeForLease);
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
when(leaseManager.listLeases()).thenReturn(leases1);
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
}
List<KinesisClientLease> leases2 = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3")); // Hole between 3 and 5
add(deserialize("5", "23"));
add(deserialize("6", "23"));
add(deserialize("25", MAX_HASH_KEY.toString()));
}}.stream().map(hashKeyRangeForLease -> {
KinesisClientLease lease = new KinesisClientLease();
lease.setHashKeyRange(hashKeyRangeForLease);
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
// Resetting the holes
when(leaseManager.listLeases()).thenReturn(leases2);
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
}
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertTrue(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
}
@Test
public void testIfShardSyncIsInitiatedWhenHoleShiftsMoreThanOnce() throws Exception {
List<KinesisClientLease> leases1 = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23")); // Hole between 23 and 25
add(deserialize("25", MAX_HASH_KEY.toString()));
}}.stream().map(hashKeyRangeForLease -> {
KinesisClientLease lease = new KinesisClientLease();
lease.setHashKeyRange(hashKeyRangeForLease);
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
when(leaseManager.listLeases()).thenReturn(leases1);
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
}
List<KinesisClientLease> leases2 = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3")); // Hole between 3 and 5
add(deserialize("5", "23"));
add(deserialize("6", "23"));
add(deserialize("25", MAX_HASH_KEY.toString()));
}}.stream().map(hashKeyRangeForLease -> {
KinesisClientLease lease = new KinesisClientLease();
lease.setHashKeyRange(hashKeyRangeForLease);
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
// Resetting the holes
when(leaseManager.listLeases()).thenReturn(leases2);
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
}
// Resetting the holes again
when(leaseManager.listLeases()).thenReturn(leases1);
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
}
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertTrue(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
}
@Test
public void testIfMissingHashRangeInformationIsFilledBeforeEvaluatingForShardSync() throws Exception {
final int[] shardCounter = { 0 };
List<HashKeyRangeForLease> hashKeyRangeForLeases = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
add(deserialize("4", "20"));
add(deserialize("21", "23"));
add(deserialize("24", MAX_HASH_KEY.toString()));
}};
List<Shard> kinesisShards = hashKeyRangeForLeases.stream()
.map(hashKeyRange -> new Shard()
.withShardId("shard-" + ++shardCounter[0])
.withHashKeyRange(new HashKeyRange()
.withStartingHashKey(hashKeyRange.serializedStartingHashKey())
.withEndingHashKey(hashKeyRange.serializedEndingHashKey())))
.collect(Collectors.toList());
when(kinesisProxy.getShardList()).thenReturn(kinesisShards);
final int[] leaseCounter = { 0 };
List<KinesisClientLease> leases = hashKeyRangeForLeases.stream()
.map(hashKeyRange -> {
KinesisClientLease lease = new KinesisClientLease();
lease.setLeaseKey("shard-" + ++leaseCounter[0]);
// Setting the hash range only for the last three leases; the first two must be backfilled
if (leaseCounter[0] >= 3) {
lease.setHashKeyRange(hashKeyRange);
}
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
when(leaseManager.listLeases()).thenReturn(leases);
// Assert that SHARD_END shard sync should never trigger, but PERIODIC shard sync should always trigger
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
}
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
// Assert that all the leases now have hash ranges set
for (KinesisClientLease lease : leases) {
Assert.assertNotNull(lease.getHashKeyRange());
}
}
@Test
public void testIfMissingHashRangeInformationIsFilledBeforeEvaluatingForShardSyncInHoleScenario() throws Exception {
final int[] shardCounter = { 0 };
List<HashKeyRangeForLease> hashKeyRangeForLeases = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
add(deserialize("5", "20")); // Hole between 3 and 5
add(deserialize("21", "23"));
add(deserialize("24", MAX_HASH_KEY.toString()));
}};
List<Shard> kinesisShards = hashKeyRangeForLeases.stream()
.map(hashKeyRange -> new Shard()
.withShardId("shard-" + ++shardCounter[0])
.withHashKeyRange(new HashKeyRange()
.withStartingHashKey(hashKeyRange.serializedStartingHashKey())
.withEndingHashKey(hashKeyRange.serializedEndingHashKey())))
.collect(Collectors.toList());
when(kinesisProxy.getShardList()).thenReturn(kinesisShards);
final int[] leaseCounter = { 0 };
List<KinesisClientLease> leases = hashKeyRangeForLeases.stream()
.map(hashKeyRange -> {
KinesisClientLease lease = new KinesisClientLease();
lease.setLeaseKey("shard-" + ++leaseCounter[0]);
// Setting the hash range only for the last three leases; the first two must be backfilled
if (leaseCounter[0] >= 3) {
lease.setHashKeyRange(hashKeyRange);
}
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
when(leaseManager.listLeases()).thenReturn(leases);
// Assert that shard sync should trigger after breaching threshold
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
}
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
Assert.assertTrue(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
// Assert that all the leases now have hash ranges set
for (KinesisClientLease lease : leases) {
Assert.assertNotNull(lease.getHashKeyRange());
}
}
@Test
public void testFor1000DifferentValidSplitHierarchyTreeTheHashRangesAreAlwaysComplete() {
for (int i = 0; i < 1000; i++) {
int maxInitialLeaseCount = 100;
List<KinesisClientLease> leases = generateInitialLeases(maxInitialLeaseCount);
reshard(leases, 5, ReshardType.SPLIT, maxInitialLeaseCount, false);
Collections.shuffle(leases);
Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(leases).isPresent());
Assert.assertFalse(auditorPeriodicShardSyncManager.hasHoleInLeases(leases).isPresent());
}
}
@Test
public void testFor1000DifferentValidMergeHierarchyTreeWithSomeInProgressParentsTheHashRangesAreAlwaysComplete() {
for (int i = 0; i < 1000; i++) {
int maxInitialLeaseCount = 100;
List<KinesisClientLease> leases = generateInitialLeases(maxInitialLeaseCount);
reshard(leases, 5, ReshardType.MERGE, maxInitialLeaseCount, true);
Collections.shuffle(leases);
Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(leases).isPresent());
Assert.assertFalse(auditorPeriodicShardSyncManager.hasHoleInLeases(leases).isPresent());
}
}
@Test
public void testFor1000DifferentValidReshardHierarchyTreeWithSomeInProgressParentsTheHashRangesAreAlwaysComplete() {
for (int i = 0; i < 1000; i++) {
int maxInitialLeaseCount = 100;
List<KinesisClientLease> leases = generateInitialLeases(maxInitialLeaseCount);
reshard(leases, 5, ReshardType.ANY, maxInitialLeaseCount, true);
Collections.shuffle(leases);
Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(leases).isPresent());
Assert.assertFalse(auditorPeriodicShardSyncManager.hasHoleInLeases(leases).isPresent());
}
}
private List<KinesisClientLease> generateInitialLeases(int initialShardCount) {
long hashRangeInternalMax = 10000000;
List<KinesisClientLease> initialLeases = new ArrayList<>();
long leaseStartKey = 0;
for (int i = 1; i <= initialShardCount; i++) {
final KinesisClientLease lease = new KinesisClientLease();
long leaseEndKey;
if (i != initialShardCount) {
leaseEndKey = (hashRangeInternalMax / initialShardCount) * i;
lease.setHashKeyRange(deserialize(leaseStartKey + "", leaseEndKey + ""));
} else {
leaseEndKey = 0;
lease.setHashKeyRange(deserialize(leaseStartKey + "", MAX_HASH_KEY.toString()));
}
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
lease.setLeaseKey("shard-" + i);
initialLeases.add(lease);
leaseStartKey = leaseEndKey + 1;
}
return initialLeases;
}
private void reshard(List<KinesisClientLease> initialLeases, int depth, ReshardType reshardType, int leaseCounter,
boolean shouldKeepSomeParentsInProgress) {
for (int i = 0; i < depth; i++) {
if (reshardType == ReshardType.SPLIT) {
leaseCounter = split(initialLeases, leaseCounter);
} else if (reshardType == ReshardType.MERGE) {
leaseCounter = merge(initialLeases, leaseCounter, shouldKeepSomeParentsInProgress);
} else {
if (isHeads()) {
leaseCounter = split(initialLeases, leaseCounter);
} else {
leaseCounter = merge(initialLeases, leaseCounter, shouldKeepSomeParentsInProgress);
}
}
}
}
private int merge(List<KinesisClientLease> initialLeases, int leaseCounter, boolean shouldKeepSomeParentsInProgress) {
List<KinesisClientLease> leasesEligibleForMerge = initialLeases.stream()
.filter(l -> CollectionUtils.isNullOrEmpty(l.getChildShardIds())).collect(Collectors.toList());
int leasesToMerge = (int) ((leasesEligibleForMerge.size() - 1) / 2.0 * Math.random());
for (int i = 0; i < leasesToMerge; i += 2) {
KinesisClientLease parent1 = leasesEligibleForMerge.get(i);
KinesisClientLease parent2 = leasesEligibleForMerge.get(i + 1);
if (parent2.getHashKeyRange().startingHashKey()
.subtract(parent1.getHashKeyRange().endingHashKey()).equals(BigInteger.ONE)) {
parent1.setCheckpoint(ExtendedSequenceNumber.SHARD_END);
if (!shouldKeepSomeParentsInProgress || (shouldKeepSomeParentsInProgress && isOneFromDiceRoll())) {
parent2.setCheckpoint(ExtendedSequenceNumber.SHARD_END);
}
KinesisClientLease child = new KinesisClientLease();
child.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
child.setLeaseKey("shard-" + ++leaseCounter);
child.setHashKeyRange(new HashKeyRangeForLease(parent1.getHashKeyRange().startingHashKey(),
parent2.getHashKeyRange().endingHashKey()));
parent1.setChildShardIds(Collections.singletonList(child.getLeaseKey()));
parent2.setChildShardIds(Collections.singletonList(child.getLeaseKey()));
child.setParentShardIds(Sets.newHashSet(parent1.getLeaseKey(), parent2.getLeaseKey()));
initialLeases.add(child);
}
}
return leaseCounter;
}
private int split(List<KinesisClientLease> initialLeases, int leaseCounter) {
List<KinesisClientLease> leasesEligibleForSplit = initialLeases.stream()
.filter(l -> CollectionUtils.isNullOrEmpty(l.getChildShardIds())).collect(Collectors.toList());
// Split a random subset of the eligible leases.
int leasesToSplit = (int) (leasesEligibleForSplit.size() * Math.random());
for (int i = 0; i < leasesToSplit; i++) {
KinesisClientLease parent = leasesEligibleForSplit.get(i);
parent.setCheckpoint(ExtendedSequenceNumber.SHARD_END);
KinesisClientLease child1 = new KinesisClientLease();
child1.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
child1.setHashKeyRange(new HashKeyRangeForLease(parent.getHashKeyRange().startingHashKey(),
parent.getHashKeyRange().startingHashKey().add(parent.getHashKeyRange().endingHashKey())
.divide(new BigInteger("2"))));
child1.setLeaseKey("shard-" + ++leaseCounter);
KinesisClientLease child2 = new KinesisClientLease();
child2.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
child2.setHashKeyRange(new HashKeyRangeForLease(parent.getHashKeyRange().startingHashKey()
.add(parent.getHashKeyRange().endingHashKey()).divide(new BigInteger("2")).add(BigInteger.ONE),
parent.getHashKeyRange().endingHashKey()));
child2.setLeaseKey("shard-" + ++leaseCounter);
child1.setParentShardIds(Sets.newHashSet(parent.getLeaseKey()));
child2.setParentShardIds(Sets.newHashSet(parent.getLeaseKey()));
parent.setChildShardIds(Lists.newArrayList(child1.getLeaseKey(), child2.getLeaseKey()));
initialLeases.add(child1);
initialLeases.add(child2);
}
return leaseCounter;
}
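/** Fair coin flip used to pick between split and merge when the reshard type is ANY. */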
private boolean isHeads() {
return Math.random() <= 0.5;
}
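/** Roughly a one-in-six chance, like rolling a one on a six-sided die. */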
private boolean isOneFromDiceRoll() {
return Math.random() <= 0.16;
}
private enum ReshardType {
SPLIT,
MERGE,
ANY
}
}
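The assertions above exercise hasHoleInLeases without showing it. As a rough, self-contained sketch (not the
library implementation; the class, method, and Range names below are illustrative assumptions), a hash range
hole check of this kind can sort the lease ranges and verify that they cover [MIN_HASH_KEY, MAX_HASH_KEY]
without gaps:

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class HashRangeHoleCheckSketch {
    static final BigInteger MIN_HASH_KEY = BigInteger.ZERO;
    static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE);

    /** Simple start/end pair standing in for HashKeyRangeForLease. */
    static final class Range {
        final BigInteger start;
        final BigInteger end;
        Range(BigInteger start, BigInteger end) { this.start = start; this.end = end; }
    }

    /** Returns a description of the first gap found, or Optional.empty() when the full range is covered. */
    static Optional<String> findHole(List<Range> ranges) {
        if (ranges.isEmpty()) {
            return Optional.of("no hash key ranges at all");
        }
        List<Range> sorted = new ArrayList<>(ranges);
        sorted.sort((a, b) -> a.start.compareTo(b.start));
        BigInteger expectedStart = MIN_HASH_KEY;
        for (Range r : sorted) {
            if (r.start.compareTo(expectedStart) > 0) {
                return Optional.of("gap before hash key " + r.start);
            }
            // Overlapping ranges do not break coverage; only the furthest ending key seen so far matters.
            if (r.end.compareTo(expectedStart) >= 0) {
                expectedStart = r.end.add(BigInteger.ONE);
            }
        }
        return expectedStart.compareTo(MAX_HASH_KEY) > 0
                ? Optional.<String>empty()
                : Optional.of("gap after hash key " + expectedStart.subtract(BigInteger.ONE));
    }
}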

View file

@ -174,6 +174,8 @@ public class WorkerTest {
@Mock
private IKinesisProxy proxy;
@Mock
private StreamConfig streamConfig;
@Mock
private WorkerThreadPoolExecutor executorService;
@Mock
private WorkerCWMetricsFactory cwMetricsFactory;
@ -200,6 +202,8 @@ public class WorkerTest {
config.withMaxInitializationAttempts(1);
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory());
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
when(leaseCoordinator.getLeaseManager()).thenReturn(mock(ILeaseManager.class));
when(streamConfig.getStreamProxy()).thenReturn(kinesisProxy);
}
// CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES
@ -257,7 +261,6 @@ public class WorkerTest {
final String stageName = "testStageName";
IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2;
config = new KinesisClientLibConfiguration(stageName, null, null, WORKER_ID);
IKinesisProxy proxy = null;
ICheckpoint checkpoint = null;
int maxRecords = 1;
int idleTimeInMilliseconds = 1000;
@ -306,7 +309,6 @@ public class WorkerTest {
public void testWorkerLoopWithCheckpoint() {
final String stageName = "testStageName";
IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2;
IKinesisProxy proxy = null;
ICheckpoint checkpoint = null;
int maxRecords = 1;
int idleTimeInMilliseconds = 1000;
@ -376,7 +378,6 @@ public class WorkerTest {
final String stageName = "testStageName";
IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2;
config = new KinesisClientLibConfiguration(stageName, null, null, WORKER_ID);
IKinesisProxy proxy = null;
ICheckpoint checkpoint = null;
int maxRecords = 1;
int idleTimeInMilliseconds = 1000;
@ -866,7 +867,6 @@ public class WorkerTest {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
@ -954,7 +954,6 @@ public class WorkerTest {
public void testShutdownCallableNotAllowedTwice() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
@ -1019,7 +1018,6 @@ public class WorkerTest {
public void testGracefulShutdownSingleFuture() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
@ -1107,7 +1105,6 @@ public class WorkerTest {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -1190,7 +1187,6 @@ public class WorkerTest {
when(completedLease.getParentShardIds()).thenReturn(Collections.singleton(parentShardId));
when(completedLease.getCheckpoint()).thenReturn(ExtendedSequenceNumber.SHARD_END);
when(completedLease.getConcurrencyToken()).thenReturn(UUID.randomUUID());
final StreamConfig streamConfig = mock(StreamConfig.class);
final IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
final List<KinesisClientLease> leases = Collections.singletonList(completedLease);
final List<ShardInfo> currentAssignments = new ArrayList<>();
@ -1238,7 +1234,6 @@ public class WorkerTest {
public void testRequestShutdownWithLostLease() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
@ -1351,7 +1346,6 @@ public class WorkerTest {
public void testRequestShutdownWithAllLeasesLost() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
@ -1469,7 +1463,6 @@ public class WorkerTest {
public void testLeaseCancelledAfterShutdownRequest() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
@ -1553,7 +1546,6 @@ public class WorkerTest {
public void testEndOfShardAfterShutdownRequest() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);

View file

@ -30,6 +30,8 @@ public class KinesisClientLeaseBuilder {
private ExtendedSequenceNumber pendingCheckpoint;
private Long ownerSwitchesSinceCheckpoint = 0L;
private Set<String> parentShardIds = new HashSet<>();
private Set<String> childShardIds = new HashSet<>();
private HashKeyRangeForLease hashKeyRangeForLease;
public KinesisClientLeaseBuilder withLeaseKey(String leaseKey) {
this.leaseKey = leaseKey;
@ -76,8 +78,19 @@ public class KinesisClientLeaseBuilder {
return this;
}
public KinesisClientLeaseBuilder withChildShardIds(Set<String> childShardIds) {
this.childShardIds = childShardIds;
return this;
}
public KinesisClientLeaseBuilder withHashKeyRange(HashKeyRangeForLease hashKeyRangeForLease) {
this.hashKeyRangeForLease = hashKeyRangeForLease;
return this;
}
public KinesisClientLease build() {
return new KinesisClientLease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos,
checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds,
hashKeyRangeForLease);
}
}
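A rough usage sketch of the two new builder fields (not part of this change; values are made up and the usual
java.math.BigInteger / java.util.Collections imports are assumed):

KinesisClientLease lease = new KinesisClientLeaseBuilder()
        .withLeaseKey("shardId-000000000002")
        .withChildShardIds(Collections.emptySet())
        .withHashKeyRange(new HashKeyRangeForLease(BigInteger.ZERO,
                new BigInteger("2").pow(128).subtract(BigInteger.ONE)))
        .build();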