diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java index 87237e0b..75a747e6 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java @@ -14,39 +14,98 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.io.Serializable; +import java.math.BigInteger; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; +import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; +import com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.impl.UpdateField; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.util.CollectionUtils; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ComparisonChain; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.NonNull; +import lombok.Value; +import lombok.experimental.Accessors; import org.apache.commons.lang3.Validate; import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.fromHashKeyRange; + /** - * The top level orchestrator for coordinating the periodic shard sync related - * activities. + * The top level orchestrator for coordinating the periodic shard sync related activities. If the configured + * {@link ShardSyncStrategyType} is PERIODIC, this class will be the main shard sync orchestrator. For non-PERIODIC + * strategies, this class will serve as an internal auditor that periodically checks if the full hash range is covered + * by currently held leases, and initiates a recovery shard sync if not. */ @Getter @EqualsAndHashCode class PeriodicShardSyncManager { private static final Log LOG = LogFactory.getLog(PeriodicShardSyncManager.class); private static final long INITIAL_DELAY = 0; - private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 1000; + + /** DEFAULT interval is used for PERIODIC {@link ShardSyncStrategyType}. */ + private static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 1000L; + + /** AUDITOR interval is used for non-PERIODIC {@link ShardSyncStrategyType} auditor processes. */ + private static final long AUDITOR_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L; + + /** Parameters for validating hash range completeness when running in auditor mode. 
*/ + @VisibleForTesting + static final BigInteger MIN_HASH_KEY = BigInteger.ZERO; + @VisibleForTesting + static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE); + @VisibleForTesting + static final int CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY = 3; + private final HashRangeHoleTracker hashRangeHoleTracker = new HashRangeHoleTracker(); private final String workerId; private final LeaderDecider leaderDecider; private final ITask metricsEmittingShardSyncTask; private final ScheduledExecutorService shardSyncThreadPool; + private final ILeaseManager leaseManager; + private final IKinesisProxy kinesisProxy; + private final boolean isAuditorMode; + private final long periodicShardSyncIntervalMillis; private boolean isRunning; - PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, IMetricsFactory metricsFactory) { - this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory); + PeriodicShardSyncManager(String workerId, + LeaderDecider leaderDecider, + ShardSyncTask shardSyncTask, + IMetricsFactory metricsFactory, + ILeaseManager leaseManager, + IKinesisProxy kinesisProxy, + boolean isAuditorMode) { + this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory, + leaseManager, kinesisProxy, isAuditorMode); } - PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, ScheduledExecutorService shardSyncThreadPool, IMetricsFactory metricsFactory) { + PeriodicShardSyncManager(String workerId, + LeaderDecider leaderDecider, + ShardSyncTask shardSyncTask, + ScheduledExecutorService shardSyncThreadPool, + IMetricsFactory metricsFactory, + ILeaseManager leaseManager, + IKinesisProxy kinesisProxy, + boolean isAuditorMode) { Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager."); Validate.notNull(leaderDecider, "LeaderDecider is 
required to initialize PeriodicShardSyncManager."); Validate.notNull(shardSyncTask, "ShardSyncTask is required to initialize PeriodicShardSyncManager."); @@ -54,6 +113,16 @@ class PeriodicShardSyncManager { this.leaderDecider = leaderDecider; this.metricsEmittingShardSyncTask = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory); this.shardSyncThreadPool = shardSyncThreadPool; + this.leaseManager = leaseManager; + this.kinesisProxy = kinesisProxy; + this.isAuditorMode = isAuditorMode; + if (isAuditorMode) { + Validate.notNull(this.leaseManager, "LeaseManager is required for non-PERIODIC shard sync strategies."); + Validate.notNull(this.kinesisProxy, "KinesisProxy is required for non-PERIODIC shard sync strategies."); + this.periodicShardSyncIntervalMillis = AUDITOR_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS; + } else { + this.periodicShardSyncIntervalMillis = DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS; + } } public synchronized TaskResult start() { @@ -67,7 +136,7 @@ class PeriodicShardSyncManager { }; shardSyncThreadPool - .scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS, + .scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, periodicShardSyncIntervalMillis, TimeUnit.MILLISECONDS); isRunning = true; } @@ -95,11 +164,227 @@ class PeriodicShardSyncManager { private void runShardSync() { if (leaderDecider.isLeader(workerId)) { - LOG.debug(String.format("WorkerId %s is a leader, running the shard sync task", workerId)); - metricsEmittingShardSyncTask.call(); + LOG.debug("WorkerId " + workerId + " is a leader, running the shard sync task"); + + try { + final ShardSyncResponse shardSyncResponse = checkForShardSync(); + if (shardSyncResponse.shouldDoShardSync()) { + LOG.info("Periodic shard syncer initiating shard sync due to the reason - " + + shardSyncResponse.reasonForDecision()); + metricsEmittingShardSyncTask.call(); + } else { + LOG.info("Skipping shard sync due to the reason - " + 
shardSyncResponse.reasonForDecision()); + } + } catch (Exception e) { + LOG.error("Caught exception while running periodic shard syncer.", e); + } } else { - LOG.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); + LOG.debug("WorkerId " + workerId + " is not a leader, not running the shard sync task"); + } + } + + @VisibleForTesting + ShardSyncResponse checkForShardSync() throws DependencyException, InvalidStateException, + ProvisionedThroughputException { + + if (!isAuditorMode) { + // If we are running with PERIODIC shard sync strategy, we should sync every time. + return new ShardSyncResponse(true, "Syncing every time with PERIODIC shard sync strategy."); } + // Get current leases from DynamoDB. + final List currentLeases = leaseManager.listLeases(); + + if (CollectionUtils.isNullOrEmpty(currentLeases)) { + // If the current leases are null or empty, then we need to initiate a shard sync. + LOG.info("No leases found. Will trigger a shard sync."); + return new ShardSyncResponse(true, "No leases found."); + } + + // Check if there are any holes in the hash range covered by current leases. Return the first hole if present. + Optional hashRangeHoleOpt = hasHoleInLeases(currentLeases); + if (hashRangeHoleOpt.isPresent()) { + // If hole is present, check if the hole is detected consecutively in previous occurrences. If hole is + // determined with high confidence, return true; return false otherwise. We use the high confidence factor + // to avoid shard sync on any holes during resharding and lease cleanups, or other intermittent issues. + final boolean hasHoleWithHighConfidence = + hashRangeHoleTracker.hashHighConfidenceOfHoleWith(hashRangeHoleOpt.get()); + + return new ShardSyncResponse(hasHoleWithHighConfidence, + "Detected the same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles() + " times. 
" + + "Will initiate shard sync after reaching threshold: " + CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY); + } else { + // If hole is not present, clear any previous hole tracking and return false. + hashRangeHoleTracker.reset(); + return new ShardSyncResponse(false, "Hash range is complete."); + } + } + + @VisibleForTesting + Optional hasHoleInLeases(List leases) { + // Filter out any leases with checkpoints other than SHARD_END + final List activeLeases = leases.stream() + .filter(lease -> lease.getCheckpoint() != null && !lease.getCheckpoint().isShardEnd()) + .collect(Collectors.toList()); + + final List activeLeasesWithHashRanges = fillWithHashRangesIfRequired(activeLeases); + return checkForHoleInHashKeyRanges(activeLeasesWithHashRanges); + } + + private List fillWithHashRangesIfRequired(List activeLeases) { + final List activeLeasesWithNoHashRanges = activeLeases.stream() + .filter(lease -> lease.getHashKeyRange() == null).collect(Collectors.toList()); + + if (activeLeasesWithNoHashRanges.isEmpty()) { + return activeLeases; + } + + // Fetch shards from Kinesis to fill in the in-memory hash ranges + final Map kinesisShards = kinesisProxy.getShardList().stream() + .collect(Collectors.toMap(Shard::getShardId, shard -> shard)); + + return activeLeases.stream().map(lease -> { + if (lease.getHashKeyRange() == null) { + final String shardId = lease.getLeaseKey(); + final Shard shard = kinesisShards.get(shardId); + if (shard == null) { + return lease; + } + lease.setHashKeyRange(fromHashKeyRange(shard.getHashKeyRange())); + + try { + leaseManager.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE); + } catch (Exception e) { + LOG.warn("Unable to update hash range information for lease " + lease.getLeaseKey() + + ". 
This may result in explicit lease sync."); + } + } + return lease; + }).filter(lease -> lease.getHashKeyRange() != null).collect(Collectors.toList()); + } + + @VisibleForTesting + static Optional checkForHoleInHashKeyRanges(List leasesWithHashKeyRanges) { + // Sort the hash ranges by starting hash key + final List sortedLeasesWithHashKeyRanges = sortLeasesByHashRange(leasesWithHashKeyRanges); + if (sortedLeasesWithHashKeyRanges.isEmpty()) { + LOG.error("No leases with valid hash ranges found."); + return Optional.of(new HashRangeHole()); + } + + // Validate the hash range bounds + final KinesisClientLease minHashKeyLease = sortedLeasesWithHashKeyRanges.get(0); + final KinesisClientLease maxHashKeyLease = + sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1); + if (!minHashKeyLease.getHashKeyRange().startingHashKey().equals(MIN_HASH_KEY) || + !maxHashKeyLease.getHashKeyRange().endingHashKey().equals(MAX_HASH_KEY)) { + LOG.error("Incomplete hash range found between " + minHashKeyLease + " and " + maxHashKeyLease); + return Optional.of(new HashRangeHole(minHashKeyLease.getHashKeyRange(), maxHashKeyLease.getHashKeyRange())); + } + + // Check for any holes in the sorted hash range intervals + if (sortedLeasesWithHashKeyRanges.size() > 1) { + KinesisClientLease leftmostLeaseToReportInCaseOfHole = minHashKeyLease; + HashKeyRangeForLease leftLeaseHashRange = leftmostLeaseToReportInCaseOfHole.getHashKeyRange(); + + for (int i = 1; i < sortedLeasesWithHashKeyRanges.size(); i++) { + final KinesisClientLease rightLease = sortedLeasesWithHashKeyRanges.get(i); + final HashKeyRangeForLease rightLeaseHashRange = rightLease.getHashKeyRange(); + final BigInteger rangeDiff = + rightLeaseHashRange.startingHashKey().subtract(leftLeaseHashRange.endingHashKey()); + // We have overlapping leases when rangeDiff is 0 or negative. + // signum() will be -1 for negative and 0 if value is 0. + // Merge the ranges for further tracking. 
+ if (rangeDiff.signum() <= 0) { + leftLeaseHashRange = new HashKeyRangeForLease(leftLeaseHashRange.startingHashKey(), + leftLeaseHashRange.endingHashKey().max(rightLeaseHashRange.endingHashKey())); + } else { + // We have non-overlapping leases when rangeDiff is positive. signum() will be 1 in this case. + // If rangeDiff is 1, then it is a continuous hash range. If not, there is a hole. + if (!rangeDiff.equals(BigInteger.ONE)) { + LOG.error("Incomplete hash range found between " + leftmostLeaseToReportInCaseOfHole + + " and " + rightLease); + return Optional.of(new HashRangeHole(leftmostLeaseToReportInCaseOfHole.getHashKeyRange(), + rightLease.getHashKeyRange())); + } + + leftmostLeaseToReportInCaseOfHole = rightLease; + leftLeaseHashRange = rightLeaseHashRange; + } + } + } + + return Optional.empty(); + } + + @VisibleForTesting + static List sortLeasesByHashRange(List leasesWithHashKeyRanges) { + if (leasesWithHashKeyRanges.size() == 0 || leasesWithHashKeyRanges.size() == 1) { + return leasesWithHashKeyRanges; + } + Collections.sort(leasesWithHashKeyRanges, new HashKeyRangeComparator()); + return leasesWithHashKeyRanges; + } + + @Value + @Accessors(fluent = true) + @VisibleForTesting + static class ShardSyncResponse { + private final boolean shouldDoShardSync; + private final String reasonForDecision; + } + + @Value + private static class HashRangeHole { + private final HashKeyRangeForLease hashRangeAtStartOfPossibleHole; + private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole; + + HashRangeHole() { + hashRangeAtStartOfPossibleHole = hashRangeAtEndOfPossibleHole = null; + } + + HashRangeHole(HashKeyRangeForLease hashRangeAtStartOfPossibleHole, + HashKeyRangeForLease hashRangeAtEndOfPossibleHole) { + this.hashRangeAtStartOfPossibleHole = hashRangeAtStartOfPossibleHole; + this.hashRangeAtEndOfPossibleHole = hashRangeAtEndOfPossibleHole; + } + } + + private static class HashRangeHoleTracker { + private HashRangeHole hashRangeHole; + @Getter + private 
Integer numConsecutiveHoles; + + public boolean hashHighConfidenceOfHoleWith(@NonNull HashRangeHole hashRangeHole) { + if (hashRangeHole.equals(this.hashRangeHole)) { + ++this.numConsecutiveHoles; + } else { + this.hashRangeHole = hashRangeHole; + this.numConsecutiveHoles = 1; + } + + return numConsecutiveHoles >= CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; + } + + public void reset() { + this.hashRangeHole = null; + this.numConsecutiveHoles = 0; + } + } + + private static class HashKeyRangeComparator implements Comparator, Serializable { + private static final long serialVersionUID = 1L; + + @Override + public int compare(KinesisClientLease lease, KinesisClientLease otherLease) { + Validate.notNull(lease); + Validate.notNull(otherLease); + Validate.notNull(lease.getHashKeyRange()); + Validate.notNull(otherLease.getHashKeyRange()); + return ComparisonChain.start() + .compare(lease.getHashKeyRange().startingHashKey(), otherLease.getHashKeyRange().startingHashKey()) + .compare(lease.getHashKeyRange().endingHashKey(), otherLease.getHashKeyRange().endingHashKey()) + .result(); + } } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index a86771e3..9cdb71b5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -577,8 +577,9 @@ public class Worker implements Runnable { /** * Create shard sync strategy and corresponding {@link LeaderDecider} based on provided configs. PERIODIC - * {@link ShardSyncStrategyType} honors custom leaderDeciders for leader election strategy. All other - * {@link ShardSyncStrategyType}s permit only a default single-leader strategy. 
+ * {@link ShardSyncStrategyType} honors custom leaderDeciders for leader election strategy, and does not permit + * skipping shard syncs if the hash range is complete. All other {@link ShardSyncStrategyType}s permit only a + * default single-leader strategy, and will skip shard syncs unless a hole in the hash range is detected. */ private void createShardSyncStrategy(ShardSyncStrategyType strategyType, LeaderDecider leaderDecider, @@ -587,7 +588,7 @@ public class Worker implements Runnable { case PERIODIC: this.leaderDecider = getOrCreateLeaderDecider(leaderDecider); this.leaderElectedPeriodicShardSyncManager = - getOrCreatePeriodicShardSyncManager(periodicShardSyncManager); + getOrCreatePeriodicShardSyncManager(periodicShardSyncManager, false); this.shardSyncStrategy = createPeriodicShardSyncStrategy(); break; case SHARD_END: @@ -598,7 +599,7 @@ public class Worker implements Runnable { } this.leaderDecider = getOrCreateLeaderDecider(null); this.leaderElectedPeriodicShardSyncManager = - getOrCreatePeriodicShardSyncManager(periodicShardSyncManager); + getOrCreatePeriodicShardSyncManager(periodicShardSyncManager, true); this.shardSyncStrategy = createShardEndShardSyncStrategy(); } @@ -1255,9 +1256,10 @@ public class Worker implements Runnable { Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); } - private PeriodicShardSyncManager getOrCreatePeriodicShardSyncManager(PeriodicShardSyncManager periodicShardSyncManager) { - // TODO: Configure periodicShardSyncManager with either mandatory shard sync (PERIODIC) or hash range - // validation based shard sync (SHARD_END) based on configured shard sync strategy + /** A non-null PeriodicShardSyncManager can only be provided from unit tests. Any application code will create the + * PeriodicShardSyncManager for the first time here.
*/ + private PeriodicShardSyncManager getOrCreatePeriodicShardSyncManager(PeriodicShardSyncManager periodicShardSyncManager, + boolean isAuditorMode) { if (periodicShardSyncManager != null) { return periodicShardSyncManager; } @@ -1272,7 +1274,10 @@ public class Worker implements Runnable { SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC, shardSyncer, null), - metricsFactory); + metricsFactory, + leaseCoordinator.getLeaseManager(), + streamConfig.getStreamProxy(), + isAuditorMode); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ExtendedSequenceNumber.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ExtendedSequenceNumber.java index e817e0ea..92d1e71a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ExtendedSequenceNumber.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ExtendedSequenceNumber.java @@ -141,6 +141,10 @@ public class ExtendedSequenceNumber implements Comparable parentShardIds = new HashSet(); + private Set childShardIds = new HashSet(); + private HashKeyRangeForLease hashKeyRangeForLease; public KinesisClientLease() { @@ -41,17 +43,22 @@ public class KinesisClientLease extends Lease { this.pendingCheckpoint = other.getPendingCheckpoint(); this.ownerSwitchesSinceCheckpoint = other.getOwnerSwitchesSinceCheckpoint(); this.parentShardIds.addAll(other.getParentShardIds()); + this.childShardIds = other.getChildShardIds(); + this.hashKeyRangeForLease = other.getHashKeyRange(); } KinesisClientLease(String leaseKey, String leaseOwner, Long leaseCounter, UUID concurrencyToken, Long lastCounterIncrementNanos, ExtendedSequenceNumber checkpoint, ExtendedSequenceNumber pendingCheckpoint, - Long ownerSwitchesSinceCheckpoint, Set parentShardIds) { + Long ownerSwitchesSinceCheckpoint, Set parentShardIds, Set childShardIds, + HashKeyRangeForLease hashKeyRangeForLease) { super(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos); 
this.checkpoint = checkpoint; this.pendingCheckpoint = pendingCheckpoint; this.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint; this.parentShardIds.addAll(parentShardIds); + this.childShardIds.addAll(childShardIds); + this.hashKeyRangeForLease = hashKeyRangeForLease; } /** @@ -100,6 +107,20 @@ public class KinesisClientLease extends Lease { return new HashSet(parentShardIds); } + /** + * @return shardIds that are the children of this lease. Used for resharding. + */ + public Set getChildShardIds() { + return new HashSet(childShardIds); + } + + /** + * @return hash key range that this lease covers. + */ + public HashKeyRangeForLease getHashKeyRange() { + return hashKeyRangeForLease; + } + /** * Sets checkpoint. * @@ -142,6 +163,29 @@ public class KinesisClientLease extends Lease { this.parentShardIds.clear(); this.parentShardIds.addAll(parentShardIds); } + + /** + * Sets childShardIds. + * + * @param childShardIds may not be null + */ + public void setChildShardIds(Collection childShardIds) { + verifyNotNull(childShardIds, "childShardIds should not be null"); + + this.childShardIds.clear(); + this.childShardIds.addAll(childShardIds); + } + + /** + * Sets hashKeyRangeForLease. 
+ * + * @param hashKeyRangeForLease may not be null + */ + public void setHashKeyRange(HashKeyRangeForLease hashKeyRangeForLease) { + verifyNotNull(hashKeyRangeForLease, "hashKeyRangeForLease should not be null"); + + this.hashKeyRangeForLease = hashKeyRangeForLease; + } private void verifyNotNull(Object object, String message) { if (object == null) { diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java index 4006e052..6bf9bc58 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java @@ -15,6 +15,7 @@ package com.amazonaws.services.kinesis.leases.impl; import java.util.Collection; +import java.util.HashMap; import java.util.Map; import com.amazonaws.services.dynamodbv2.model.AttributeAction; @@ -26,8 +27,11 @@ import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseSerializer; import com.amazonaws.services.kinesis.leases.util.DynamoUtils; +import com.amazonaws.util.CollectionUtils; import com.google.common.base.Strings; +import static com.amazonaws.services.kinesis.leases.impl.UpdateField.HASH_KEY_RANGE; + /** * An implementation of ILeaseSerializer for KinesisClientLease objects. 
*/ @@ -39,6 +43,9 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer getDynamoUpdateLeaseUpdate(KinesisClientLease lease, + UpdateField updateField) { + Map result = new HashMap<>(); + + switch (updateField) { + case CHILD_SHARDS: + // TODO: Implement update fields for child shards + break; + case HASH_KEY_RANGE: + if (lease.getHashKeyRange() != null) { + result.put(STARTING_HASH_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue( + lease.getHashKeyRange().serializedStartingHashKey()), AttributeAction.PUT)); + result.put(ENDING_HASH_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue( + lease.getHashKeyRange().serializedEndingHashKey()), AttributeAction.PUT)); + } + break; + } + + return result; + } + @Override public Collection getKeySchema() { return baseSerializer.getKeySchema(); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java index 1e747235..7fe4551a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java @@ -590,6 +590,35 @@ public class LeaseManager implements ILeaseManager { return true; } + /** + * {@inheritDoc} + */ + @Override + public void updateLeaseWithMetaInfo(T lease, UpdateField updateField) + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + verifyNotNull(lease, "lease cannot be null"); + verifyNotNull(updateField, "updateField cannot be null"); + + if (LOG.isDebugEnabled()) { + LOG.debug("Updating lease " + lease + " for field " + updateField); + } + + UpdateItemRequest request = new UpdateItemRequest(); + request.setTableName(table); + request.setKey(serializer.getDynamoHashKey(lease)); + request.setExpected(serializer.getDynamoLeaseCounterExpectation(lease)); + + Map updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField); + 
updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease)); + request.setAttributeUpdates(updates); + + try { + dynamoDBClient.updateItem(request); + } catch (AmazonClientException e) { + throw convertAndRethrowExceptions("update", lease.getLeaseKey(), e); + } + } + /* * This method contains boilerplate exception handling - it throws or returns something to be thrown. The * inconsistency there exists to satisfy the compiler when this method is used at the end of non-void methods. diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java index 46c45c24..b02ed34c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java @@ -177,6 +177,12 @@ public class LeaseSerializer implements ILeaseSerializer { return new HashMap(); } + @Override + public Map getDynamoUpdateLeaseUpdate(Lease lease, UpdateField updateField) { + // There is no application-specific data in Lease - just return a map that increments the counter. + return new HashMap(); + } + @Override public Collection getKeySchema() { List keySchema = new ArrayList(); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/UpdateField.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/UpdateField.java new file mode 100644 index 00000000..d621999b --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/UpdateField.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.amazonaws.services.kinesis.leases.impl; + +/** + * These are the special fields that will be updated only once during the lifetime of the lease. + * Since these are meta information that will not affect lease ownership or data durability, we allow + * any elected leader or worker to set these fields directly without any conditional checks. + * Note that though HASH_KEY_RANGE will be available during lease initialization in newer versions, we keep this + * for backfilling while rolling forward to newer versions. + */ +public enum UpdateField { + CHILD_SHARDS, HASH_KEY_RANGE +} diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java index 4de54607..1b63bd01 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java @@ -20,6 +20,7 @@ import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; import com.amazonaws.services.kinesis.leases.impl.Lease; +import com.amazonaws.services.kinesis.leases.impl.UpdateField; /** * Supports basic CRUD operations for Leases. 
@@ -180,6 +181,19 @@ public interface ILeaseManager { public boolean updateLease(T lease) throws DependencyException, InvalidStateException, ProvisionedThroughputException; + /** + * Update application-specific fields of the given lease in DynamoDB. Does not update fields managed by the leasing + * library such as leaseCounter, leaseOwner, or leaseKey. + ** + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity + * @throws DependencyException if DynamoDB update fails in an unexpected way + */ + default void updateLeaseWithMetaInfo(T lease, UpdateField updateField) + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + throw new UnsupportedOperationException("updateLeaseWithMetaInfo is not implemented."); + } + /** * Check (synchronously) if there are any leases in the lease table. * diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java index 35a8fc15..58eb6613 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java @@ -23,6 +23,7 @@ import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate; import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue; import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; import com.amazonaws.services.kinesis.leases.impl.Lease; +import com.amazonaws.services.kinesis.leases.impl.UpdateField; /** * Utility class that manages the mapping of Lease objects/operations to records in DynamoDB. 
@@ -104,6 +105,15 @@ public interface ILeaseSerializer { */ public Map getDynamoUpdateLeaseUpdate(T lease); + /** + * @param lease the lease whose data is being updated + * @param updateField the {@link UpdateField} selecting what to update + * @return the attribute value map that updates application-specific data for a lease + */ + default Map getDynamoUpdateLeaseUpdate(T lease, UpdateField updateField) { + throw new UnsupportedOperationException(); + } + /** * @return the key schema for creating a DynamoDB table to store leases */ diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java index c8e820bb..e7b6c285 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java @@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.Arrays; import java.util.List; +import com.amazonaws.services.kinesis.leases.impl.UpdateField; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -53,6 +54,7 @@ class ExceptionThrowingLeaseManager implements ILeaseManager DELETELEASE(9), DELETEALL(10), UPDATELEASE(11), + UPDATELEASEWITHMETAINFO(12), NONE(Integer.MIN_VALUE); private Integer index; @@ -197,6 +199,14 @@ class ExceptionThrowingLeaseManager implements ILeaseManager return leaseManager.updateLease(lease); } + @Override + public void updateLeaseWithMetaInfo(KinesisClientLease lease, UpdateField updateField) + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + throwExceptions("updateLeaseWithMetaInfo", ExceptionThrowingLeaseManagerMethods.UPDATELEASEWITHMETAINFO); + + leaseManager.updateLeaseWithMetaInfo(lease, updateField); + } + @Override public KinesisClientLease getLease(String shardId) throws DependencyException, 
InvalidStateException, ProvisionedThroughputException { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManagerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManagerTest.java new file mode 100644 index 00000000..fa628b37 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManagerTest.java @@ -0,0 +1,613 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.amazonaws.services.kinesis.model.HashKeyRange; +import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.util.CollectionUtils; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.MAX_HASH_KEY; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.MIN_HASH_KEY; +import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.deserialize; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class PeriodicShardSyncManagerTest { + + private static final String WORKER_ID = "workerId"; + + /** Manager for PERIODIC shard sync strategy */ + private PeriodicShardSyncManager periodicShardSyncManager; + + /** Manager for SHARD_END shard sync strategy */ + private 
PeriodicShardSyncManager auditorPeriodicShardSyncManager; + + @Mock + private LeaderDecider leaderDecider; + @Mock + private ShardSyncTask shardSyncTask; + @Mock + private ILeaseManager leaseManager; + @Mock + private IKinesisProxy kinesisProxy; + + private IMetricsFactory metricsFactory = new NullMetricsFactory(); + + @Before + public void setup() { + periodicShardSyncManager = new PeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask, + metricsFactory, leaseManager, kinesisProxy, false); + auditorPeriodicShardSyncManager = new PeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask, + metricsFactory, leaseManager, kinesisProxy, true); + } + + @Test + public void testForFailureWhenHashRangesAreIncomplete() { + List hashRanges = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); + add(deserialize("25", MAX_HASH_KEY.toString())); // Missing interval here + }}.stream().map(hashKeyRangeForLease -> { + KinesisClientLease lease = new KinesisClientLease(); + lease.setHashKeyRange(hashKeyRangeForLease); + lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + Assert.assertTrue(PeriodicShardSyncManager + .checkForHoleInHashKeyRanges(hashRanges).isPresent()); + } + + @Test + public void testForSuccessWhenHashRangesAreComplete() { + List hashRanges = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + KinesisClientLease lease = new KinesisClientLease(); + lease.setHashKeyRange(hashKeyRangeForLease); + lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + Assert.assertFalse(PeriodicShardSyncManager + 
.checkForHoleInHashKeyRanges(hashRanges).isPresent()); + } + + @Test + public void testForSuccessWhenUnsortedHashRangesAreComplete() { + List hashRanges = new ArrayList() {{ + add(deserialize("4", "23")); + add(deserialize("2", "3")); + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("24", MAX_HASH_KEY.toString())); + add(deserialize("6", "23")); + }}.stream().map(hashKeyRangeForLease -> { + KinesisClientLease lease = new KinesisClientLease(); + lease.setHashKeyRange(hashKeyRangeForLease); + lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + Assert.assertFalse(PeriodicShardSyncManager + .checkForHoleInHashKeyRanges(hashRanges).isPresent()); + } + + @Test + public void testForSuccessWhenHashRangesAreCompleteForOverlappingLeasesAtEnd() { + List hashRanges = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + add(deserialize("24", "45")); + }}.stream().map(hashKeyRangeForLease -> { + KinesisClientLease lease = new KinesisClientLease(); + lease.setHashKeyRange(hashKeyRangeForLease); + lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + Assert.assertFalse(PeriodicShardSyncManager + .checkForHoleInHashKeyRanges(hashRanges).isPresent()); + } + + @Test + public void testIfShardSyncIsInitiatedWhenNoLeasesArePassed() throws Exception { + when(leaseManager.listLeases()).thenReturn(null); + Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + Assert.assertTrue(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + + } + + @Test + public void testIfShardSyncIsInitiatedWhenEmptyLeasesArePassed() throws Exception { + when(leaseManager.listLeases()).thenReturn(Collections.emptyList()); + 
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + Assert.assertTrue(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + } + + @Test + public void testIfShardSyncIsInitiatedWhenConfidenceFactorIsNotReached() throws Exception { + List leases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); // Hole between 23 and 25 + add(deserialize("25", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + KinesisClientLease lease = new KinesisClientLease(); + lease.setHashKeyRange(hashKeyRangeForLease); + lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + + when(leaseManager.listLeases()).thenReturn(leases); + for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + } + } + + @Test + public void testIfShardSyncIsInitiatedWhenConfidenceFactorIsReached() throws Exception { + List leases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); // Hole between 23 and 25 + add(deserialize("25", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + KinesisClientLease lease = new KinesisClientLease(); + lease.setHashKeyRange(hashKeyRangeForLease); + lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + + when(leaseManager.listLeases()).thenReturn(leases); + for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + 
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + } + Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + Assert.assertTrue(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + } + + @Test + public void testIfShardSyncIsInitiatedWhenHoleIsDueToShardEnd() throws Exception { + List leases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("6", "23")); // Introducing hole here through SHARD_END checkpoint + add(deserialize("4", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + KinesisClientLease lease = new KinesisClientLease(); + lease.setHashKeyRange(hashKeyRangeForLease); + if (lease.getHashKeyRange().startingHashKey().toString().equals("4")) { + lease.setCheckpoint(ExtendedSequenceNumber.SHARD_END); + } else { + lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + } + return lease; + }).collect(Collectors.toList()); + + when(leaseManager.listLeases()).thenReturn(leases); + for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + } + Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + Assert.assertTrue(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + } + + @Test + public void testIfShardSyncIsInitiatedWhenNoLeasesAreUsedDueToShardEnd() throws Exception { + List leases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + KinesisClientLease lease = new KinesisClientLease(); + 
lease.setHashKeyRange(hashKeyRangeForLease); + lease.setCheckpoint(ExtendedSequenceNumber.SHARD_END); + return lease; + }).collect(Collectors.toList()); + + when(leaseManager.listLeases()).thenReturn(leases); + for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + } + Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + Assert.assertTrue(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + } + + @Test + public void testIfShardSyncIsInitiatedWhenHoleShifts() throws Exception { + List leases1 = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); // Hole between 23 and 25 + add(deserialize("25", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + KinesisClientLease lease = new KinesisClientLease(); + lease.setHashKeyRange(hashKeyRangeForLease); + lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + + when(leaseManager.listLeases()).thenReturn(leases1); + for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + } + + List leases2 = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); // Hole between 3 and 5 + add(deserialize("5", "23")); + add(deserialize("6", "23")); + add(deserialize("25", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + KinesisClientLease lease = new KinesisClientLease(); + lease.setHashKeyRange(hashKeyRangeForLease); + 
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + + // Resetting the holes + when(leaseManager.listLeases()).thenReturn(leases2); + for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + } + Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + Assert.assertTrue(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + } + + @Test + public void testIfShardSyncIsInitiatedWhenHoleShiftsMoreThanOnce() throws Exception { + List leases1 = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); // Hole between 23 and 25 + add(deserialize("25", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + KinesisClientLease lease = new KinesisClientLease(); + lease.setHashKeyRange(hashKeyRangeForLease); + lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + + when(leaseManager.listLeases()).thenReturn(leases1); + for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + } + + List leases2 = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); // Hole between 3 and 5 + add(deserialize("5", "23")); + add(deserialize("6", "23")); + add(deserialize("25", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + KinesisClientLease lease = new KinesisClientLease(); + lease.setHashKeyRange(hashKeyRangeForLease); + 
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + + // Resetting the holes + when(leaseManager.listLeases()).thenReturn(leases2); + for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + } + // Resetting the holes again + when(leaseManager.listLeases()).thenReturn(leases1); + for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + } + Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + Assert.assertTrue(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + } + + @Test + public void testIfMissingHashRangeInformationIsFilledBeforeEvaluatingForShardSync() throws Exception { + final int[] shardCounter = { 0 }; + List hashKeyRangeForLeases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("4", "20")); + add(deserialize("21", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + }}; + + List kinesisShards = hashKeyRangeForLeases.stream() + .map(hashKeyRange -> new Shard() + .withShardId("shard-" + ++shardCounter[0]) + .withHashKeyRange(new HashKeyRange() + .withStartingHashKey(hashKeyRange.serializedStartingHashKey()) + .withEndingHashKey(hashKeyRange.serializedEndingHashKey()))) + .collect(Collectors.toList()); + + when(kinesisProxy.getShardList()).thenReturn(kinesisShards); + + final int[] leaseCounter = { 0 }; + List leases = hashKeyRangeForLeases.stream() + .map(hashKeyRange -> { + KinesisClientLease lease = new KinesisClientLease(); + lease.setLeaseKey("shard-" + ++leaseCounter[0]); + 
// Setting the hash range only for the last three leases + if (leaseCounter[0] >= 3) { + lease.setHashKeyRange(hashKeyRange); + } + lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + + when(leaseManager.listLeases()).thenReturn(leases); + + // Assert that SHARD_END shard sync should never trigger, but PERIODIC shard sync should always trigger + for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + } + Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + + // Assert that all the leases now have hash ranges set + for (KinesisClientLease lease : leases) { + Assert.assertNotNull(lease.getHashKeyRange()); + } + } + + @Test + public void testIfMissingHashRangeInformationIsFilledBeforeEvaluatingForShardSyncInHoleScenario() throws Exception { + final int[] shardCounter = { 0 }; + List hashKeyRangeForLeases = new ArrayList() {{ + add(deserialize(MIN_HASH_KEY.toString(), "1")); + add(deserialize("2", "3")); + add(deserialize("5", "20")); // Hole between 3 and 5 + add(deserialize("21", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + }}; + + List kinesisShards = hashKeyRangeForLeases.stream() + .map(hashKeyRange -> new Shard() + .withShardId("shard-" + ++shardCounter[0]) + .withHashKeyRange(new HashKeyRange() + .withStartingHashKey(hashKeyRange.serializedStartingHashKey()) + .withEndingHashKey(hashKeyRange.serializedEndingHashKey()))) + .collect(Collectors.toList()); + + when(kinesisProxy.getShardList()).thenReturn(kinesisShards); + + final int[] leaseCounter = { 0 }; + List leases = hashKeyRangeForLeases.stream() + .map(hashKeyRange -> { + KinesisClientLease lease = new 
KinesisClientLease(); + lease.setLeaseKey("shard-" + ++leaseCounter[0]); + // Setting the hash range only for the last three leases + if (leaseCounter[0] >= 3) { + lease.setHashKeyRange(hashKeyRange); + } + lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + + when(leaseManager.listLeases()).thenReturn(leases); + + // Assert that shard sync should trigger after breaching threshold + for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + } + Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + Assert.assertTrue(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); + + // Assert that all the leases now have hash ranges set + for (KinesisClientLease lease : leases) { + Assert.assertNotNull(lease.getHashKeyRange()); + } + } + + @Test + public void testFor1000DifferentValidSplitHierarchyTreeTheHashRangesAreAlwaysComplete() { + for (int i = 0; i < 1000; i++) { + int maxInitialLeaseCount = 100; + List leases = generateInitialLeases(maxInitialLeaseCount); + reshard(leases, 5, ReshardType.SPLIT, maxInitialLeaseCount, false); + Collections.shuffle(leases); + Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(leases).isPresent()); + Assert.assertFalse(auditorPeriodicShardSyncManager.hasHoleInLeases(leases).isPresent()); + } + } + + @Test + public void testFor1000DifferentValidMergeHierarchyTreeWithSomeInProgressParentsTheHashRangesAreAlwaysComplete() { + for (int i = 0; i < 1000; i++) { + int maxInitialLeaseCount = 100; + List leases = generateInitialLeases(maxInitialLeaseCount); + reshard(leases, 5, ReshardType.MERGE, maxInitialLeaseCount, true); + Collections.shuffle(leases); + Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(leases).isPresent()); + 
Assert.assertFalse(auditorPeriodicShardSyncManager.hasHoleInLeases(leases).isPresent()); + } + } + + @Test + public void testFor1000DifferentValidReshardHierarchyTreeWithSomeInProgressParentsTheHashRangesAreAlwaysComplete() { + for (int i = 0; i < 1000; i++) { + int maxInitialLeaseCount = 100; + List leases = generateInitialLeases(maxInitialLeaseCount); + reshard(leases, 5, ReshardType.ANY, maxInitialLeaseCount, true); + Collections.shuffle(leases); + Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(leases).isPresent()); + Assert.assertFalse(auditorPeriodicShardSyncManager.hasHoleInLeases(leases).isPresent()); + } + } + + private List generateInitialLeases(int initialShardCount) { + long hashRangeInternalMax = 10000000; + List initialLeases = new ArrayList<>(); + long leaseStartKey = 0; + for (int i = 1; i <= initialShardCount; i++) { + final KinesisClientLease lease = new KinesisClientLease(); + long leaseEndKey; + if (i != initialShardCount) { + leaseEndKey = (hashRangeInternalMax / initialShardCount) * i; + lease.setHashKeyRange(deserialize(leaseStartKey + "", leaseEndKey + "")); + } else { + leaseEndKey = 0; + lease.setHashKeyRange(deserialize(leaseStartKey + "", MAX_HASH_KEY.toString())); + } + lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + lease.setLeaseKey("shard-" + i); + initialLeases.add(lease); + leaseStartKey = leaseEndKey + 1; + } + + return initialLeases; + } + + private void reshard(List initialLeases, int depth, ReshardType reshardType, int leaseCounter, + boolean shouldKeepSomeParentsInProgress) { + for (int i = 0; i < depth; i++) { + if (reshardType == ReshardType.SPLIT) { + leaseCounter = split(initialLeases, leaseCounter); + } else if (reshardType == ReshardType.MERGE) { + leaseCounter = merge(initialLeases, leaseCounter, shouldKeepSomeParentsInProgress); + } else { + if (isHeads()) { + leaseCounter = split(initialLeases, leaseCounter); + } else { + leaseCounter = merge(initialLeases, leaseCounter, 
shouldKeepSomeParentsInProgress); + } + } + } + } + + private int merge(List initialLeases, int leaseCounter, boolean shouldKeepSomeParentsInProgress) { + List leasesEligibleForMerge = initialLeases.stream() + .filter(l -> CollectionUtils.isNullOrEmpty(l.getChildShardIds())).collect(Collectors.toList()); + + int leasesToMerge = (int) ((leasesEligibleForMerge.size() - 1) / 2.0 * Math.random()); + for (int i = 0; i < leasesToMerge; i += 2) { + KinesisClientLease parent1 = leasesEligibleForMerge.get(i); + KinesisClientLease parent2 = leasesEligibleForMerge.get(i + 1); + if (parent2.getHashKeyRange().startingHashKey() + .subtract(parent1.getHashKeyRange().endingHashKey()).equals(BigInteger.ONE)) { + parent1.setCheckpoint(ExtendedSequenceNumber.SHARD_END); + + if (!shouldKeepSomeParentsInProgress || (shouldKeepSomeParentsInProgress && isOneFromDiceRoll())) { + parent2.setCheckpoint(ExtendedSequenceNumber.SHARD_END); + } + + KinesisClientLease child = new KinesisClientLease(); + child.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + child.setLeaseKey("shard-" + ++leaseCounter); + child.setHashKeyRange(new HashKeyRangeForLease(parent1.getHashKeyRange().startingHashKey(), + parent2.getHashKeyRange().endingHashKey())); + parent1.setChildShardIds(Collections.singletonList(child.getLeaseKey())); + parent2.setChildShardIds(Collections.singletonList(child.getLeaseKey())); + child.setParentShardIds(Sets.newHashSet(parent1.getLeaseKey(), parent2.getLeaseKey())); + + initialLeases.add(child); + } + } + + return leaseCounter; + } + + private int split(List initialLeases, int leaseCounter) { + List leasesEligibleForSplit = initialLeases.stream() + .filter(l -> CollectionUtils.isNullOrEmpty(l.getChildShardIds())).collect(Collectors.toList()); + + int leasesToSplit = (int) (leasesEligibleForSplit.size() * Math.random()); + for (int i = 0; i < leasesToSplit; i++) { + KinesisClientLease parent = leasesEligibleForSplit.get(i); + 
parent.setCheckpoint(ExtendedSequenceNumber.SHARD_END); + + KinesisClientLease child1 = new KinesisClientLease(); + child1.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + child1.setHashKeyRange(new HashKeyRangeForLease(parent.getHashKeyRange().startingHashKey(), + parent.getHashKeyRange().startingHashKey().add(parent.getHashKeyRange().endingHashKey()) + .divide(new BigInteger("2")))); + child1.setLeaseKey("shard-" + ++leaseCounter); + + KinesisClientLease child2 = new KinesisClientLease(); + child2.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + child2.setHashKeyRange(new HashKeyRangeForLease(parent.getHashKeyRange().startingHashKey() + .add(parent.getHashKeyRange().endingHashKey()).divide(new BigInteger("2")).add(BigInteger.ONE), + parent.getHashKeyRange().endingHashKey())); + child2.setLeaseKey("shard-" + ++leaseCounter); + + child1.setParentShardIds(Sets.newHashSet(parent.getLeaseKey())); + child2.setParentShardIds(Sets.newHashSet(parent.getLeaseKey())); + parent.setChildShardIds(Lists.newArrayList(child1.getLeaseKey(), child2.getLeaseKey())); + + initialLeases.add(child1); + initialLeases.add(child2); + } + + return leaseCounter; + } + + private boolean isHeads() { + return Math.random() <= 0.5; + } + + private boolean isOneFromDiceRoll() { + return Math.random() <= 0.16; + } + + private enum ReshardType { + SPLIT, + MERGE, + ANY + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index f6bfdd73..b7e1dd51 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -174,6 +174,8 @@ public class WorkerTest { @Mock private IKinesisProxy proxy; @Mock + private StreamConfig streamConfig; + @Mock private WorkerThreadPoolExecutor executorService; @Mock private WorkerCWMetricsFactory 
cwMetricsFactory; @@ -200,6 +202,8 @@ public class WorkerTest { config.withMaxInitializationAttempts(1); recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory()); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); + when(leaseCoordinator.getLeaseManager()).thenReturn(mock(ILeaseManager.class)); + when(streamConfig.getStreamProxy()).thenReturn(kinesisProxy); } // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES @@ -257,7 +261,6 @@ public class WorkerTest { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; config = new KinesisClientLibConfiguration(stageName, null, null, WORKER_ID); - IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; int idleTimeInMilliseconds = 1000; @@ -306,7 +309,6 @@ public class WorkerTest { public void testWorkerLoopWithCheckpoint() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; - IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; int idleTimeInMilliseconds = 1000; @@ -376,7 +378,6 @@ public class WorkerTest { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; config = new KinesisClientLibConfiguration(stageName, null, null, WORKER_ID); - IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; int idleTimeInMilliseconds = 1000; @@ -866,7 +867,6 @@ public class WorkerTest { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); @@ -954,7 +954,6 @@ public class WorkerTest { public void testShutdownCallableNotAllowedTwice() throws Exception { IRecordProcessorFactory recordProcessorFactory = 
mock(IRecordProcessorFactory.class); - StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); @@ -1019,7 +1018,6 @@ public class WorkerTest { public void testGracefulShutdownSingleFuture() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); @@ -1107,7 +1105,6 @@ public class WorkerTest { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1190,7 +1187,6 @@ public class WorkerTest { when(completedLease.getParentShardIds()).thenReturn(Collections.singleton(parentShardId)); when(completedLease.getCheckpoint()).thenReturn(ExtendedSequenceNumber.SHARD_END); when(completedLease.getConcurrencyToken()).thenReturn(UUID.randomUUID()); - final StreamConfig streamConfig = mock(StreamConfig.class); final IMetricsFactory metricsFactory = mock(IMetricsFactory.class); final List leases = Collections.singletonList(completedLease); final List currentAssignments = new ArrayList<>(); @@ -1238,7 +1234,6 @@ public class WorkerTest { public void testRequestShutdownWithLostLease() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); @@ -1351,7 +1346,6 @@ public class WorkerTest { public void testRequestShutdownWithAllLeasesLost() throws Exception { IRecordProcessorFactory recordProcessorFactory = 
mock(IRecordProcessorFactory.class); - StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); @@ -1469,7 +1463,6 @@ public class WorkerTest { public void testLeaseCancelledAfterShutdownRequest() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); @@ -1553,7 +1546,6 @@ public class WorkerTest { public void testEndOfShardAfterShutdownRequest() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java index c029926c..4f4fdbca 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java @@ -30,6 +30,8 @@ public class KinesisClientLeaseBuilder { private ExtendedSequenceNumber pendingCheckpoint; private Long ownerSwitchesSinceCheckpoint = 0L; private Set parentShardIds = new HashSet<>(); + private Set childShardIds = new HashSet<>(); + private HashKeyRangeForLease hashKeyRangeForLease; public KinesisClientLeaseBuilder withLeaseKey(String leaseKey) { this.leaseKey = leaseKey; @@ -76,8 +78,19 @@ public class KinesisClientLeaseBuilder { return this; } + public KinesisClientLeaseBuilder withChildShardIds(Set childShardIds) { + this.childShardIds 
= childShardIds; + return this; + } + + public KinesisClientLeaseBuilder withHashKeyRange(HashKeyRangeForLease hashKeyRangeForLease) { + this.hashKeyRangeForLease = hashKeyRangeForLease; + return this; + } + public KinesisClientLease build() { return new KinesisClientLease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, - checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds); + checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds, + hashKeyRangeForLease); } } \ No newline at end of file