Fix for avoiding premature creation of child lease when other parent lease is not created yet. This can happen in the case of fast reshard operations.
This commit is contained in:
parent
5fc55e0c8c
commit
ff1bee5791
1 changed files with 21 additions and 0 deletions
|
|
@ -17,6 +17,7 @@ package software.amazon.kinesis.lifecycle;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
import lombok.NonNull;
|
import lombok.NonNull;
|
||||||
|
|
@ -237,6 +238,26 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
|
|
||||||
private void createLeasesForChildShardsIfNotExist()
|
private void createLeasesForChildShardsIfNotExist()
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
|
// For child shard resulted from merge of two parent shards, verify if both the parents are either present or
|
||||||
|
// not present in the lease table before creating the lease entry.
|
||||||
|
if (!CollectionUtils.isNullOrEmpty(childShards) && childShards.size() == 1) {
|
||||||
|
final ChildShard childShard = childShards.get(0);
|
||||||
|
final List<String> parentLeaseKeys = childShard.parentShards().stream()
|
||||||
|
.map(parentShardId -> ShardInfo.getLeaseKey(shardInfo, parentShardId)).collect(Collectors.toList());
|
||||||
|
if (parentLeaseKeys.size() != 2) {
|
||||||
|
throw new InvalidStateException("Shard " + shardInfo.shardId() + "'s only child shard " + childShard
|
||||||
|
+ " does not contain other parent information.");
|
||||||
|
} else {
|
||||||
|
boolean isValidLeaseTableState =
|
||||||
|
Objects.isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(0))) == Objects
|
||||||
|
.isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(1)));
|
||||||
|
if (!isValidLeaseTableState) {
|
||||||
|
throw new InvalidStateException("Shard " + shardInfo.shardId() + "'s only child shard " + childShard
|
||||||
|
+ " has partial parent information in lease table.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Attempt create leases for child shards.
|
||||||
for(ChildShard childShard : childShards) {
|
for(ChildShard childShard : childShards) {
|
||||||
final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId());
|
final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId());
|
||||||
if(leaseCoordinator.leaseRefresher().getLease(leaseKey) == null) {
|
if(leaseCoordinator.leaseRefresher().getLease(leaseKey) == null) {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue