ShardSyncer multistream changes

This commit is contained in:
Ashwin Giridharan 2020-02-19 18:43:45 -08:00
parent 11c0ee7556
commit 9fa0cee97e
7 changed files with 162 additions and 88 deletions

View file

@ -199,7 +199,8 @@ public class Scheduler implements Runnable {
final LeaseSerializer leaseSerializer = this.appStreamTracker.map(mst -> true, sc -> false) ?
new DynamoDBMultiStreamLeaseSerializer() :
new DynamoDBLeaseSerializer();
this.leaseCoordinator = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer)
this.leaseCoordinator = this.leaseManagementConfig
.leaseManagementFactory(leaseSerializer, this.appStreamTracker.map(mst -> true, sc -> false))
.createLeaseCoordinator(this.metricsFactory);
this.leaseRefresher = this.leaseCoordinator.leaseRefresher();
@ -220,7 +221,7 @@ public class Scheduler implements Runnable {
// TODO : Halo : Handle case of no StreamConfig present in streamConfigMap() for the supplied streamName.
// TODO : Pass the immutable map here instead of using mst.streamConfigMap()
this.shardSyncTaskManagerProvider = streamName -> this.leaseManagementConfig
.leaseManagementFactory(leaseSerializer)
.leaseManagementFactory(leaseSerializer, this.appStreamTracker.map(mst -> true, sc -> false))
.createShardSyncTaskManager(this.metricsFactory, this.currentStreamConfigMap.get(streamName));
this.shardPrioritization = this.coordinatorConfig.shardPrioritization();
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
@ -248,7 +249,8 @@ public class Scheduler implements Runnable {
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
// TODO : Halo : Check if this needs to be per stream.
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer();
this.hierarchicalShardSyncer = leaseManagementConfig
.hierarchicalShardSyncer(this.appStreamTracker.map(mst -> true, sc -> false));
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
}

View file

@ -1,46 +0,0 @@
package software.amazon.kinesis.leases;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import java.util.Optional;
public class CompositeLeaseKey {
// private static final String LEASE_TOKEN_SEPERATOR = ":";
//
// private String streamName;
//
// @Getter
// private String shardId;
//
// public CompositeLeaseKey(String shardId) {
// this(null, shardId);
// }
//
// public CompositeLeaseKey(String streamName, String shardId) {
// this.streamName = streamName;
// this.shardId = shardId;
// }
//
// public Optional<String> getStreamName() {
// return Optional.ofNullable(streamName);
// }
//
// public String getLeaseKey(boolean isMultiStreamingEnabled) {
// Validate.isTrue(!(isMultiStreamingEnabled && StringUtils.isEmpty(streamName)),
// "Empty stream name found while multiStreaming is enabled");
// return isMultiStreamingEnabled ? StringUtils.joinWith(LEASE_TOKEN_SEPERATOR, streamName, shardId) : shardId;
// }
//
// public static CompositeLeaseKey getLeaseKey(String leaseKey) {
// Validate.notNull(leaseKey);
// String leaseTokens[] = leaseKey.split(LEASE_TOKEN_SEPERATOR);
// Validate.inclusiveBetween(1, 2, leaseTokens.length);
// return leaseTokens.length == 2 ?
// new CompositeLeaseKey(leaseTokens[0], leaseTokens[1]) :
// new CompositeLeaseKey(leaseTokens[0]);
// }
}

View file

@ -25,9 +25,13 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
import lombok.NonNull;
@ -35,6 +39,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.Pair;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -57,6 +62,22 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@KinesisClientInternalApi
public class HierarchicalShardSyncer {
private final boolean isMultiStreamMode;
public HierarchicalShardSyncer() {
isMultiStreamMode = false;
}
public HierarchicalShardSyncer(final boolean isMultiStreamMode) {
this.isMultiStreamMode = isMultiStreamMode;
}
private static final BiFunction<Lease, MultiStreamArgs, String> shardIdFromLeaseDeducer =
(lease, multiStreamArgs) ->
multiStreamArgs.isMultiStreamMode() ?
((MultiStreamLease) lease).shardId() :
lease.leaseKey();
/**
* Check and create leases for any new shards (e.g. following a reshard operation). Sync leases with Kinesis shards
* (e.g. at startup, or when we reach end of a shard).
@ -86,7 +107,8 @@ public class HierarchicalShardSyncer {
//Provide a pre-collcted list of shards to avoid calling ListShards API
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards,
final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List<Shard> latestShards)throws DependencyException, InvalidStateException,
final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List<Shard> latestShards)
throws DependencyException, InvalidStateException,
ProvisionedThroughputException, KinesisClientLibIOException {
if (!CollectionUtils.isNullOrEmpty(latestShards)) {
log.debug("Num shards: {}", latestShards.size());
@ -99,10 +121,12 @@ public class HierarchicalShardSyncer {
if (!ignoreUnexpectedChildShards) {
assertAllParentShardsAreClosed(inconsistentShardIds);
}
final List<Lease> currentLeases = leaseRefresher.listLeases();
final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(latestShards, currentLeases, initialPosition, inconsistentShardIds);
final List<Lease> currentLeases = isMultiStreamMode ?
getLeasesForStream(shardDetector.streamName(), leaseRefresher) :
leaseRefresher.listLeases();
final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamName());
final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(latestShards, currentLeases, initialPosition,
inconsistentShardIds, multiStreamArgs);
log.debug("Num new leases to create: {}", newLeasesToCreate.size());
for (Lease lease : newLeasesToCreate) {
long startTime = System.currentTimeMillis();
@ -116,14 +140,36 @@ public class HierarchicalShardSyncer {
}
final List<Lease> trackedLeases = new ArrayList<>(currentLeases);
trackedLeases.addAll(newLeasesToCreate);
cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher);
cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher, multiStreamArgs);
if (cleanupLeasesOfCompletedShards) {
cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, leaseRefresher);
cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases,
leaseRefresher, multiStreamArgs);
}
}
// CHECKSTYLE:ON CyclomaticComplexity
/** Note: This method has package level access solely for testing purposes.
*
* @param streamName We'll use this stream name to filter leases
* @param leaseRefresher Used to fetch leases
* @return Return list of leases (corresponding to shards) of the specified stream.
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
*/
static List<Lease> getLeasesForStream(String streamName,
LeaseRefresher leaseRefresher)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<Lease> streamLeases = new ArrayList<>();
for (Lease lease : leaseRefresher.listLeases()) {
if (streamName.equals(((MultiStreamLease)lease).streamName())) {
streamLeases.add(lease);
}
}
return streamLeases;
}
/** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls
* and a reshard operation.
* @param inconsistentShardIds
@ -310,12 +356,15 @@ public class HierarchicalShardSyncer {
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
static List<Lease> determineNewLeasesToCreate(final List<Shard> shards, final List<Lease> currentLeases,
final InitialPositionInStreamExtended initialPosition, final Set<String> inconsistentShardIds) {
final InitialPositionInStreamExtended initialPosition, final Set<String> inconsistentShardIds,
final MultiStreamArgs multiStreamArgs) {
final Map<String, Lease> shardIdToNewLeaseMap = new HashMap<>();
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
final Set<String> shardIdsOfCurrentLeases = currentLeases.stream()
.peek(lease -> log.debug("Existing lease: {}", lease)).map(Lease::leaseKey).collect(Collectors.toSet());
.peek(lease -> log.debug("Existing lease: {}", lease))
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
.collect(Collectors.toSet());
final List<Shard> openShards = getOpenShards(shards);
final Map<String, Boolean> memoizationContext = new HashMap<>();
@ -330,10 +379,12 @@ public class HierarchicalShardSyncer {
log.info("shardId {} is an inconsistent child. Not creating a lease", shardId);
} else {
log.debug("Need to create a lease for shardId {}", shardId);
final Lease newLease = newKCLLease(shard);
final Lease newLease = multiStreamArgs.isMultiStreamMode() ?
newKCLMultiStreamLease(shard, multiStreamArgs.streamName()) :
newKCLLease(shard);
final boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition,
shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap,
memoizationContext);
memoizationContext, multiStreamArgs);
/**
* If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
@ -371,11 +422,17 @@ public class HierarchicalShardSyncer {
final List<Lease> newLeasesToCreate = new ArrayList<>(shardIdToNewLeaseMap.values());
final Comparator<Lease> startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator(
shardIdToShardMapOfAllKinesisShards);
shardIdToShardMapOfAllKinesisShards, multiStreamArgs);
newLeasesToCreate.sort(startingSequenceNumberComparator);
return newLeasesToCreate;
}
static List<Lease> determineNewLeasesToCreate(final List<Shard> shards, final List<Lease> currentLeases,
final InitialPositionInStreamExtended initialPosition, final Set<String> inconsistentShardIds) {
return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds,
new MultiStreamArgs(false, null));
}
/**
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
@ -405,7 +462,8 @@ public class HierarchicalShardSyncer {
static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId,
final InitialPositionInStreamExtended initialPosition, final Set<String> shardIdsOfCurrentLeases,
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
final Map<String, Lease> shardIdToLeaseMapOfNewShards, final Map<String, Boolean> memoizationContext) {
final Map<String, Lease> shardIdToLeaseMapOfNewShards, final Map<String, Boolean> memoizationContext,
final MultiStreamArgs multiStreamArgs) {
final Boolean previousValue = memoizationContext.get(shardId);
if (previousValue != null) {
@ -428,7 +486,7 @@ public class HierarchicalShardSyncer {
// Check if the parent is a descendant, and include its ancestors.
if (checkIfDescendantAndAddNewLeasesForAncestors(parentShardId, initialPosition,
shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToLeaseMapOfNewShards,
memoizationContext)) {
memoizationContext, multiStreamArgs)) {
isDescendant = true;
descendantParentShardIds.add(parentShardId);
log.debug("Parent shard {} is a descendant.", parentShardId);
@ -444,7 +502,10 @@ public class HierarchicalShardSyncer {
log.debug("Need to create a lease for shardId {}", parentShardId);
Lease lease = shardIdToLeaseMapOfNewShards.get(parentShardId);
if (lease == null) {
lease = newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId));
lease = multiStreamArgs.isMultiStreamMode() ?
newKCLMultiStreamLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId),
multiStreamArgs.streamName()) :
newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId));
shardIdToLeaseMapOfNewShards.put(parentShardId, lease);
}
@ -475,6 +536,16 @@ public class HierarchicalShardSyncer {
memoizationContext.put(shardId, isDescendant);
return isDescendant;
}
static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId,
final InitialPositionInStreamExtended initialPosition, final Set<String> shardIdsOfCurrentLeases,
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
final Map<String, Lease> shardIdToLeaseMapOfNewShards, final Map<String, Boolean> memoizationContext) {
return checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition, shardIdsOfCurrentLeases,
shardIdToShardMapOfAllKinesisShards, shardIdToLeaseMapOfNewShards, memoizationContext,
new MultiStreamArgs(false, null));
}
// CHECKSTYLE:ON CyclomaticComplexity
/**
@ -519,13 +590,14 @@ public class HierarchicalShardSyncer {
* @throws DependencyException
*/
private static void cleanupGarbageLeases(@NonNull final ShardDetector shardDetector, final List<Shard> shards,
final List<Lease> trackedLeases, final LeaseRefresher leaseRefresher) throws KinesisClientLibIOException,
final List<Lease> trackedLeases, final LeaseRefresher leaseRefresher,
final MultiStreamArgs multiStreamArgs) throws KinesisClientLibIOException,
DependencyException, InvalidStateException, ProvisionedThroughputException {
final Set<String> kinesisShards = shards.stream().map(Shard::shardId).collect(Collectors.toSet());
// Check if there are leases for non-existent shards
final List<Lease> garbageLeases = trackedLeases.stream()
.filter(lease -> isCandidateForCleanup(lease, kinesisShards)).collect(Collectors.toList());
.filter(lease -> isCandidateForCleanup(lease, kinesisShards, multiStreamArgs)).collect(Collectors.toList());
if (!CollectionUtils.isNullOrEmpty(garbageLeases)) {
log.info("Found {} candidate leases for cleanup. Refreshing list of"
@ -534,7 +606,7 @@ public class HierarchicalShardSyncer {
.collect(Collectors.toSet());
for (Lease lease : garbageLeases) {
if (isCandidateForCleanup(lease, currentKinesisShardIds)) {
if (isCandidateForCleanup(lease, currentKinesisShardIds, multiStreamArgs)) {
log.info("Deleting lease for shard {} as it is not present in Kinesis stream.", lease.leaseKey());
leaseRefresher.deleteLease(lease);
}
@ -552,14 +624,17 @@ public class HierarchicalShardSyncer {
* @throws KinesisClientLibIOException Thrown if currentKinesisShardIds contains a parent shard but not the child
* shard (we are evaluating for deletion).
*/
static boolean isCandidateForCleanup(final Lease lease, final Set<String> currentKinesisShardIds)
static boolean isCandidateForCleanup(final Lease lease, final Set<String> currentKinesisShardIds,
final MultiStreamArgs multiStreamArgs)
throws KinesisClientLibIOException {
boolean isCandidateForCleanup = true;
if (currentKinesisShardIds.contains(lease.leaseKey())) {
final String shardId = shardIdFromLeaseDeducer.apply(lease, multiStreamArgs);
if (currentKinesisShardIds.contains(shardId)) {
isCandidateForCleanup = false;
} else {
log.info("Found lease for non-existent shard: {}. Checking its parent shards", lease.leaseKey());
log.info("Found lease for non-existent shard: {}. Checking its parent shards", shardId);
final Set<String> parentShardIds = lease.parentShardIds();
for (String parentShardId : parentShardIds) {
@ -567,7 +642,7 @@ public class HierarchicalShardSyncer {
// This may be a (rare) race condition between fetching the shard list and Kinesis expiring shards.
if (currentKinesisShardIds.contains(parentShardId)) {
final String message = String.format("Parent shard %s exists but not the child shard %s",
parentShardId, lease.leaseKey());
parentShardId, shardId);
log.info(message);
throw new KinesisClientLibIOException(message);
}
@ -596,27 +671,28 @@ public class HierarchicalShardSyncer {
*/
private synchronized void cleanupLeasesOfFinishedShards(final Collection<Lease> currentLeases,
final Map<String, Shard> shardIdToShardMap, final Map<String, Set<String>> shardIdToChildShardIdsMap,
final List<Lease> trackedLeases, final LeaseRefresher leaseRefresher) throws DependencyException,
final List<Lease> trackedLeases, final LeaseRefresher leaseRefresher,
final MultiStreamArgs multiStreamArgs) throws DependencyException,
InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
final List<Lease> leasesOfClosedShards = currentLeases.stream()
.filter(lease -> lease.checkpoint().equals(ExtendedSequenceNumber.SHARD_END))
.collect(Collectors.toList());
final Set<String> shardIdsOfClosedShards = leasesOfClosedShards.stream().map(Lease::leaseKey)
.collect(Collectors.toSet());
final Set<String> shardIdsOfClosedShards = leasesOfClosedShards.stream()
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)).collect(Collectors.toSet());
if (!CollectionUtils.isNullOrEmpty(leasesOfClosedShards)) {
assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, shardIdsOfClosedShards);
Comparator<? super Lease> startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator(
shardIdToShardMap);
shardIdToShardMap, multiStreamArgs);
leasesOfClosedShards.sort(startingSequenceNumberComparator);
final Map<String, Lease> trackedLeaseMap = trackedLeases.stream()
.collect(Collectors.toMap(Lease::leaseKey, Function.identity()));
.collect(Collectors.toMap(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs), Function.identity()));
for (Lease leaseOfClosedShard : leasesOfClosedShards) {
final String closedShardId = leaseOfClosedShard.leaseKey();
final String closedShardId = shardIdFromLeaseDeducer.apply(leaseOfClosedShard, multiStreamArgs);
final Set<String> childShardIds = shardIdToChildShardIdsMap.get(closedShardId);
if (closedShardId != null && !CollectionUtils.isNullOrEmpty(childShardIds)) {
cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseRefresher);
cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseRefresher, multiStreamArgs);
}
}
}
@ -637,7 +713,7 @@ public class HierarchicalShardSyncer {
* @throws DependencyException
*/
synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set<String> childShardIds,
final Map<String, Lease> trackedLeases, final LeaseRefresher leaseRefresher)
final Map<String, Lease> trackedLeases, final LeaseRefresher leaseRefresher, final MultiStreamArgs multiStreamArgs)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
final Lease leaseForClosedShard = trackedLeases.get(closedShardId);
final List<Lease> childShardLeases = childShardIds.stream().map(trackedLeases::get).filter(Objects::nonNull)
@ -655,7 +731,7 @@ public class HierarchicalShardSyncer {
if (okayToDelete) {
log.info("Deleting lease for shard {} as it has been completely processed and processing of child "
+ "shards has begun.", leaseForClosedShard.leaseKey());
+ "shards has begun.", shardIdFromLeaseDeducer.apply(leaseForClosedShard, multiStreamArgs));
leaseRefresher.deleteLease(leaseForClosedShard);
}
}
@ -684,6 +760,23 @@ public class HierarchicalShardSyncer {
return newLease;
}
private static Lease newKCLMultiStreamLease(final Shard shard, final String streamName) {
MultiStreamLease newLease = new MultiStreamLease();
newLease.leaseKey(MultiStreamLease.getLeaseKey(streamName, shard.shardId()));
List<String> parentShardIds = new ArrayList<>(2);
if (shard.parentShardId() != null) {
parentShardIds.add(shard.parentShardId());
}
if (shard.adjacentParentShardId() != null) {
parentShardIds.add(shard.adjacentParentShardId());
}
newLease.parentShardIds(parentShardIds);
newLease.ownerSwitchesSinceCheckpoint(0L);
newLease.streamName(streamName);
newLease.shardId(shard.shardId());
return newLease;
}
/**
* Helper method to construct a shardId->Shard map for the specified list of shards.
*
@ -728,6 +821,7 @@ public class HierarchicalShardSyncer {
private static final long serialVersionUID = 1L;
private final Map<String, Shard> shardIdToShardMap;
private final MultiStreamArgs multiStreamArgs;
/**
* Compares two leases based on the starting sequence number of corresponding shards.
@ -741,8 +835,8 @@ public class HierarchicalShardSyncer {
@Override
public int compare(final Lease lease1, final Lease lease2) {
int result = 0;
final String shardId1 = lease1.leaseKey();
final String shardId2 = lease2.leaseKey();
final String shardId1 = shardIdFromLeaseDeducer.apply(lease1, multiStreamArgs);
final String shardId2 = shardIdFromLeaseDeducer.apply(lease2, multiStreamArgs);
final Shard shard1 = shardIdToShardMap.get(shardId1);
final Shard shard2 = shardIdToShardMap.get(shardId2);
@ -762,4 +856,11 @@ public class HierarchicalShardSyncer {
}
@Data
@Accessors(fluent = true)
private static class MultiStreamArgs {
private final Boolean isMultiStreamMode;
private final String streamName;
}
}

View file

@ -59,7 +59,7 @@ public class KinesisShardDetector implements ShardDetector {
@NonNull
private final KinesisAsyncClient kinesisClient;
@NonNull
@NonNull @Getter
private final String streamName;
private final long listShardsBackoffTimeInMillis;
private final int maxListShardsRetryAttempts;

View file

@ -262,10 +262,24 @@ public class LeaseManagementConfig {
*/
private TableCreatorCallback tableCreatorCallback = TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK;
private HierarchicalShardSyncer hierarchicalShardSyncer = new HierarchicalShardSyncer();
private HierarchicalShardSyncer hierarchicalShardSyncer;
private LeaseManagementFactory leaseManagementFactory;
private HierarchicalShardSyncer hierarchicalShardSyncer() {
if(hierarchicalShardSyncer == null) {
hierarchicalShardSyncer = new HierarchicalShardSyncer();
}
return hierarchicalShardSyncer;
}
public HierarchicalShardSyncer hierarchicalShardSyncer(boolean isMultiStreamingMode) {
if(hierarchicalShardSyncer == null) {
hierarchicalShardSyncer = new HierarchicalShardSyncer(isMultiStreamingMode);
}
return hierarchicalShardSyncer;
}
@Deprecated
public LeaseManagementFactory leaseManagementFactory() {
if (leaseManagementFactory == null) {
@ -299,7 +313,7 @@ public class LeaseManagementConfig {
return leaseManagementFactory;
}
public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer) {
public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer, boolean isMultiStreamingMode) {
if(leaseManagementFactory == null) {
leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(),
dynamoDBClient(),
@ -322,7 +336,7 @@ public class LeaseManagementConfig {
cacheMissWarningModulus(),
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(),
hierarchicalShardSyncer(isMultiStreamingMode),
tableCreatorCallback(),
dynamoDbRequestTimeout(),
billingMode(),

View file

@ -27,4 +27,8 @@ public interface ShardDetector {
List<Shard> listShards();
default String streamName() {
throw new UnsupportedOperationException("StreamName not available");
}
}

View file

@ -33,7 +33,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.CompositeLeaseKey;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;