diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 0c01f13f..d0d9c0c4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -17,6 +17,7 @@ package software.amazon.kinesis.lifecycle; import com.google.common.annotations.VisibleForTesting; import java.util.List; +import java.util.Objects; import java.util.Optional; import lombok.NonNull; @@ -237,6 +238,26 @@ public class ShutdownTask implements ConsumerTask { private void createLeasesForChildShardsIfNotExist() throws DependencyException, InvalidStateException, ProvisionedThroughputException { + // For child shard resulted from merge of two parent shards, verify if both the parents are either present or + // not present in the lease table before creating the lease entry. + if (!CollectionUtils.isNullOrEmpty(childShards) && childShards.size() == 1) { + final ChildShard childShard = childShards.get(0); + final List parentLeaseKeys = childShard.parentShards().stream() + .map(parentShardId -> ShardInfo.getLeaseKey(shardInfo, parentShardId)).collect(Collectors.toList()); + if (parentLeaseKeys.size() != 2) { + throw new InvalidStateException("Shard " + shardInfo.shardId() + "'s only child shard " + childShard + + " does not contain other parent information."); + } else { + boolean isValidLeaseTableState = + Objects.isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(0))) == Objects + .isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(1))); + if (!isValidLeaseTableState) { + throw new InvalidStateException("Shard " + shardInfo.shardId() + "'s only child shard " + childShard + + " has partial parent information in lease table."); + } + } + } + // Attempt create leases for child shards. for(ChildShard childShard : childShards) { final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId()); if(leaseCoordinator.leaseRefresher().getLease(leaseKey) == null) {