Addressing comments

This commit is contained in:
Chunxue Yang 2019-10-15 14:50:52 -07:00
parent 800d24309d
commit 77e886aeda
6 changed files with 26 additions and 26 deletions

View file

@ -22,7 +22,7 @@
<parent> <parent>
<groupId>software.amazon.kinesis</groupId> <groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client-pom</artifactId> <artifactId>amazon-kinesis-client-pom</artifactId>
<version>2.2.5</version> <version>2.2.5-SNAPSHOT</version>
</parent> </parent>
<artifactId>amazon-kinesis-client</artifactId> <artifactId>amazon-kinesis-client</artifactId>

View file

@ -79,14 +79,14 @@ public class HierarchicalShardSyncer {
final MetricsScope scope) throws DependencyException, InvalidStateException, final MetricsScope scope) throws DependencyException, InvalidStateException,
ProvisionedThroughputException, KinesisClientLibIOException { ProvisionedThroughputException, KinesisClientLibIOException {
final List<Shard> shards = getShardList(shardDetector); final List<Shard> shards = getShardList(shardDetector);
checkAndCreateLeaseForNewShards(shards, shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards, checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, scope); ignoreUnexpectedChildShards, scope, shards);
} }
//Provide a pre-collcted list of shards to avoid calling ListShards API //Provide a pre-collcted list of shards to avoid calling ListShards API
public synchronized void checkAndCreateLeaseForNewShards(List<Shard> shards, @NonNull final ShardDetector shardDetector, public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards,
final boolean ignoreUnexpectedChildShards, final MetricsScope scope)throws DependencyException, InvalidStateException, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List<Shard> shards)throws DependencyException, InvalidStateException,
ProvisionedThroughputException, KinesisClientLibIOException { ProvisionedThroughputException, KinesisClientLibIOException {
if (!CollectionUtils.isNullOrEmpty(shards)) { if (!CollectionUtils.isNullOrEmpty(shards)) {
log.debug("Num shards: {}", shards.size()); log.debug("Num shards: {}", shards.size());
@ -102,8 +102,7 @@ public class HierarchicalShardSyncer {
final List<Lease> currentLeases = leaseRefresher.listLeases(); final List<Lease> currentLeases = leaseRefresher.listLeases();
final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition, final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds);
inconsistentShardIds);
log.debug("Num new leases to create: {}", newLeasesToCreate.size()); log.debug("Num new leases to create: {}", newLeasesToCreate.size());
for (Lease lease : newLeasesToCreate) { for (Lease lease : newLeasesToCreate) {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
@ -115,13 +114,11 @@ public class HierarchicalShardSyncer {
MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED); MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED);
} }
} }
final List<Lease> trackedLeases = new ArrayList<>(currentLeases); final List<Lease> trackedLeases = new ArrayList<>(currentLeases);
trackedLeases.addAll(newLeasesToCreate); trackedLeases.addAll(newLeasesToCreate);
cleanupGarbageLeases(shardDetector, shards, trackedLeases, leaseRefresher); cleanupGarbageLeases(shardDetector, shards, trackedLeases, leaseRefresher);
if (cleanupLeasesOfCompletedShards) { if (cleanupLeasesOfCompletedShards) {
cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, leaseRefresher);
leaseRefresher);
} }
} }

View file

@ -20,6 +20,7 @@ import com.sun.org.apache.bcel.internal.generic.LUSHR;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@ -101,16 +102,16 @@ public class ShutdownTask implements ConsumerTask {
List<Shard> allShards = new ArrayList<>(); List<Shard> allShards = new ArrayList<>();
/* /*
* Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END * Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END
* If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows other * If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows active
* shard consumer to subscribe to this shard. * workers to contend for the lease of this shard.
*/ */
if (localReason == ShutdownReason.SHARD_END) { if (localReason == ShutdownReason.SHARD_END) {
allShards = shardDetector.listShards(); allShards = shardDetector.listShards();
if (!CollectionUtils.isNullOrEmpty(allShards) && !validateShardEnd(allShards)) { if (!CollectionUtils.isNullOrEmpty(allShards) && !validateShardEnd(allShards)) {
localReason = ShutdownReason.LEASE_LOST; localReason = ShutdownReason.LEASE_LOST;
forceLoseLease(); dropLease();
log.debug("Force the lease to be lost before shutting down the consumer."); log.info("Force the lease to be lost before shutting down the consumer for Shard: " + shardInfo.shardId());
} }
} }
@ -152,8 +153,8 @@ public class ShutdownTask implements ConsumerTask {
if (localReason == ShutdownReason.SHARD_END) { if (localReason == ShutdownReason.SHARD_END) {
log.debug("Looking for child shards of shard {}", shardInfo.shardId()); log.debug("Looking for child shards of shard {}", shardInfo.shardId());
// create leases for the child shards // create leases for the child shards
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(allShards, shardDetector, leaseCoordinator.leaseRefresher(), hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(),
initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope); initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, allShards);
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId()); log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
} }
@ -205,16 +206,18 @@ public class ShutdownTask implements ConsumerTask {
} }
private boolean isChildShardOfCurrentShard(Shard shard) { private boolean isChildShardOfCurrentShard(Shard shard) {
return (shard.parentShardId() != null && shard.parentShardId().equals(shardInfo.shardId()) return (StringUtils.equals(shard.parentShardId(), shardInfo.shardId())
|| shard.adjacentParentShardId() != null && shard.adjacentParentShardId().equals(shardInfo.shardId())); || StringUtils.equals(shard.adjacentParentShardId(), shardInfo.shardId()));
} }
private void forceLoseLease() { private void dropLease() {
Collection<Lease> leases = leaseCoordinator.getAssignments(); Collection<Lease> leases = leaseCoordinator.getAssignments();
if(leases != null && !leases.isEmpty()) { if(leases != null && !leases.isEmpty()) {
for(Lease lease : leases) { for(Lease lease : leases) {
if(lease.leaseKey().equals(shardInfo.shardId())) { if(lease.leaseKey().equals(shardInfo.shardId())) {
leaseCoordinator.dropLease(lease); leaseCoordinator.dropLease(lease);
log.warn("Dropped lease for shutting down ShardConsumer: " + lease.leaseKey());
break;
} }
} }
} }

View file

@ -215,8 +215,8 @@ public class HierarchicalShardSyncerTest {
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shards, shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE); cleanupLeasesOfCompletedShards, false, SCOPE, shards);
final Set<String> expectedShardIds = new HashSet<>( final Set<String> expectedShardIds = new HashSet<>(
Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"));
@ -247,8 +247,8 @@ public class HierarchicalShardSyncerTest {
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(new ArrayList<Shard>(), shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE); cleanupLeasesOfCompletedShards, false, SCOPE, new ArrayList<Shard>());
final Set<String> expectedShardIds = new HashSet<>(); final Set<String> expectedShardIds = new HashSet<>();

View file

@ -132,9 +132,9 @@ public class ShutdownTaskTest {
doAnswer((invocation) -> { doAnswer((invocation) -> {
throw new KinesisClientLibIOException("KinesisClientLibIOException"); throw new KinesisClientLibIOException("KinesisClientLibIOException");
}).when(hierarchicalShardSyncer) }).when(hierarchicalShardSyncer)
.checkAndCreateLeaseForNewShards(shards, shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON, .checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
NULL_METRICS_FACTORY.createMetrics()); NULL_METRICS_FACTORY.createMetrics(), shards);
final TaskResult result = task.call(); final TaskResult result = task.call();
assertNotNull(result.getException()); assertNotNull(result.getException());

View file

@ -22,7 +22,7 @@
<artifactId>amazon-kinesis-client-pom</artifactId> <artifactId>amazon-kinesis-client-pom</artifactId>
<packaging>pom</packaging> <packaging>pom</packaging>
<name>Amazon Kinesis Client Library</name> <name>Amazon Kinesis Client Library</name>
<version>2.2.5</version> <version>2.2.5-SNAPSHOT</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data <description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis. from Amazon Kinesis.
</description> </description>