From 7491087edbee7c3385bd78eb7c91c9b9e887e907 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 19 Feb 2020 18:43:45 -0800 Subject: [PATCH] ShardSyncer multistream changes --- .../amazon/kinesis/coordinator/Scheduler.java | 8 +- .../kinesis/leases/CompositeLeaseKey.java | 46 ----- .../leases/HierarchicalShardSyncer.java | 169 ++++++++++++++---- .../kinesis/leases/KinesisShardDetector.java | 2 +- .../kinesis/leases/LeaseManagementConfig.java | 20 ++- .../amazon/kinesis/leases/ShardDetector.java | 4 + .../dynamodb/DynamoDBLeaseCoordinator.java | 1 - 7 files changed, 162 insertions(+), 88 deletions(-) delete mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/CompositeLeaseKey.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index f564f47b..9ccf5df5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -199,7 +199,8 @@ public class Scheduler implements Runnable { final LeaseSerializer leaseSerializer = this.appStreamTracker.map(mst -> true, sc -> false) ? new DynamoDBMultiStreamLeaseSerializer() : new DynamoDBLeaseSerializer(); - this.leaseCoordinator = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer) + this.leaseCoordinator = this.leaseManagementConfig + .leaseManagementFactory(leaseSerializer, this.appStreamTracker.map(mst -> true, sc -> false)) .createLeaseCoordinator(this.metricsFactory); this.leaseRefresher = this.leaseCoordinator.leaseRefresher(); @@ -220,7 +221,7 @@ public class Scheduler implements Runnable { // TODO : Halo : Handle case of no StreamConfig present in streamConfigMap() for the supplied streamName. // TODO : Pass the immutable map here instead of using mst.streamConfigMap() this.shardSyncTaskManagerProvider = streamName -> this.leaseManagementConfig - .leaseManagementFactory(leaseSerializer) + .leaseManagementFactory(leaseSerializer, this.appStreamTracker.map(mst -> true, sc -> false)) .createShardSyncTaskManager(this.metricsFactory, this.currentStreamConfigMap.get(streamName)); this.shardPrioritization = this.coordinatorConfig.shardPrioritization(); this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion(); @@ -248,7 +249,8 @@ public class Scheduler implements Runnable { this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards(); this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); // TODO : Halo : Check if this needs to be per stream. - this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(); + this.hierarchicalShardSyncer = leaseManagementConfig + .hierarchicalShardSyncer(this.appStreamTracker.map(mst -> true, sc -> false)); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/CompositeLeaseKey.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/CompositeLeaseKey.java deleted file mode 100644 index 7092ceaa..00000000 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/CompositeLeaseKey.java +++ /dev/null @@ -1,46 +0,0 @@ -package software.amazon.kinesis.leases; - -import lombok.Getter; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.Validate; - -import java.util.Optional; - -public class CompositeLeaseKey { - -// private static final String LEASE_TOKEN_SEPERATOR = ":"; -// -// private String streamName; -// -// @Getter -// private String shardId; -// -// public CompositeLeaseKey(String shardId) { -// this(null, shardId); -// } -// -// public CompositeLeaseKey(String streamName, String shardId) { -// this.streamName = streamName; -// this.shardId = shardId; -// } -// -// public Optional getStreamName() { -// return Optional.ofNullable(streamName); -// } -// -// public String getLeaseKey(boolean isMultiStreamingEnabled) { -// Validate.isTrue(!(isMultiStreamingEnabled && StringUtils.isEmpty(streamName)), -// "Empty stream name found while multiStreaming is enabled"); -// return isMultiStreamingEnabled ? StringUtils.joinWith(LEASE_TOKEN_SEPERATOR, streamName, shardId) : shardId; -// } -// -// public static CompositeLeaseKey getLeaseKey(String leaseKey) { -// Validate.notNull(leaseKey); -// String leaseTokens[] = leaseKey.split(LEASE_TOKEN_SEPERATOR); -// Validate.inclusiveBetween(1, 2, leaseTokens.length); -// return leaseTokens.length == 2 ? -// new CompositeLeaseKey(leaseTokens[0], leaseTokens[1]) : -// new CompositeLeaseKey(leaseTokens[0]); -// } - -} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 578af465..d3365514 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -25,9 +25,13 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.experimental.Accessors; import org.apache.commons.lang3.StringUtils; import lombok.NonNull; @@ -35,6 +39,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.utils.CollectionUtils; +import software.amazon.awssdk.utils.Pair; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -57,6 +62,22 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @KinesisClientInternalApi public class HierarchicalShardSyncer { + private final boolean isMultiStreamMode; + + public HierarchicalShardSyncer() { + isMultiStreamMode = false; + } + + public HierarchicalShardSyncer(final boolean isMultiStreamMode) { + this.isMultiStreamMode = isMultiStreamMode; + } + + private static final BiFunction shardIdFromLeaseDeducer = + (lease, multiStreamArgs) -> + multiStreamArgs.isMultiStreamMode() ? + ((MultiStreamLease) lease).shardId() : + lease.leaseKey(); + /** * Check and create leases for any new shards (e.g. following a reshard operation). Sync leases with Kinesis shards * (e.g. at startup, or when we reach end of a shard). @@ -86,7 +107,8 @@ public class HierarchicalShardSyncer { //Provide a pre-collcted list of shards to avoid calling ListShards API public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards, - final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List latestShards)throws DependencyException, InvalidStateException, + final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List latestShards) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { if (!CollectionUtils.isNullOrEmpty(latestShards)) { log.debug("Num shards: {}", latestShards.size()); @@ -99,10 +121,12 @@ public class HierarchicalShardSyncer { if (!ignoreUnexpectedChildShards) { assertAllParentShardsAreClosed(inconsistentShardIds); } - - final List currentLeases = leaseRefresher.listLeases(); - - final List newLeasesToCreate = determineNewLeasesToCreate(latestShards, currentLeases, initialPosition, inconsistentShardIds); + final List currentLeases = isMultiStreamMode ? + getLeasesForStream(shardDetector.streamName(), leaseRefresher) : + leaseRefresher.listLeases(); + final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamName()); + final List newLeasesToCreate = determineNewLeasesToCreate(latestShards, currentLeases, initialPosition, + inconsistentShardIds, multiStreamArgs); log.debug("Num new leases to create: {}", newLeasesToCreate.size()); for (Lease lease : newLeasesToCreate) { long startTime = System.currentTimeMillis(); @@ -116,14 +140,36 @@ public class HierarchicalShardSyncer { } final List trackedLeases = new ArrayList<>(currentLeases); trackedLeases.addAll(newLeasesToCreate); - cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher); + cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher, multiStreamArgs); if (cleanupLeasesOfCompletedShards) { - cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, leaseRefresher); + cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, + leaseRefresher, multiStreamArgs); } - } + // CHECKSTYLE:ON CyclomaticComplexity + /** Note: This method has package level access solely for testing purposes. + * + * @param streamName We'll use this stream name to filter leases + * @param leaseRefresher Used to fetch leases + * @return Return list of leases (corresponding to shards) of the specified stream. + * @throws DependencyException + * @throws InvalidStateException + * @throws ProvisionedThroughputException + */ + static List getLeasesForStream(String streamName, + LeaseRefresher leaseRefresher) + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + List streamLeases = new ArrayList<>(); + for (Lease lease : leaseRefresher.listLeases()) { + if (streamName.equals(((MultiStreamLease)lease).streamName())) { + streamLeases.add(lease); + } + } + return streamLeases; + } + /** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls * and a reshard operation. * @param inconsistentShardIds @@ -310,12 +356,15 @@ public class HierarchicalShardSyncer { * @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard */ static List determineNewLeasesToCreate(final List shards, final List currentLeases, - final InitialPositionInStreamExtended initialPosition, final Set inconsistentShardIds) { + final InitialPositionInStreamExtended initialPosition, final Set inconsistentShardIds, + final MultiStreamArgs multiStreamArgs) { final Map shardIdToNewLeaseMap = new HashMap<>(); final Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards); final Set shardIdsOfCurrentLeases = currentLeases.stream() - .peek(lease -> log.debug("Existing lease: {}", lease)).map(Lease::leaseKey).collect(Collectors.toSet()); + .peek(lease -> log.debug("Existing lease: {}", lease)) + .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)) + .collect(Collectors.toSet()); final List openShards = getOpenShards(shards); final Map memoizationContext = new HashMap<>(); @@ -330,10 +379,12 @@ public class HierarchicalShardSyncer { log.info("shardId {} is an inconsistent child. Not creating a lease", shardId); } else { log.debug("Need to create a lease for shardId {}", shardId); - final Lease newLease = newKCLLease(shard); + final Lease newLease = multiStreamArgs.isMultiStreamMode() ? + newKCLMultiStreamLease(shard, multiStreamArgs.streamName()) : + newKCLLease(shard); final boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition, shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap, - memoizationContext); + memoizationContext, multiStreamArgs); /** * If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the @@ -371,11 +422,17 @@ public class HierarchicalShardSyncer { final List newLeasesToCreate = new ArrayList<>(shardIdToNewLeaseMap.values()); final Comparator startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator( - shardIdToShardMapOfAllKinesisShards); + shardIdToShardMapOfAllKinesisShards, multiStreamArgs); newLeasesToCreate.sort(startingSequenceNumberComparator); return newLeasesToCreate; } + static List determineNewLeasesToCreate(final List shards, final List currentLeases, + final InitialPositionInStreamExtended initialPosition, final Set inconsistentShardIds) { + return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds, + new MultiStreamArgs(false, null)); + } + /** * Determine new leases to create and their initial checkpoint. * Note: Package level access only for testing purposes. @@ -405,7 +462,8 @@ public class HierarchicalShardSyncer { static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId, final InitialPositionInStreamExtended initialPosition, final Set shardIdsOfCurrentLeases, final Map shardIdToShardMapOfAllKinesisShards, - final Map shardIdToLeaseMapOfNewShards, final Map memoizationContext) { + final Map shardIdToLeaseMapOfNewShards, final Map memoizationContext, + final MultiStreamArgs multiStreamArgs) { final Boolean previousValue = memoizationContext.get(shardId); if (previousValue != null) { @@ -428,7 +486,7 @@ public class HierarchicalShardSyncer { // Check if the parent is a descendant, and include its ancestors. if (checkIfDescendantAndAddNewLeasesForAncestors(parentShardId, initialPosition, shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToLeaseMapOfNewShards, - memoizationContext)) { + memoizationContext, multiStreamArgs)) { isDescendant = true; descendantParentShardIds.add(parentShardId); log.debug("Parent shard {} is a descendant.", parentShardId); @@ -444,7 +502,10 @@ public class HierarchicalShardSyncer { log.debug("Need to create a lease for shardId {}", parentShardId); Lease lease = shardIdToLeaseMapOfNewShards.get(parentShardId); if (lease == null) { - lease = newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId)); + lease = multiStreamArgs.isMultiStreamMode() ? + newKCLMultiStreamLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId), + multiStreamArgs.streamName()) : + newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId)); shardIdToLeaseMapOfNewShards.put(parentShardId, lease); } @@ -475,6 +536,16 @@ public class HierarchicalShardSyncer { memoizationContext.put(shardId, isDescendant); return isDescendant; } + + static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId, + final InitialPositionInStreamExtended initialPosition, final Set shardIdsOfCurrentLeases, + final Map shardIdToShardMapOfAllKinesisShards, + final Map shardIdToLeaseMapOfNewShards, final Map memoizationContext) { + return checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition, shardIdsOfCurrentLeases, + shardIdToShardMapOfAllKinesisShards, shardIdToLeaseMapOfNewShards, memoizationContext, + new MultiStreamArgs(false, null)); + } + // CHECKSTYLE:ON CyclomaticComplexity /** @@ -519,13 +590,14 @@ public class HierarchicalShardSyncer { * @throws DependencyException */ private static void cleanupGarbageLeases(@NonNull final ShardDetector shardDetector, final List shards, - final List trackedLeases, final LeaseRefresher leaseRefresher) throws KinesisClientLibIOException, + final List trackedLeases, final LeaseRefresher leaseRefresher, + final MultiStreamArgs multiStreamArgs) throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException { final Set kinesisShards = shards.stream().map(Shard::shardId).collect(Collectors.toSet()); // Check if there are leases for non-existent shards final List garbageLeases = trackedLeases.stream() - .filter(lease -> isCandidateForCleanup(lease, kinesisShards)).collect(Collectors.toList()); + .filter(lease -> isCandidateForCleanup(lease, kinesisShards, multiStreamArgs)).collect(Collectors.toList()); if (!CollectionUtils.isNullOrEmpty(garbageLeases)) { log.info("Found {} candidate leases for cleanup. Refreshing list of" @@ -534,7 +606,7 @@ public class HierarchicalShardSyncer { .collect(Collectors.toSet()); for (Lease lease : garbageLeases) { - if (isCandidateForCleanup(lease, currentKinesisShardIds)) { + if (isCandidateForCleanup(lease, currentKinesisShardIds, multiStreamArgs)) { log.info("Deleting lease for shard {} as it is not present in Kinesis stream.", lease.leaseKey()); leaseRefresher.deleteLease(lease); } @@ -552,14 +624,17 @@ public class HierarchicalShardSyncer { * @throws KinesisClientLibIOException Thrown if currentKinesisShardIds contains a parent shard but not the child * shard (we are evaluating for deletion). */ - static boolean isCandidateForCleanup(final Lease lease, final Set currentKinesisShardIds) + static boolean isCandidateForCleanup(final Lease lease, final Set currentKinesisShardIds, + final MultiStreamArgs multiStreamArgs) throws KinesisClientLibIOException { boolean isCandidateForCleanup = true; - if (currentKinesisShardIds.contains(lease.leaseKey())) { + final String shardId = shardIdFromLeaseDeducer.apply(lease, multiStreamArgs); + + if (currentKinesisShardIds.contains(shardId)) { isCandidateForCleanup = false; } else { - log.info("Found lease for non-existent shard: {}. Checking its parent shards", lease.leaseKey()); + log.info("Found lease for non-existent shard: {}. Checking its parent shards", shardId); final Set parentShardIds = lease.parentShardIds(); for (String parentShardId : parentShardIds) { @@ -567,7 +642,7 @@ public class HierarchicalShardSyncer { // This may be a (rare) race condition between fetching the shard list and Kinesis expiring shards. if (currentKinesisShardIds.contains(parentShardId)) { final String message = String.format("Parent shard %s exists but not the child shard %s", - parentShardId, lease.leaseKey()); + parentShardId, shardId); log.info(message); throw new KinesisClientLibIOException(message); } @@ -596,27 +671,28 @@ public class HierarchicalShardSyncer { */ private synchronized void cleanupLeasesOfFinishedShards(final Collection currentLeases, final Map shardIdToShardMap, final Map> shardIdToChildShardIdsMap, - final List trackedLeases, final LeaseRefresher leaseRefresher) throws DependencyException, + final List trackedLeases, final LeaseRefresher leaseRefresher, + final MultiStreamArgs multiStreamArgs) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { final List leasesOfClosedShards = currentLeases.stream() .filter(lease -> lease.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) .collect(Collectors.toList()); - final Set shardIdsOfClosedShards = leasesOfClosedShards.stream().map(Lease::leaseKey) - .collect(Collectors.toSet()); + final Set shardIdsOfClosedShards = leasesOfClosedShards.stream() + .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)).collect(Collectors.toSet()); if (!CollectionUtils.isNullOrEmpty(leasesOfClosedShards)) { assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, shardIdsOfClosedShards); Comparator startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator( - shardIdToShardMap); + shardIdToShardMap, multiStreamArgs); leasesOfClosedShards.sort(startingSequenceNumberComparator); final Map trackedLeaseMap = trackedLeases.stream() - .collect(Collectors.toMap(Lease::leaseKey, Function.identity())); + .collect(Collectors.toMap(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs), Function.identity())); for (Lease leaseOfClosedShard : leasesOfClosedShards) { - final String closedShardId = leaseOfClosedShard.leaseKey(); + final String closedShardId = shardIdFromLeaseDeducer.apply(leaseOfClosedShard, multiStreamArgs); final Set childShardIds = shardIdToChildShardIdsMap.get(closedShardId); if (closedShardId != null && !CollectionUtils.isNullOrEmpty(childShardIds)) { - cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseRefresher); + cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseRefresher, multiStreamArgs); } } } @@ -637,7 +713,7 @@ public class HierarchicalShardSyncer { * @throws DependencyException */ synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set childShardIds, - final Map trackedLeases, final LeaseRefresher leaseRefresher) + final Map trackedLeases, final LeaseRefresher leaseRefresher, final MultiStreamArgs multiStreamArgs) throws DependencyException, InvalidStateException, ProvisionedThroughputException { final Lease leaseForClosedShard = trackedLeases.get(closedShardId); final List childShardLeases = childShardIds.stream().map(trackedLeases::get).filter(Objects::nonNull) @@ -655,7 +731,7 @@ public class HierarchicalShardSyncer { if (okayToDelete) { log.info("Deleting lease for shard {} as it has been completely processed and processing of child " - + "shards has begun.", leaseForClosedShard.leaseKey()); + + "shards has begun.", shardIdFromLeaseDeducer.apply(leaseForClosedShard, multiStreamArgs)); leaseRefresher.deleteLease(leaseForClosedShard); } } @@ -684,6 +760,23 @@ public class HierarchicalShardSyncer { return newLease; } + private static Lease newKCLMultiStreamLease(final Shard shard, final String streamName) { + MultiStreamLease newLease = new MultiStreamLease(); + newLease.leaseKey(MultiStreamLease.getLeaseKey(streamName, shard.shardId())); + List parentShardIds = new ArrayList<>(2); + if (shard.parentShardId() != null) { + parentShardIds.add(shard.parentShardId()); + } + if (shard.adjacentParentShardId() != null) { + parentShardIds.add(shard.adjacentParentShardId()); + } + newLease.parentShardIds(parentShardIds); + newLease.ownerSwitchesSinceCheckpoint(0L); + newLease.streamName(streamName); + newLease.shardId(shard.shardId()); + return newLease; + } + /** * Helper method to construct a shardId->Shard map for the specified list of shards. * @@ -728,6 +821,7 @@ public class HierarchicalShardSyncer { private static final long serialVersionUID = 1L; private final Map shardIdToShardMap; + private final MultiStreamArgs multiStreamArgs; /** * Compares two leases based on the starting sequence number of corresponding shards. @@ -741,8 +835,8 @@ public class HierarchicalShardSyncer { @Override public int compare(final Lease lease1, final Lease lease2) { int result = 0; - final String shardId1 = lease1.leaseKey(); - final String shardId2 = lease2.leaseKey(); + final String shardId1 = shardIdFromLeaseDeducer.apply(lease1, multiStreamArgs); + final String shardId2 = shardIdFromLeaseDeducer.apply(lease2, multiStreamArgs); final Shard shard1 = shardIdToShardMap.get(shardId1); final Shard shard2 = shardIdToShardMap.get(shardId2); @@ -762,4 +856,11 @@ public class HierarchicalShardSyncer { } + @Data + @Accessors(fluent = true) + private static class MultiStreamArgs { + private final Boolean isMultiStreamMode; + private final String streamName; + } + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java index ba136f0a..f009a0e7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java @@ -59,7 +59,7 @@ public class KinesisShardDetector implements ShardDetector { @NonNull private final KinesisAsyncClient kinesisClient; - @NonNull + @NonNull @Getter private final String streamName; private final long listShardsBackoffTimeInMillis; private final int maxListShardsRetryAttempts; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 6504bb24..3361e3fb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -262,10 +262,24 @@ public class LeaseManagementConfig { */ private TableCreatorCallback tableCreatorCallback = TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK; - private HierarchicalShardSyncer hierarchicalShardSyncer = new HierarchicalShardSyncer(); + private HierarchicalShardSyncer hierarchicalShardSyncer; private LeaseManagementFactory leaseManagementFactory; + private HierarchicalShardSyncer hierarchicalShardSyncer() { + if(hierarchicalShardSyncer == null) { + hierarchicalShardSyncer = new HierarchicalShardSyncer(); + } + return hierarchicalShardSyncer; + } + + public HierarchicalShardSyncer hierarchicalShardSyncer(boolean isMultiStreamingMode) { + if(hierarchicalShardSyncer == null) { + hierarchicalShardSyncer = new HierarchicalShardSyncer(isMultiStreamingMode); + } + return hierarchicalShardSyncer; + } + @Deprecated public LeaseManagementFactory leaseManagementFactory() { if (leaseManagementFactory == null) { @@ -299,7 +313,7 @@ public class LeaseManagementConfig { return leaseManagementFactory; } - public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer) { + public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer, boolean isMultiStreamingMode) { if(leaseManagementFactory == null) { leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(), dynamoDBClient(), @@ -322,7 +336,7 @@ public class LeaseManagementConfig { cacheMissWarningModulus(), initialLeaseTableReadCapacity(), initialLeaseTableWriteCapacity(), - hierarchicalShardSyncer(), + hierarchicalShardSyncer(isMultiStreamingMode), tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java index cf3a1a78..43e1e1b3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java @@ -27,4 +27,8 @@ public interface ShardDetector { List listShards(); + default String streamName() { + throw new UnsupportedOperationException("StreamName not available"); + } + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 2252da84..c0d3913b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -33,7 +33,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.annotations.KinesisClientInternalApi; -import software.amazon.kinesis.leases.CompositeLeaseKey; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher;