Update lease sync algorithm to create leases for no more than one level of the shard hierarchy (#41)
* Update NonEmptyLeaseTableSynchronizer algorithm to create leases for no more than one level of the shard hierarchy * Add and fix unit tests for LATEST, TRIM_HORIZON, AT_TIMESTAMP * Update unit tests * Fix edge case of reading from TRIM with partial lease for one child shard * Revert child shards optimization for TRIM/TIMESTAMP case
This commit is contained in:
parent
f2ba3bcd2f
commit
113029e33c
2 changed files with 1239 additions and 376 deletions
|
|
@ -18,9 +18,11 @@ import java.io.Serializable;
|
||||||
import java.math.BigInteger;
|
import java.math.BigInteger;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
@ -33,6 +35,7 @@ import java.util.stream.Collectors;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
import lombok.experimental.Accessors;
|
import lombok.experimental.Accessors;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
|
|
@ -446,7 +449,7 @@ public class HierarchicalShardSyncer {
|
||||||
/**
|
/**
|
||||||
* Note: Package level access for testing purposes only.
|
* Note: Package level access for testing purposes only.
|
||||||
* Check if this shard is a descendant of a shard that is (or will be) processed.
|
* Check if this shard is a descendant of a shard that is (or will be) processed.
|
||||||
* Create leases for the ancestors of this shard as required.
|
* Create leases for the first ancestor of this shard that needs to be processed, as required.
|
||||||
* See javadoc of determineNewLeasesToCreate() for rules and example.
|
* See javadoc of determineNewLeasesToCreate() for rules and example.
|
||||||
*
|
*
|
||||||
* @param shardId The shardId to check.
|
* @param shardId The shardId to check.
|
||||||
|
|
@ -462,10 +465,10 @@ public class HierarchicalShardSyncer {
|
||||||
static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId,
|
static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId,
|
||||||
final InitialPositionInStreamExtended initialPosition, final Set<String> shardIdsOfCurrentLeases,
|
final InitialPositionInStreamExtended initialPosition, final Set<String> shardIdsOfCurrentLeases,
|
||||||
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
|
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
|
||||||
final Map<String, Lease> shardIdToLeaseMapOfNewShards, final Map<String, Boolean> memoizationContext,
|
final Map<String, Lease> shardIdToLeaseMapOfNewShards, final MemoizationContext memoizationContext,
|
||||||
final MultiStreamArgs multiStreamArgs) {
|
final MultiStreamArgs multiStreamArgs) {
|
||||||
final String streamIdentifier = getStreamIdentifier(multiStreamArgs);
|
final String streamIdentifier = getStreamIdentifier(multiStreamArgs);
|
||||||
final Boolean previousValue = memoizationContext.get(shardId);
|
final Boolean previousValue = memoizationContext.isDescendant(shardId);
|
||||||
if (previousValue != null) {
|
if (previousValue != null) {
|
||||||
return previousValue;
|
return previousValue;
|
||||||
}
|
}
|
||||||
|
|
@ -480,13 +483,17 @@ public class HierarchicalShardSyncer {
|
||||||
// We don't need to add leases of its ancestors,
|
// We don't need to add leases of its ancestors,
|
||||||
// because we'd have done it when creating a lease for this shard.
|
// because we'd have done it when creating a lease for this shard.
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
final Shard shard = shardIdToShardMapOfAllKinesisShards.get(shardId);
|
final Shard shard = shardIdToShardMapOfAllKinesisShards.get(shardId);
|
||||||
final Set<String> parentShardIds = getParentShardIds(shard, shardIdToShardMapOfAllKinesisShards);
|
final Set<String> parentShardIds = getParentShardIds(shard, shardIdToShardMapOfAllKinesisShards);
|
||||||
for (String parentShardId : parentShardIds) {
|
for (String parentShardId : parentShardIds) {
|
||||||
// Check if the parent is a descendant, and include its ancestors.
|
// Check if the parent is a descendant, and include its ancestors. Or, if the parent is NOT a
|
||||||
if (checkIfDescendantAndAddNewLeasesForAncestors(parentShardId, initialPosition,
|
// descendant but we should create a lease for it anyway (e.g. to include in processing from
|
||||||
shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToLeaseMapOfNewShards,
|
// TRIM_HORIZON or AT_TIMESTAMP). If either is true, then we mark the current shard as a descendant.
|
||||||
memoizationContext, multiStreamArgs)) {
|
final boolean isParentDescendant = checkIfDescendantAndAddNewLeasesForAncestors(parentShardId,
|
||||||
|
initialPosition, shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards,
|
||||||
|
shardIdToLeaseMapOfNewShards, memoizationContext, multiStreamArgs);
|
||||||
|
if (isParentDescendant || memoizationContext.shouldCreateLease(parentShardId)) {
|
||||||
isDescendant = true;
|
isDescendant = true;
|
||||||
descendantParentShardIds.add(parentShardId);
|
descendantParentShardIds.add(parentShardId);
|
||||||
log.debug("{} : Parent shard {} is a descendant.", streamIdentifier, parentShardId);
|
log.debug("{} : Parent shard {} is a descendant.", streamIdentifier, parentShardId);
|
||||||
|
|
@ -499,48 +506,87 @@ public class HierarchicalShardSyncer {
|
||||||
if (isDescendant) {
|
if (isDescendant) {
|
||||||
for (String parentShardId : parentShardIds) {
|
for (String parentShardId : parentShardIds) {
|
||||||
if (!shardIdsOfCurrentLeases.contains(parentShardId)) {
|
if (!shardIdsOfCurrentLeases.contains(parentShardId)) {
|
||||||
log.debug("{} : Need to create a lease for shardId {}", streamIdentifier, parentShardId);
|
|
||||||
Lease lease = shardIdToLeaseMapOfNewShards.get(parentShardId);
|
Lease lease = shardIdToLeaseMapOfNewShards.get(parentShardId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If the lease for the parent shard does not already exist, there are two cases in which we
|
||||||
|
* would want to create it:
|
||||||
|
* - If we have already marked the parentShardId for lease creation in a prior recursive
|
||||||
|
* call. This could happen if we are trying to process from TRIM_HORIZON or AT_TIMESTAMP.
|
||||||
|
* - If the parent shard is not a descendant but the current shard is a descendant, then
|
||||||
|
* the parent shard is the oldest shard in the shard hierarchy that does not have an
|
||||||
|
* ancestor in the lease table (the adjacent parent is necessarily a descendant, and
|
||||||
|
* therefore covered in the lease table). So we should create a lease for the parent.
|
||||||
|
*/
|
||||||
if (lease == null) {
|
if (lease == null) {
|
||||||
lease = multiStreamArgs.isMultiStreamMode() ?
|
if (memoizationContext.shouldCreateLease(parentShardId) ||
|
||||||
newKCLMultiStreamLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId),
|
!descendantParentShardIds.contains(parentShardId)) {
|
||||||
multiStreamArgs.streamIdentifier()) :
|
log.debug("{} : Need to create a lease for shardId {}", streamIdentifier, parentShardId);
|
||||||
newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId));
|
lease = multiStreamArgs.isMultiStreamMode() ?
|
||||||
shardIdToLeaseMapOfNewShards.put(parentShardId, lease);
|
newKCLMultiStreamLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId),
|
||||||
|
multiStreamArgs.streamIdentifier()) :
|
||||||
|
newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId));
|
||||||
|
shardIdToLeaseMapOfNewShards.put(parentShardId, lease);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (descendantParentShardIds.contains(parentShardId)
|
/**
|
||||||
&& !initialPosition.getInitialPositionInStream()
|
* If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
|
||||||
|
* checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will
|
||||||
|
* add a lease just like we do for TRIM_HORIZON. However we will only return back records
|
||||||
|
* with server-side timestamp at or after the specified initial position timestamp.
|
||||||
|
*
|
||||||
|
* Shard structure (each level depicts a stream segment):
|
||||||
|
* 0 1 2 3 4 5 - shards till epoch 102
|
||||||
|
* \ / \ / | |
|
||||||
|
* 6 7 4 5 - shards from epoch 103 - 205
|
||||||
|
* \ / | /\
|
||||||
|
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
|
||||||
|
*
|
||||||
|
* Current leases: (4, 5, 7)
|
||||||
|
*
|
||||||
|
* For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with
|
||||||
|
* timestamp value 206. We will then create new leases for all the shards 0 and 1 (with
|
||||||
|
* checkpoint set AT_TIMESTAMP), even though these ancestor shards have an epoch less than
|
||||||
|
* 206. However as we begin processing the ancestor shards, their checkpoints would be
|
||||||
|
* updated to SHARD_END and their leases would then be deleted since they won't have records
|
||||||
|
* with server-side timestamp at/after 206. And after that we will begin processing the
|
||||||
|
* descendant shards with epoch at/after 206 and we will return the records that meet the
|
||||||
|
* timestamp requirement for these shards.
|
||||||
|
*/
|
||||||
|
if (lease != null) {
|
||||||
|
if (descendantParentShardIds.contains(parentShardId)
|
||||||
|
&& !initialPosition.getInitialPositionInStream()
|
||||||
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
|
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
|
||||||
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
|
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
|
||||||
} else {
|
} else {
|
||||||
lease.checkpoint(convertToCheckpoint(initialPosition));
|
lease.checkpoint(convertToCheckpoint(initialPosition));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// This shard should be included, if the customer wants to process all records in the stream or
|
// This shard is not a descendant, but should still be included if the customer wants to process all
|
||||||
// if the initial position is AT_TIMESTAMP. For AT_TIMESTAMP, we will add a lease just like we do
|
// records in the stream or if the initial position is AT_TIMESTAMP. For AT_TIMESTAMP, we will add a
|
||||||
// for TRIM_HORIZON. However we will only return back records with server-side timestamp at or
|
// lease just like we do for TRIM_HORIZON. However we will only return back records with server-side
|
||||||
// after the specified initial position timestamp.
|
// timestamp at or after the specified initial position timestamp.
|
||||||
if (initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)
|
if (initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)
|
||||||
|| initialPosition.getInitialPositionInStream()
|
|| initialPosition.getInitialPositionInStream()
|
||||||
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
|
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
|
||||||
isDescendant = true;
|
memoizationContext.setShouldCreateLease(shardId, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
memoizationContext.put(shardId, isDescendant);
|
memoizationContext.setIsDescendant(shardId, isDescendant);
|
||||||
return isDescendant;
|
return isDescendant;
|
||||||
}
|
}
|
||||||
|
|
||||||
static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId,
|
static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId,
|
||||||
final InitialPositionInStreamExtended initialPosition, final Set<String> shardIdsOfCurrentLeases,
|
final InitialPositionInStreamExtended initialPosition, final Set<String> shardIdsOfCurrentLeases,
|
||||||
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
|
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
|
||||||
final Map<String, Lease> shardIdToLeaseMapOfNewShards, final Map<String, Boolean> memoizationContext) {
|
final Map<String, Lease> shardIdToLeaseMapOfNewShards, MemoizationContext memoizationContext) {
|
||||||
return checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition, shardIdsOfCurrentLeases,
|
return checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition, shardIdsOfCurrentLeases,
|
||||||
shardIdToShardMapOfAllKinesisShards, shardIdToLeaseMapOfNewShards, memoizationContext,
|
shardIdToShardMapOfAllKinesisShards, shardIdToLeaseMapOfNewShards, memoizationContext,
|
||||||
new MultiStreamArgs(false, null));
|
new MultiStreamArgs(false, null));
|
||||||
|
|
@ -1033,8 +1079,10 @@ public class HierarchicalShardSyncer {
|
||||||
* Note: Package level access only for testing purposes.
|
* Note: Package level access only for testing purposes.
|
||||||
* <p>
|
* <p>
|
||||||
* For each open (no ending sequence number) shard without open parents that doesn't already have a lease,
|
* For each open (no ending sequence number) shard without open parents that doesn't already have a lease,
|
||||||
* determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists):
|
* determine if it is a descendant of any shard which is or will be processed (e.g. for which a lease exists):
|
||||||
* If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed.
|
* If so, create a lease for the first ancestor that needs to be processed (if needed). We will create leases
|
||||||
|
* for no more than one level in the ancestry tree. Once we find the first ancestor that needs to be processed,
|
||||||
|
* we will avoid creating leases for further descendants of that ancestor.
|
||||||
* If not, set checkpoint of the shard to the initial position specified by the client.
|
* If not, set checkpoint of the shard to the initial position specified by the client.
|
||||||
* To check if we need to create leases for ancestors, we use the following rules:
|
* To check if we need to create leases for ancestors, we use the following rules:
|
||||||
* * If we began (or will begin) processing data for a shard, then we must reach end of that shard before
|
* * If we began (or will begin) processing data for a shard, then we must reach end of that shard before
|
||||||
|
|
@ -1052,11 +1100,19 @@ public class HierarchicalShardSyncer {
|
||||||
* Shard structure (each level depicts a stream segment):
|
* Shard structure (each level depicts a stream segment):
|
||||||
* 0 1 2 3 4 5 - shards till epoch 102
|
* 0 1 2 3 4 5 - shards till epoch 102
|
||||||
* \ / \ / | |
|
* \ / \ / | |
|
||||||
* 6 7 4 5 - shards from epoch 103 - 205
|
* 6 7 4 5 - shards from epoch 103 - 205
|
||||||
* \ / | / \
|
* \ / | / \
|
||||||
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
|
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
|
||||||
* Current leases: (3, 4, 5)
|
*
|
||||||
* New leases to create: (2, 6, 7, 8, 9, 10)
|
* Current leases: (4, 5, 7)
|
||||||
|
*
|
||||||
|
* If initial position is LATEST:
|
||||||
|
* - New leases to create: (6)
|
||||||
|
* If initial position is TRIM_HORIZON:
|
||||||
|
* - New leases to create: (0, 1)
|
||||||
|
* If initial position is AT_TIMESTAMP(epoch=200):
|
||||||
|
* - New leases to create: (0, 1)
|
||||||
|
*
|
||||||
* <p>
|
* <p>
|
||||||
* The leases returned are sorted by the starting sequence number - following the same order
|
* The leases returned are sorted by the starting sequence number - following the same order
|
||||||
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
|
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
|
||||||
|
|
@ -1083,7 +1139,7 @@ public class HierarchicalShardSyncer {
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
final List<Shard> openShards = getOpenShards(shards, streamIdentifier);
|
final List<Shard> openShards = getOpenShards(shards, streamIdentifier);
|
||||||
final Map<String, Boolean> memoizationContext = new HashMap<>();
|
final MemoizationContext memoizationContext = new MemoizationContext();
|
||||||
|
|
||||||
// Iterate over the open shards and find those that don't have any lease entries.
|
// Iterate over the open shards and find those that don't have any lease entries.
|
||||||
for (Shard shard : openShards) {
|
for (Shard shard : openShards) {
|
||||||
|
|
@ -1094,45 +1150,32 @@ public class HierarchicalShardSyncer {
|
||||||
} else if (inconsistentShardIds.contains(shardId)) {
|
} else if (inconsistentShardIds.contains(shardId)) {
|
||||||
log.info("{} : shardId {} is an inconsistent child. Not creating a lease", streamIdentifier, shardId);
|
log.info("{} : shardId {} is an inconsistent child. Not creating a lease", streamIdentifier, shardId);
|
||||||
} else {
|
} else {
|
||||||
log.debug("{} : Need to create a lease for shardId {}", streamIdentifier, shardId);
|
log.debug("{} : Beginning traversal of ancestry tree for shardId {}", streamIdentifier, shardId);
|
||||||
final Lease newLease = multiStreamArgs.isMultiStreamMode() ?
|
|
||||||
newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) :
|
// A shard is a descendant if at least one if its ancestors exists in the lease table.
|
||||||
newKCLLease(shard);
|
// We will create leases for only one level in the ancestry tree. Once we find the first ancestor
|
||||||
|
// that needs to be processed in order to complete the hash range, we will not create leases for
|
||||||
|
// further descendants of that ancestor.
|
||||||
final boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition,
|
final boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition,
|
||||||
shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap,
|
shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap,
|
||||||
memoizationContext, multiStreamArgs);
|
memoizationContext, multiStreamArgs);
|
||||||
|
|
||||||
/**
|
// If shard is a descendant, the leases for its ancestors were already created above. Open shards
|
||||||
* If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
|
// that are NOT descendants will not have leases yet, so we create them here. We will not create
|
||||||
* checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a
|
// leases for open shards that ARE descendants yet - leases for these shards will be created upon
|
||||||
* lease just like we do for TRIM_HORIZON. However we will only return back records with server-side
|
// SHARD_END of their parents.
|
||||||
* timestamp at or after the specified initial position timestamp.
|
if (!isDescendant) {
|
||||||
*
|
log.debug("{} : shardId {} has no ancestors. Creating a lease.", streamIdentifier, shardId);
|
||||||
* Shard structure (each level depicts a stream segment):
|
final Lease newLease = multiStreamArgs.isMultiStreamMode() ?
|
||||||
* 0 1 2 3 4 5 - shards till epoch 102
|
newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) :
|
||||||
* \ / \ / | |
|
newKCLLease(shard);
|
||||||
* 6 7 4 5 - shards from epoch 103 - 205
|
|
||||||
* \ / | /\
|
|
||||||
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
|
|
||||||
*
|
|
||||||
* Current leases: empty set
|
|
||||||
*
|
|
||||||
* For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with
|
|
||||||
* timestamp value 206. We will then create new leases for all the shards (with checkpoint set to
|
|
||||||
* AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin
|
|
||||||
* processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases
|
|
||||||
* would then be deleted since they won't have records with server-side timestamp at/after 206. And
|
|
||||||
* after that we will begin processing the descendant shards with epoch at/after 206 and we will
|
|
||||||
* return the records that meet the timestamp requirement for these shards.
|
|
||||||
*/
|
|
||||||
if (isDescendant
|
|
||||||
&& !initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
|
|
||||||
newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
|
|
||||||
} else {
|
|
||||||
newLease.checkpoint(convertToCheckpoint(initialPosition));
|
newLease.checkpoint(convertToCheckpoint(initialPosition));
|
||||||
|
log.debug("{} : Set checkpoint of {} to {}", streamIdentifier, newLease.leaseKey(), newLease.checkpoint());
|
||||||
|
shardIdToNewLeaseMap.put(shardId, newLease);
|
||||||
|
} else {
|
||||||
|
log.debug("{} : shardId {} is a descendant whose ancestors should already have leases. " +
|
||||||
|
"Not creating a lease.", streamIdentifier, shardId);
|
||||||
}
|
}
|
||||||
log.debug("{} : Set checkpoint of {} to {}", streamIdentifier, newLease.leaseKey(), newLease.checkpoint());
|
|
||||||
shardIdToNewLeaseMap.put(shardId, newLease);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1143,4 +1186,29 @@ public class HierarchicalShardSyncer {
|
||||||
return newLeasesToCreate;
|
return newLeasesToCreate;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper class to pass around state between recursive traversals of shard hierarchy.
|
||||||
|
*/
|
||||||
|
@NoArgsConstructor
|
||||||
|
static class MemoizationContext {
|
||||||
|
private Map<String, Boolean> isDescendantMap = new HashMap<>();
|
||||||
|
private Map<String, Boolean> shouldCreateLeaseMap = new HashMap<>();
|
||||||
|
|
||||||
|
Boolean isDescendant(String shardId) {
|
||||||
|
return isDescendantMap.get(shardId);
|
||||||
|
}
|
||||||
|
|
||||||
|
void setIsDescendant(String shardId, Boolean isDescendant) {
|
||||||
|
isDescendantMap.put(shardId, isDescendant);
|
||||||
|
}
|
||||||
|
|
||||||
|
Boolean shouldCreateLease(String shardId) {
|
||||||
|
return shouldCreateLeaseMap.computeIfAbsent(shardId, x -> Boolean.FALSE);
|
||||||
|
}
|
||||||
|
|
||||||
|
void setShouldCreateLease(String shardId, Boolean shouldCreateLease) {
|
||||||
|
shouldCreateLeaseMap.put(shardId, shouldCreateLease);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load diff
Loading…
Reference in a new issue