Added logic to auto-fill missing hash ranges and to recover leases. Added more unit test cases.

This commit is contained in:
Ashwin Giridharan 2020-05-07 01:28:56 -07:00
parent 9e97edd273
commit 9115f2000b
11 changed files with 495 additions and 36 deletions

View file

@ -21,6 +21,7 @@ import lombok.NonNull;
import lombok.Value; import lombok.Value;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamConfig;
@ -28,7 +29,9 @@ import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.MultiStreamLease; import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.UpdateField;
import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
@ -50,6 +53,8 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRange;
/** /**
* The top level orchestrator for coordinating the periodic shard sync related * The top level orchestrator for coordinating the periodic shard sync related
* activities. * activities.
@ -59,10 +64,13 @@ import java.util.stream.Collectors;
@Slf4j @Slf4j
class PeriodicShardSyncManager { class PeriodicShardSyncManager {
private static final long INITIAL_DELAY = 60 * 1000L; private static final long INITIAL_DELAY = 60 * 1000L;
private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000L; private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
private static final BigInteger MIN_HASH_KEY = BigInteger.ZERO; @VisibleForTesting
private static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE); static final BigInteger MIN_HASH_KEY = BigInteger.ZERO;
private static final int CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY = 3; @VisibleForTesting
static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE);
@VisibleForTesting
static final int CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY = 3;
private Map<StreamIdentifier, HashRangeHoleTracker> hashRangeHoleTrackerMap = new HashMap<>(); private Map<StreamIdentifier, HashRangeHoleTracker> hashRangeHoleTrackerMap = new HashMap<>();
private final String workerId; private final String workerId;
@ -126,7 +134,7 @@ class PeriodicShardSyncManager {
log.info("Syncing Kinesis shard info for " + streamIdentifier); log.info("Syncing Kinesis shard info for " + streamIdentifier);
final StreamConfig streamConfig = streamConfigEntry.getValue(); final StreamConfig streamConfig = streamConfigEntry.getValue();
final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfig); final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfig);
final TaskResult taskResult = shardSyncTaskManager.executeShardSyncTask(); final TaskResult taskResult = shardSyncTaskManager.callShardSyncTask();
if (taskResult.getException() != null) { if (taskResult.getException() != null) {
throw taskResult.getException(); throw taskResult.getException();
} }
@ -145,19 +153,30 @@ class PeriodicShardSyncManager {
private void runShardSync() { private void runShardSync() {
if (leaderDecider.isLeader(workerId)) { if (leaderDecider.isLeader(workerId)) {
log.info(String.format("WorkerId %s is leader, running the periodic shard sync task", workerId));
try { try {
final Map<StreamIdentifier, List<Lease>> streamToLeasesMap = getStreamToLeasesMap(currentStreamConfigMap.keySet()); // Construct the stream to leases map to be used in the lease sync
for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) { final Map<StreamIdentifier, List<Lease>> streamToLeasesMap = getStreamToLeasesMap(
currentStreamConfigMap.keySet());
final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfigEntry.getValue()); // For each of the stream, check if shard sync needs to be done based on the leases state.
if (!shardSyncTaskManager.syncShardAndLeaseInfo()) { for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
if (shouldDoShardSync(streamConfigEntry.getKey(),
streamToLeasesMap.get(streamConfigEntry.getKey()))) {
log.info("Periodic shard syncer initiating shard sync for {}", streamConfigEntry.getKey());
final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider
.apply(streamConfigEntry.getValue());
if (!shardSyncTaskManager.castShardSyncTask()) {
log.warn( log.warn(
"Failed to submit shard sync task for stream {}. This could be due to the previous shard sync task not finished.", "Failed to submit shard sync task for stream {}. This could be due to the previous shard sync task not finished.",
shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); shardSyncTaskManager.shardDetector().streamIdentifier().streamName());
} }
} else {
log.info("Skipping shard sync for {} as either hash ranges are complete in the lease table or leases hole confidence is not achieved.", streamConfigEntry.getKey());
}
} }
} catch (Exception e) { } catch (Exception e) {
// TODO : Log log.error("Caught exception while running periodic shard syncer.", e);
} }
} else { } else {
log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId));
@ -184,10 +203,12 @@ class PeriodicShardSyncManager {
} }
} }
// TODO : Catch exception @VisibleForTesting
private boolean shouldDoShardSync(StreamIdentifier streamIdentifier, List<Lease> leases) { boolean shouldDoShardSync(StreamIdentifier streamIdentifier, List<Lease> leases) {
if (CollectionUtils.isNullOrEmpty(leases)) { if (CollectionUtils.isNullOrEmpty(leases)) {
throw new IllegalArgumentException("No leases found to validate for the stream " + streamIdentifier); // If the leases is null or empty then we need to do shard sync
log.info("No leases found for {}. Will be triggering shard sync", streamIdentifier);
return true;
} }
// Check if there are any holes in the leases and return the first hole if present. // Check if there are any holes in the leases and return the first hole if present.
Optional<HashRangeHole> hashRangeHoleOpt = hasHoleInLeases(streamIdentifier, leases); Optional<HashRangeHole> hashRangeHoleOpt = hasHoleInLeases(streamIdentifier, leases);
@ -205,25 +226,76 @@ class PeriodicShardSyncManager {
} }
private Optional<HashRangeHole> hasHoleInLeases(StreamIdentifier streamIdentifier, List<Lease> leases) { private Optional<HashRangeHole> hasHoleInLeases(StreamIdentifier streamIdentifier, List<Lease> leases) {
// Filter the hashranges of leases which has any checkpoint other than shard end. // Filter the leases with any checkpoint other than shard end.
List<HashKeyRangeForLease> hashRangesForActiveLeases = leases.stream() List<Lease> activeLeases = leases.stream()
.filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()) .filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()).collect(Collectors.toList());
List<Lease> activeLeasesWithHashRanges = fillWithHashRangesIfRequired(streamIdentifier, activeLeases);
List<HashKeyRangeForLease> hashRangesForActiveLeases = activeLeasesWithHashRanges.stream()
.map(lease -> lease.hashKeyRangeForLease()).collect(Collectors.toList()); .map(lease -> lease.hashKeyRangeForLease()).collect(Collectors.toList());
return checkForHoleInHashKeyRanges(streamIdentifier, hashRangesForActiveLeases, MIN_HASH_KEY, MAX_HASH_KEY); return checkForHoleInHashKeyRanges(streamIdentifier, hashRangesForActiveLeases, MIN_HASH_KEY, MAX_HASH_KEY);
} }
// If leases are missing hashranges information, update the leases in-memory as well as in the lease storage
// by learning from kinesis shards.
private List<Lease> fillWithHashRangesIfRequired(StreamIdentifier streamIdentifier, List<Lease> activeLeases) {
    // Active leases created without hash range info (e.g. by an older writer) need backfilling
    // before hole detection can run.
    List<Lease> activeLeasesWithNoHashRanges = activeLeases.stream()
            .filter(lease -> lease.hashKeyRangeForLease() == null).collect(Collectors.toList());
    Optional<Lease> minLeaseOpt = activeLeasesWithNoHashRanges.stream().min(Comparator.comparing(Lease::leaseKey));
    if (minLeaseOpt.isPresent()) {
        // TODO : use minLease for new ListShards with startingShardId
        final Lease minLease = minLeaseOpt.get();
        // Fetch the current shard list from Kinesis and index it by shardId so each lease's
        // hash range can be looked up.
        final ShardDetector shardDetector = shardSyncTaskManagerProvider
                .apply(currentStreamConfigMap.get(streamIdentifier)).shardDetector();
        final Map<String, Shard> kinesisShards = shardDetector.listShards().stream()
                .collect(Collectors.toMap(Shard::shardId, shard -> shard));
        return activeLeases.stream().map(lease -> {
            if (lease.hashKeyRangeForLease() == null) {
                // MultiStreamLease stores the shardId separately from the lease key; for a
                // single-stream lease the lease key itself is the shardId.
                final String shardId = lease instanceof MultiStreamLease ?
                        ((MultiStreamLease) lease).shardId() :
                        lease.leaseKey();
                final Shard shard = kinesisShards.get(shardId);
                if (shard == null) {
                    // Shard no longer exists in Kinesis; leave the lease as-is (it is dropped
                    // by the filter below since it still has no hash range).
                    return lease;
                }
                lease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange()));
                try {
                    // Persist only the hash range field of the lease.
                    leaseRefresher.updateLease(lease, UpdateField.HASH_KEY_RANGE);
                } catch (Exception e) {
                    // Best effort: include the exception so the failure cause is not lost.
                    // A failed persist only means a later explicit shard sync may be needed.
                    log.warn(
                            "Unable to update hash range key information for lease {} of stream {}. This may result in explicit lease sync.",
                            lease.leaseKey(), streamIdentifier, e);
                }
            }
            return lease;
        }).filter(lease -> lease.hashKeyRangeForLease() != null).collect(Collectors.toList());
    } else {
        return activeLeases;
    }
}
@VisibleForTesting @VisibleForTesting
static Optional<HashRangeHole> checkForHoleInHashKeyRanges(StreamIdentifier streamIdentifier, static Optional<HashRangeHole> checkForHoleInHashKeyRanges(StreamIdentifier streamIdentifier,
List<HashKeyRangeForLease> hashKeyRanges, BigInteger minHashKey, BigInteger maxHashKey) { List<HashKeyRangeForLease> hashKeyRanges, BigInteger minHashKey, BigInteger maxHashKey) {
// Sort and merge the overlapping hash ranges.
List<HashKeyRangeForLease> mergedHashKeyRanges = sortAndMergeOverlappingHashRanges(hashKeyRanges); List<HashKeyRangeForLease> mergedHashKeyRanges = sortAndMergeOverlappingHashRanges(hashKeyRanges);
if(mergedHashKeyRanges.isEmpty()) {
log.error("No valid hashranges found for stream {} between {} and {}.", streamIdentifier,
MIN_HASH_KEY, MAX_HASH_KEY);
return Optional.of(new HashRangeHole(new HashKeyRangeForLease(MIN_HASH_KEY, MAX_HASH_KEY),
new HashKeyRangeForLease(MIN_HASH_KEY, MAX_HASH_KEY)));
}
// Validate for hashranges bounds.
if (!mergedHashKeyRanges.get(0).startingHashKey().equals(minHashKey) || !mergedHashKeyRanges if (!mergedHashKeyRanges.get(0).startingHashKey().equals(minHashKey) || !mergedHashKeyRanges
.get(mergedHashKeyRanges.size() - 1).endingHashKey().equals(maxHashKey)) { .get(mergedHashKeyRanges.size() - 1).endingHashKey().equals(maxHashKey)) {
log.error("Incomplete hash range found between {} and {}.", mergedHashKeyRanges.get(0), log.error("Incomplete hash range found for stream {} between {} and {}.", streamIdentifier,
mergedHashKeyRanges.get(0),
mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1)); mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1));
return Optional.of(new HashRangeHole(mergedHashKeyRanges.get(0), return Optional.of(new HashRangeHole(mergedHashKeyRanges.get(0),
mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1))); mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1)));
} }
// Check for any holes in the sorted hashrange intervals.
if (mergedHashKeyRanges.size() > 1) { if (mergedHashKeyRanges.size() > 1) {
for (int i = 1; i < mergedHashKeyRanges.size(); i++) { for (int i = 1; i < mergedHashKeyRanges.size(); i++) {
final HashKeyRangeForLease hashRangeAtStartOfPossibleHole = mergedHashKeyRanges.get(i - 1); final HashKeyRangeForLease hashRangeAtStartOfPossibleHole = mergedHashKeyRanges.get(i - 1);
@ -232,8 +304,8 @@ class PeriodicShardSyncManager {
final BigInteger endOfPossibleHole = hashRangeAtEndOfPossibleHole.startingHashKey(); final BigInteger endOfPossibleHole = hashRangeAtEndOfPossibleHole.startingHashKey();
if (!endOfPossibleHole.subtract(startOfPossibleHole).equals(BigInteger.ONE)) { if (!endOfPossibleHole.subtract(startOfPossibleHole).equals(BigInteger.ONE)) {
log.error("Incomplete hash range found between {} and {}.", hashRangeAtStartOfPossibleHole, log.error("Incomplete hash range found for {} between {} and {}.", streamIdentifier,
hashRangeAtEndOfPossibleHole); hashRangeAtStartOfPossibleHole, hashRangeAtEndOfPossibleHole);
return Optional.of(new HashRangeHole(hashRangeAtStartOfPossibleHole, hashRangeAtEndOfPossibleHole)); return Optional.of(new HashRangeHole(hashRangeAtStartOfPossibleHole, hashRangeAtEndOfPossibleHole));
} }
} }

View file

@ -351,7 +351,7 @@ public class Scheduler implements Runnable {
log.info("LeaseCoordinator is already running. No need to start it."); log.info("LeaseCoordinator is already running. No need to start it.");
} }
log.info("Scheduling periodicShardSync"); log.info("Scheduling periodicShardSync");
// leaderElectedPeriodicShardSyncManager.start(shardSyncTasks); leaderElectedPeriodicShardSyncManager.start();
// TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged // TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged
// TODO: Determine if waitUntilHashRangeCovered() is needed. // TODO: Determine if waitUntilHashRangeCovered() is needed.
streamSyncWatch.start(); streamSyncWatch.start();
@ -417,7 +417,7 @@ public class Scheduler implements Runnable {
final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt()); final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt());
final StreamConfig streamConfig = currentStreamConfigMap final StreamConfig streamConfig = currentStreamConfigMap
.getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier)); .getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier));
if (createOrGetShardSyncTaskManager(streamConfig).syncShardAndLeaseInfo()) { if (createOrGetShardSyncTaskManager(streamConfig).castShardSyncTask()) {
log.info("{} : Found completed shard, initiated new ShardSyncTak for {} ", log.info("{} : Found completed shard, initiated new ShardSyncTak for {} ",
streamIdentifier.serialize(), completedShard.toString()); streamIdentifier.serialize(), completedShard.toString());
} }
@ -480,7 +480,7 @@ public class Scheduler implements Runnable {
if (!currentStreamConfigMap.containsKey(streamIdentifier)) { if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
log.info("Found new stream to process: " + streamIdentifier + ". Syncing shards of that stream."); log.info("Found new stream to process: " + streamIdentifier + ". Syncing shards of that stream.");
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(newStreamConfigMap.get(streamIdentifier)); ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(newStreamConfigMap.get(streamIdentifier));
shardSyncTaskManager.syncShardAndLeaseInfo(); shardSyncTaskManager.castShardSyncTask();
currentStreamConfigMap.put(streamIdentifier, newStreamConfigMap.get(streamIdentifier)); currentStreamConfigMap.put(streamIdentifier, newStreamConfigMap.get(streamIdentifier));
streamsSynced.add(streamIdentifier); streamsSynced.add(streamIdentifier);
} else { } else {
@ -508,7 +508,7 @@ public class Scheduler implements Runnable {
+ ". Syncing shards of that stream."); + ". Syncing shards of that stream.");
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager( ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(
currentStreamConfigMap.get(streamIdentifier)); currentStreamConfigMap.get(streamIdentifier));
shardSyncTaskManager.syncShardAndLeaseInfo(); shardSyncTaskManager.castShardSyncTask();
currentSetOfStreamsIter.remove(); currentSetOfStreamsIter.remove();
streamsSynced.add(streamIdentifier); streamsSynced.add(streamIdentifier);
} }

View file

@ -191,6 +191,21 @@ public interface LeaseRefresher {
boolean updateLease(Lease lease) boolean updateLease(Lease lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException; throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
 * Update application-specific fields of the given lease in DynamoDB. Does not update fields managed by the leasing
 * library such as leaseCounter, leaseOwner, or leaseKey.
 *
 * @param lease lease whose application-specific field should be updated
 * @param updateField the application-specific field to update
 *
 * @throws InvalidStateException if lease table does not exist
 * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
 * @throws DependencyException if DynamoDB update fails in an unexpected way
 */
default void updateLease(Lease lease, UpdateField updateField)
        throws DependencyException, InvalidStateException, ProvisionedThroughputException {
    // Default implementation keeps existing LeaseRefresher implementations source-compatible;
    // refreshers that support field-level updates override this.
    throw new UnsupportedOperationException("updateLease(lease, updateField) is not implemented");
}
/** /**
* Check (synchronously) if there are any leases in the lease table. * Check (synchronously) if there are any leases in the lease table.
* *

View file

@ -107,6 +107,15 @@ public interface LeaseSerializer {
*/ */
Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(Lease lease); Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(Lease lease);
/**
 * Builds the DynamoDB attribute updates for a single application-managed field of a lease.
 *
 * @param lease lease whose field should be serialized
 * @param updateField the application-specific field to serialize
 * @return the attribute value map that updates application-specific data for a lease
 */
default Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(Lease lease, UpdateField updateField) {
    // Default implementation keeps existing LeaseSerializer implementations source-compatible.
    throw new UnsupportedOperationException(
            "getDynamoUpdateLeaseUpdate(lease, updateField) is not implemented");
}
/** /**
* @return the key schema for creating a DynamoDB table to store leases * @return the key schema for creating a DynamoDB table to store leases
*/ */
@ -116,4 +125,5 @@ public interface LeaseSerializer {
* @return attribute definitions for creating a DynamoDB table to store leases * @return attribute definitions for creating a DynamoDB table to store leases
*/ */
Collection<AttributeDefinition> getAttributeDefinitions(); Collection<AttributeDefinition> getAttributeDefinitions();
} }

View file

@ -126,7 +126,11 @@ public class ShardSyncTaskManager {
this.lock = new ReentrantLock(); this.lock = new ReentrantLock();
} }
public TaskResult executeShardSyncTask() { /**
* Call a ShardSyncTask and return the Task Result.
* @return the Task Result.
*/
public TaskResult callShardSyncTask() {
final ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, final ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector,
leaseRefresher, leaseRefresher,
initialPositionInStream, initialPositionInStream,
@ -140,7 +144,11 @@ public class ShardSyncTaskManager {
return metricCollectingTask.call(); return metricCollectingTask.call();
} }
public boolean syncShardAndLeaseInfo() { /**
* Cast a ShardSyncTask and return if the casting is successful.
* @return if the casting is successful.
*/
public boolean castShardSyncTask() {
try { try {
lock.lock(); lock.lock();
return checkAndSubmitNextTask(); return checkAndSubmitNextTask();
@ -197,7 +205,7 @@ public class ShardSyncTaskManager {
log.error("Caught exception running {} task: ", currentTask.taskType(), exception != null ? exception : taskResult.getException()); log.error("Caught exception running {} task: ", currentTask.taskType(), exception != null ? exception : taskResult.getException());
} }
// Acquire lock here. If shardSyncRequestPending is false in this completionStage and // Acquire lock here. If shardSyncRequestPending is false in this completionStage and
// syncShardAndLeaseInfo is invoked, before completion stage exits (future completes) // castShardSyncTask is invoked, before completion stage exits (future completes)
// but right after the value of shardSyncRequestPending is checked, it will result in // but right after the value of shardSyncRequestPending is checked, it will result in
// shardSyncRequestPending being set to true, but no pending futures to trigger the next // shardSyncRequestPending being set to true, but no pending futures to trigger the next
// ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the // ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the

View file

@ -0,0 +1,19 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.leases;
/**
 * Identifies an application-managed lease field that can be updated in isolation:
 * the child shard ids recorded for a lease, or the lease's hash key range.
 */
public enum UpdateField {
CHILD_SHARDS, HASH_KEY_RANGE
}

View file

@ -35,6 +35,7 @@ import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseSerializer; import software.amazon.kinesis.leases.LeaseSerializer;
import software.amazon.kinesis.leases.UpdateField;
import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
@ -659,6 +660,27 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
return true; return true;
} }
/**
 * Updates the given application-specific {@code updateField} of the lease in DynamoDB
 * without any conditional expectations on the stored item (the debug log calls this an
 * update "without expectation" — only the serialized field attributes are written).
 *
 * @param lease lease carrying the new field value(s)
 * @param updateField which application-managed field to persist
 * @throws DependencyException if DynamoDB update fails in an unexpected way (including interruption)
 * @throws InvalidStateException if lease table does not exist
 * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
 */
@Override
public void updateLease(Lease lease, UpdateField updateField)
        throws DependencyException, InvalidStateException, ProvisionedThroughputException {
    log.debug("Updating lease without expectation {}", lease);
    final AWSExceptionManager exceptionManager = createExceptionManager();
    // Serialize only the requested field into DynamoDB attribute updates.
    Map<String, AttributeValueUpdate> updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField);
    UpdateItemRequest request = UpdateItemRequest.builder().tableName(table).key(serializer.getDynamoHashKey(lease))
            .attributeUpdates(updates).build();
    try {
        try {
            FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateItem(request), dynamoDbRequestTimeout);
        } catch (ExecutionException e) {
            // Unwrap the async client's wrapper to surface the underlying DynamoDB error.
            throw exceptionManager.apply(e.getCause());
        } catch (InterruptedException e) {
            throw new DependencyException(e);
        }
    } catch (DynamoDbException | TimeoutException e) {
        // Translate SDK/timeout failures into the leasing library's exception hierarchy.
        throw convertAndRethrowExceptions("update", lease.leaseKey(), e);
    }
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */

View file

@ -36,6 +36,7 @@ import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.leases.DynamoUtils; import software.amazon.kinesis.leases.DynamoUtils;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseSerializer; import software.amazon.kinesis.leases.LeaseSerializer;
import software.amazon.kinesis.leases.UpdateField;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/** /**
@ -268,6 +269,28 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
return result; return result;
} }
/**
 * Builds DynamoDB attribute updates for a single application-managed lease field.
 * Returns an empty map when the lease carries no value for the requested field.
 *
 * @param lease lease whose field should be serialized
 * @param updateField which field's attributes to emit
 * @return attribute updates for the requested field (possibly empty)
 */
@Override
public Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(Lease lease,
        UpdateField updateField) {
    Map<String, AttributeValueUpdate> result = new HashMap<>();
    switch (updateField) {
    case CHILD_SHARDS:
        // Only persist child shard ids when the lease actually has them.
        if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) {
            result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds())));
        }
        break;
    case HASH_KEY_RANGE:
        // The hash key range is stored as two separate serialized attributes.
        if (lease.hashKeyRangeForLease() != null) {
            result.put(STARTING_HASH_KEY, putUpdate(
                    DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey())));
            result.put(ENDING_HASH_KEY, putUpdate(
                    DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey())));
        }
        break;
    }
    return result;
}
@Override @Override
public Collection<KeySchemaElement> getKeySchema() { public Collection<KeySchemaElement> getKeySchema() {
List<KeySchemaElement> keySchema = new ArrayList<>(); List<KeySchemaElement> keySchema = new ArrayList<>();

View file

@ -115,7 +115,7 @@ public class ShutdownTask implements ConsumerTask {
// This scenario could happen when customer deletes the stream while leaving the KCL application running. // This scenario could happen when customer deletes the stream while leaving the KCL application running.
if (!CollectionUtils.isNullOrEmpty(childShards)) { if (!CollectionUtils.isNullOrEmpty(childShards)) {
createLeasesForChildShardsIfNotExist(); createLeasesForChildShardsIfNotExist();
updateLeasesForChildShards(); updateLeasesWithChildShards();
} else { } else {
log.warn("Shard {} no longer exists. Shutting down consumer with SHARD_END reason without creating leases for child shards.", leaseKeyProvider.apply(shardInfo)); log.warn("Shard {} no longer exists. Shutting down consumer with SHARD_END reason without creating leases for child shards.", leaseKeyProvider.apply(shardInfo));
} }
@ -189,7 +189,7 @@ public class ShutdownTask implements ConsumerTask {
} }
} }
private void updateLeasesForChildShards() private void updateLeasesWithChildShards()
throws DependencyException, InvalidStateException, ProvisionedThroughputException { throws DependencyException, InvalidStateException, ProvisionedThroughputException {
final Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); final Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
Set<String> childShardIds = childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet()); Set<String> childShardIds = childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet());

View file

@ -19,22 +19,56 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static software.amazon.kinesis.common.HashKeyRangeForLease.deserialize; import static software.amazon.kinesis.common.HashKeyRangeForLease.deserialize;
import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY;
import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.MAX_HASH_KEY;
import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.MIN_HASH_KEY;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class PeriodicShardSyncManagerTest { public class PeriodicShardSyncManagerTest {
private StreamIdentifier streamIdentifier;
private PeriodicShardSyncManager periodicShardSyncManager;
@Mock
private LeaderDecider leaderDecider;
@Mock
private LeaseRefresher leaseRefresher;
@Mock
Map<StreamIdentifier, StreamConfig> currentStreamConfigMap;
@Mock
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider;
@Before @Before
public void setup() { public void setup() {
streamIdentifier = StreamIdentifier.multiStreamInstance("123:stream:456");
periodicShardSyncManager = new PeriodicShardSyncManager("worker", leaderDecider, leaseRefresher, currentStreamConfigMap,
shardSyncTaskManagerProvider, true);
} }
@Test @Test
@ -136,7 +170,7 @@ public class PeriodicShardSyncManagerTest {
add(deserialize("25", "30")); // Missing interval here add(deserialize("25", "30")); // Missing interval here
}}; }};
Assert.assertTrue(PeriodicShardSyncManager Assert.assertTrue(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent()); .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent());
} }
@Test @Test
@ -149,7 +183,263 @@ public class PeriodicShardSyncManagerTest {
add(deserialize("24", "30")); add(deserialize("24", "30"));
}}; }};
Assert.assertFalse(PeriodicShardSyncManager Assert.assertFalse(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent()); .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent());
}
@Test
public void testIfShardSyncIsInitiatedWhenNoLeasesArePassed() {
    // A null lease list gives the syncer no coverage information, so a sync must be triggered.
    final List<Lease> leases = null;
    Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, leases));
}
@Test
public void testIfShardSyncIsInitiatedWhenEmptyLeasesArePassed() {
    // An empty lease table likewise forces a shard sync.
    final List<Lease> emptyLeases = new ArrayList<>();
    Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, emptyLeases));
}
@Test
public void testIfShardSyncIsNotInitiatedWhenConfidenceFactorIsNotReached() {
    // Lease table with a hole between 23 and 25: the syncer must NOT trigger a shard sync
    // before the hole has been observed CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY times.
    final List<HashKeyRangeForLease> hashRanges = new ArrayList<>();
    hashRanges.add(deserialize(MIN_HASH_KEY.toString(), "1"));
    hashRanges.add(deserialize("2", "3"));
    hashRanges.add(deserialize("4", "23"));
    hashRanges.add(deserialize("6", "23")); // Hole between 23 and 25
    hashRanges.add(deserialize("25", MAX_HASH_KEY.toString()));
    final List<Lease> multiStreamLeases = hashRanges.stream().map(range -> {
        final MultiStreamLease lease = new MultiStreamLease();
        lease.hashKeyRange(range);
        lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
        return lease;
    }).collect(Collectors.toList());
    for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
        Assert.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases));
    }
}
@Test
// Fixed: this test was named "...IsNotInitiated...WhenConfidenceFactorIsReached",
// contradicting its own final assertion — once the same hole has been observed
// CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY times, shouldDoShardSync returns
// true, i.e. the sync IS initiated. Renamed accordingly (JUnit discovers tests
// via @Test, not by name, so no caller is affected).
public void testIfShardSyncIsInitiatedWhenConfidenceFactorIsReached() {
    // TRIM_HORIZON leases whose ranges leave a hole between 23 and 25.
    List<Lease> multiStreamLeases = new ArrayList<HashKeyRangeForLease>() {{
        add(deserialize(MIN_HASH_KEY.toString(), "1"));
        add(deserialize("2", "3"));
        add(deserialize("4", "23"));
        add(deserialize("6", "23")); // Hole between 23 and 25
        add(deserialize("25", MAX_HASH_KEY.toString()));
    }}.stream().map(hashKeyRangeForLease -> {
        MultiStreamLease lease = new MultiStreamLease();
        lease.hashKeyRange(hashKeyRangeForLease);
        lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
        return lease;
    }).collect(Collectors.toList());
    // Below the confidence threshold the persistent hole must not trigger a sync.
    IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
            .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)));
    // The same hole observed for the threshold-th consecutive time triggers the sync.
    Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases));
}
@Test
public void testIfShardSyncIsInitiatedWhenHoleIsDueToShardEnd() {
    // The lease covering [4, 23] is checkpointed at SHARD_END, which
    // introduces a coverage hole even though its range exists in the table.
    final List<HashKeyRangeForLease> hashRanges = new ArrayList<>();
    hashRanges.add(deserialize(MIN_HASH_KEY.toString(), "1"));
    hashRanges.add(deserialize("2", "3"));
    hashRanges.add(deserialize("4", "23")); // introducing hole here through SHARD_END checkpoint
    hashRanges.add(deserialize("6", "23"));
    hashRanges.add(deserialize("24", MAX_HASH_KEY.toString()));
    final List<Lease> multiStreamLeases = new ArrayList<>();
    for (HashKeyRangeForLease range : hashRanges) {
        final MultiStreamLease lease = new MultiStreamLease();
        lease.hashKeyRange(range);
        final boolean endedShard = "4".equals(lease.hashKeyRangeForLease().startingHashKey().toString());
        lease.checkpoint(endedShard ? ExtendedSequenceNumber.SHARD_END : ExtendedSequenceNumber.TRIM_HORIZON);
        multiStreamLeases.add(lease);
    }
    // Below the confidence threshold: no sync yet.
    for (int attempt = 1; attempt < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; attempt++) {
        Assert.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases));
    }
    // Threshold reached with the same hole: the sync is initiated.
    Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases));
}
@Test
public void testIfShardSyncIsInitiatedWhenNoLeasesAreUsedDueToShardEnd() {
    // Every lease is checkpointed at SHARD_END, so none of them contributes
    // to hash range coverage; the sync fires once the threshold is met.
    final List<HashKeyRangeForLease> hashRanges = new ArrayList<>();
    hashRanges.add(deserialize(MIN_HASH_KEY.toString(), "1"));
    hashRanges.add(deserialize("2", "3"));
    hashRanges.add(deserialize("4", "23"));
    hashRanges.add(deserialize("6", "23"));
    hashRanges.add(deserialize("24", MAX_HASH_KEY.toString()));
    final List<Lease> multiStreamLeases = new ArrayList<>();
    for (HashKeyRangeForLease range : hashRanges) {
        final MultiStreamLease lease = new MultiStreamLease();
        lease.hashKeyRange(range);
        lease.checkpoint(ExtendedSequenceNumber.SHARD_END);
        multiStreamLeases.add(lease);
    }
    // Below the confidence threshold: no sync yet.
    for (int attempt = 1; attempt < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; attempt++) {
        Assert.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases));
    }
    // Threshold reached: the sync is initiated.
    Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases));
}
@Test
public void testIfShardSyncIsNotInitiatedWhenHoleShifts() {
    // First snapshot: hole between 23 and 25.
    final List<HashKeyRangeForLease> firstRanges = new ArrayList<>();
    firstRanges.add(deserialize(MIN_HASH_KEY.toString(), "1"));
    firstRanges.add(deserialize("2", "3"));
    firstRanges.add(deserialize("4", "23"));
    firstRanges.add(deserialize("6", "23")); // Hole between 23 and 25
    firstRanges.add(deserialize("25", MAX_HASH_KEY.toString()));
    final List<Lease> firstLeases = new ArrayList<>();
    for (HashKeyRangeForLease range : firstRanges) {
        final MultiStreamLease lease = new MultiStreamLease();
        lease.hashKeyRange(range);
        lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
        firstLeases.add(lease);
    }
    // Stay one observation short of the threshold with the first hole.
    for (int attempt = 1; attempt < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; attempt++) {
        Assert.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, firstLeases));
    }
    // Second snapshot: the hole has moved, now between 3 and 5.
    final List<HashKeyRangeForLease> secondRanges = new ArrayList<>();
    secondRanges.add(deserialize(MIN_HASH_KEY.toString(), "1"));
    secondRanges.add(deserialize("2", "3")); // Hole between 3 and 5
    secondRanges.add(deserialize("5", "23"));
    secondRanges.add(deserialize("6", "23"));
    secondRanges.add(deserialize("24", MAX_HASH_KEY.toString()));
    final List<Lease> secondLeases = new ArrayList<>();
    for (HashKeyRangeForLease range : secondRanges) {
        final MultiStreamLease lease = new MultiStreamLease();
        lease.hashKeyRange(range);
        lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
        secondLeases.add(lease);
    }
    // The shifted hole resets the tracking, so the count starts over.
    for (int attempt = 1; attempt < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; attempt++) {
        Assert.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, secondLeases));
    }
    // Same (new) hole observed for the threshold-th time: sync is initiated.
    Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, secondLeases));
}
// Verifies that the hole-tracking state resets EVERY time the observed hole
// moves — even when it moves back to a previously seen position — so a sync
// fires only after the same hole is seen CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY
// times in a row.
@Test
public void testIfShardSyncIsNotInitiatedWhenHoleShiftsMoreThanOnce() {
    // First snapshot: hole between 23 and 25.
    List<Lease> multiStreamLeases = new ArrayList<HashKeyRangeForLease>() {{
        add(deserialize(MIN_HASH_KEY.toString(), "1"));
        add(deserialize("2", "3"));
        add(deserialize("4", "23"));
        add(deserialize("6", "23")); // Hole between 23 and 25
        add(deserialize("25", MAX_HASH_KEY.toString()));
    }}.stream().map(hashKeyRangeForLease -> {
        MultiStreamLease lease = new MultiStreamLease();
        lease.hashKeyRange(hashKeyRangeForLease);
        lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
        return lease;
    }).collect(Collectors.toList());
    // Stay one observation short of the threshold with the first hole.
    IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
            .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)));
    // Second snapshot: the hole has moved, now between 3 and 5.
    List<Lease> multiStreamLeases2 = new ArrayList<HashKeyRangeForLease>() {{
        add(deserialize(MIN_HASH_KEY.toString(), "1"));
        add(deserialize("2", "3")); // Hole between 3 and 5
        add(deserialize("5", "23"));
        add(deserialize("6", "23"));
        add(deserialize("24", MAX_HASH_KEY.toString()));
    }}.stream().map(hashKeyRangeForLease -> {
        MultiStreamLease lease = new MultiStreamLease();
        lease.hashKeyRange(hashKeyRangeForLease);
        lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
        return lease;
    }).collect(Collectors.toList());
    // The shifted hole resets the tracking, so the count starts over.
    IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
            .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases2)));
    // The hole shifts back to its original position (23-25); the tracker
    // resets again, so the threshold must be re-earned from scratch.
    IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
            .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)));
    // Same hole observed for the threshold-th consecutive time: sync fires.
    Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases));
}
// Leases missing their hash range info should have it back-filled from the
// Kinesis shard listing before hole detection runs; with full coverage and
// no holes, no shard sync is ever requested.
@Test
public void testIfMissingHashRangeInformationIsFilledBeforeEvaluatingForShardSync() {
    ShardSyncTaskManager shardSyncTaskManager = mock(ShardSyncTaskManager.class);
    ShardDetector shardDetector = mock(ShardDetector.class);
    when(shardSyncTaskManagerProvider.apply(any())).thenReturn(shardSyncTaskManager);
    when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
    // Single-element array: mutable counter usable inside the lambda below.
    final int[] shardCounter = { 0 };
    // Contiguous ranges covering [MIN_HASH_KEY, MAX_HASH_KEY] — no holes.
    List<HashKeyRangeForLease> hashKeyRangeForLeases = new ArrayList<HashKeyRangeForLease>() {{
        add(deserialize(MIN_HASH_KEY.toString(), "1"));
        add(deserialize("2", "3"));
        add(deserialize("4", "20"));
        add(deserialize("21", "23"));
        add(deserialize("24", MAX_HASH_KEY.toString()));
    }};
    // The shard listing the manager will consult to back-fill missing ranges.
    List<Shard> kinesisShards = hashKeyRangeForLeases.stream()
            .map(hashKeyRangeForLease -> Shard.builder().shardId("shard-" + (++shardCounter[0])).hashKeyRange(
                    HashKeyRange.builder().startingHashKey(hashKeyRangeForLease.serializedStartingHashKey())
                            .endingHashKey(hashKeyRangeForLease.serializedEndingHashKey()).build()).build())
            .collect(Collectors.toList());
    when(shardDetector.listShards()).thenReturn(kinesisShards);
    final int[] leaseCounter = { 0 };
    List<Lease> multiStreamLeases = hashKeyRangeForLeases.stream().map(hashKeyRangeForLease -> {
        MultiStreamLease lease = new MultiStreamLease();
        lease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), "shard-"+(++leaseCounter[0])));
        lease.shardId("shard-"+(leaseCounter[0]));
        // Setting the hashrange only for the last three leases (3, 4, 5);
        // the first two are left null to simulate missing hash range info.
        if(leaseCounter[0] >= 3) {
            lease.hashKeyRange(hashKeyRangeForLease);
        }
        lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
        return lease;
    }).collect(Collectors.toList());
    // Assert that shard sync should never trigger: after back-filling, the
    // ranges are contiguous with no hole.
    IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
            .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)));
    Assert.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases));
    // Assert that all the leases now has hashRanges set.
    for(Lease lease : multiStreamLeases) {
        Assert.assertNotNull(lease.hashKeyRangeForLease());
    }
}
// Same back-fill scenario as above, but the shard listing itself has a hole
// (between 3 and 5): after the missing hash ranges are filled in, the hole
// is still there, so the sync must fire once the confidence threshold is met.
@Test
public void testIfMissingHashRangeInformationIsFilledBeforeEvaluatingForShardSyncInHoleScenario() {
    ShardSyncTaskManager shardSyncTaskManager = mock(ShardSyncTaskManager.class);
    ShardDetector shardDetector = mock(ShardDetector.class);
    when(shardSyncTaskManagerProvider.apply(any())).thenReturn(shardSyncTaskManager);
    when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
    // Single-element array: mutable counter usable inside the lambda below.
    final int[] shardCounter = { 0 };
    List<HashKeyRangeForLease> hashKeyRangeForLeases = new ArrayList<HashKeyRangeForLease>() {{
        add(deserialize(MIN_HASH_KEY.toString(), "1"));
        add(deserialize("2", "3"));
        add(deserialize("5", "20")); // Hole between 3 and 5
        add(deserialize("21", "23"));
        add(deserialize("24", MAX_HASH_KEY.toString()));
    }};
    // The shard listing the manager will consult to back-fill missing ranges.
    List<Shard> kinesisShards = hashKeyRangeForLeases.stream()
            .map(hashKeyRangeForLease -> Shard.builder().shardId("shard-" + (++shardCounter[0])).hashKeyRange(
                    HashKeyRange.builder().startingHashKey(hashKeyRangeForLease.serializedStartingHashKey())
                            .endingHashKey(hashKeyRangeForLease.serializedEndingHashKey()).build()).build())
            .collect(Collectors.toList());
    when(shardDetector.listShards()).thenReturn(kinesisShards);
    final int[] leaseCounter = { 0 };
    List<Lease> multiStreamLeases = hashKeyRangeForLeases.stream().map(hashKeyRangeForLease -> {
        MultiStreamLease lease = new MultiStreamLease();
        lease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), "shard-"+(++leaseCounter[0])));
        lease.shardId("shard-"+(leaseCounter[0]));
        // Setting the hashrange only for the last three leases (3, 4, 5);
        // the first two are left null to simulate missing hash range info.
        if(leaseCounter[0] >= 3) {
            lease.hashKeyRange(hashKeyRangeForLease);
        }
        lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
        return lease;
    }).collect(Collectors.toList());
    // No sync while the hole is below the confidence threshold...
    IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
            .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)));
    // ...then the persistent hole triggers the sync at the threshold.
    Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases));
    // Assert that all the leases now has hashRanges set.
    for(Lease lease : multiStreamLeases) {
        Assert.assertNotNull(lease.hashKeyRangeForLease());
    }
}
} }
} }

View file

@ -190,7 +190,7 @@ public class SchedulerTest {
}); });
when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher); when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null)); when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null));
when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher); when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher);
when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class)); when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class));
@ -1036,7 +1036,7 @@ public class SchedulerTest {
shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector); shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
when(shardDetector.streamIdentifier()).thenReturn(streamConfig.streamIdentifier()); when(shardDetector.streamIdentifier()).thenReturn(streamConfig.streamIdentifier());
when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null)); when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null));
if(shardSyncFirstAttemptFailure) { if(shardSyncFirstAttemptFailure) {
when(shardDetector.listShards()) when(shardDetector.listShards())
.thenThrow(new RuntimeException("Service Exception")) .thenThrow(new RuntimeException("Service Exception"))