Addressing comments

Renaming Method
This commit is contained in:
Chunxue Yang 2019-10-03 16:29:38 -07:00
parent 8847be997b
commit 407b4be8dd
2 changed files with 18 additions and 11 deletions

View file

@ -88,7 +88,7 @@ public class HierarchicalShardSyncer {
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards,
final boolean ignoreUnexpectedChildShards, final MetricsScope scope)throws DependencyException, InvalidStateException, final boolean ignoreUnexpectedChildShards, final MetricsScope scope)throws DependencyException, InvalidStateException,
ProvisionedThroughputException, KinesisClientLibIOException { ProvisionedThroughputException, KinesisClientLibIOException {
if(CollectionUtils.isNullOrEmpty(shards)) { if (CollectionUtils.isNullOrEmpty(shards)) {
shards = getShardList(shardDetector); shards = getShardList(shardDetector);
} }
log.debug("Num shards: {}", shards.size()); log.debug("Num shards: {}", shards.size());

View file

@ -21,6 +21,7 @@ import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -94,10 +95,15 @@ public class ShutdownTask implements ConsumerTask {
try { try {
try { try {
List<Shard> allShards = new ArrayList<>(); List<Shard> allShards = new ArrayList<>();
if(reason == ShutdownReason.SHARD_END) { /*
* Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END
* If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows other
* shard consumer to subscribe to this shard.
*/
if (reason == ShutdownReason.SHARD_END) {
allShards = shardDetector.listShards(); allShards = shardDetector.listShards();
if(!isRealShardEnd(allShards)) { if (!CollectionUtils.isNullOrEmpty(allShards) && !shardEndValidated(allShards)) {
reason = ShutdownReason.LEASE_LOST; reason = ShutdownReason.LEASE_LOST;
} }
} }
@ -183,17 +189,18 @@ public class ShutdownTask implements ConsumerTask {
return reason; return reason;
} }
private boolean isRealShardEnd(List<Shard> shards) { private boolean shardEndValidated(List<Shard> shards) {
boolean realShardEnd = false;
for(Shard shard : shards) { for(Shard shard : shards) {
if(shard.parentShardId() != null && shard.parentShardId().equals(shardInfo.shardId()) if (isChildShard(shard)) {
|| shard.adjacentParentShardId() != null && shard.adjacentParentShardId().equals(shardInfo.shardId())) { return true;
realShardEnd = true;
break;
} }
} }
return realShardEnd; return false;
}
private boolean isChildShard(Shard shard) {
return (shard.parentShardId() != null && shard.parentShardId().equals(shardInfo.shardId())
|| shard.adjacentParentShardId() != null && shard.adjacentParentShardId().equals(shardInfo.shardId()));
} }
} }