getLeaseKey

This commit is contained in:
Chunxue Yang 2020-05-05 17:31:08 -07:00
parent 1036006bb4
commit 760c52d7bc
3 changed files with 8 additions and 6 deletions

View file

@ -149,9 +149,13 @@ public class ShardInfo {
* @return lease key * @return lease key
*/ */
public static String getLeaseKey(ShardInfo shardInfo) { public static String getLeaseKey(ShardInfo shardInfo) {
return getLeaseKey(shardInfo, shardInfo.shardId());
}
public static String getLeaseKey(ShardInfo shardInfo, String shardId) {
return shardInfo.streamIdentifierSerOpt().isPresent() ? return shardInfo.streamIdentifierSerOpt().isPresent() ?
MultiStreamLease.getLeaseKey(shardInfo.streamIdentifierSerOpt().get(), shardInfo.shardId()) : MultiStreamLease.getLeaseKey(shardInfo.streamIdentifierSerOpt().get(), shardId) :
shardInfo.shardId(); shardId;
} }
} }

View file

@ -61,7 +61,7 @@ public class BlockOnParentShardTask implements ConsumerTask {
try { try {
boolean blockedOnParentShard = false; boolean blockedOnParentShard = false;
for (String shardId : shardInfo.parentShardIds()) { for (String shardId : shardInfo.parentShardIds()) {
final String leaseKey = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardId).orElse(shardId); final String leaseKey = ShardInfo.getLeaseKey(shardInfo, shardId);
final Lease lease = leaseRefresher.getLease(leaseKey); final Lease lease = leaseRefresher.getLease(leaseKey);
if (lease != null) { if (lease != null) {
ExtendedSequenceNumber checkpoint = lease.checkpoint(); ExtendedSequenceNumber checkpoint = lease.checkpoint();

View file

@ -183,9 +183,7 @@ public class ShutdownTask implements ConsumerTask {
private void createLeasesForChildShardsIfNotExist() private void createLeasesForChildShardsIfNotExist()
throws DependencyException, InvalidStateException, ProvisionedThroughputException { throws DependencyException, InvalidStateException, ProvisionedThroughputException {
for(ChildShard childShard : childShards) { for(ChildShard childShard : childShards) {
final String leaseKey = shardInfo.streamIdentifierSerOpt() final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId());
.map(s -> s + ":" + childShard.shardId())
.orElse(childShard.shardId());
if(leaseCoordinator.getCurrentlyHeldLease(leaseKey) == null) { if(leaseCoordinator.getCurrentlyHeldLease(leaseKey) == null) {
final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier()); final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier());
leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate); leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate);