From 77e886aedab9497ac7b2cd981e2931d248ccfa78 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Tue, 15 Oct 2019 14:50:52 -0700 Subject: [PATCH] Addressing comments --- amazon-kinesis-client/pom.xml | 2 +- .../leases/HierarchicalShardSyncer.java | 15 ++++++------- .../kinesis/lifecycle/ShutdownTask.java | 21 +++++++++++-------- .../leases/HierarchicalShardSyncerTest.java | 8 +++---- .../kinesis/lifecycle/ShutdownTaskTest.java | 4 ++-- pom.xml | 2 +- 6 files changed, 26 insertions(+), 26 deletions(-) diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index e404725c..bfbfd1cf 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -22,7 +22,7 @@ software.amazon.kinesis amazon-kinesis-client-pom - 2.2.5 + 2.2.5-SNAPSHOT amazon-kinesis-client diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 71fa0102..1f89f8c8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -79,14 +79,14 @@ public class HierarchicalShardSyncer { final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { final List shards = getShardList(shardDetector); - checkAndCreateLeaseForNewShards(shards, shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, scope); + checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, scope, shards); } //Provide a pre-collcted list of shards to avoid calling ListShards API - public synchronized void checkAndCreateLeaseForNewShards(List shards, @NonNull final ShardDetector shardDetector, + public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards, - final boolean ignoreUnexpectedChildShards, final MetricsScope scope)throws DependencyException, InvalidStateException, + final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List shards)throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { if (!CollectionUtils.isNullOrEmpty(shards)) { log.debug("Num shards: {}", shards.size()); @@ -102,8 +102,7 @@ public class HierarchicalShardSyncer { final List currentLeases = leaseRefresher.listLeases(); - final List newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition, - inconsistentShardIds); + final List newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds); log.debug("Num new leases to create: {}", newLeasesToCreate.size()); for (Lease lease : newLeasesToCreate) { long startTime = System.currentTimeMillis(); @@ -115,13 +114,11 @@ public class HierarchicalShardSyncer { MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED); } } - final List trackedLeases = new ArrayList<>(currentLeases); trackedLeases.addAll(newLeasesToCreate); cleanupGarbageLeases(shardDetector, shards, trackedLeases, leaseRefresher); if (cleanupLeasesOfCompletedShards) { - cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, - leaseRefresher); + cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, leaseRefresher); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index d8f30f2f..5785220e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -20,6 +20,7 @@ import com.sun.org.apache.bcel.internal.generic.LUSHR; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; @@ -101,16 +102,16 @@ public class ShutdownTask implements ConsumerTask { List allShards = new ArrayList<>(); /* * Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END - * If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows other - * shard consumer to subscribe to this shard. + * If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows active + * workers to contend for the lease of this shard. */ if (localReason == ShutdownReason.SHARD_END) { allShards = shardDetector.listShards(); if (!CollectionUtils.isNullOrEmpty(allShards) && !validateShardEnd(allShards)) { localReason = ShutdownReason.LEASE_LOST; - forceLoseLease(); - log.debug("Force the lease to be lost before shutting down the consumer."); + dropLease(); + log.info("Force the lease to be lost before shutting down the consumer for Shard: " + shardInfo.shardId()); } } @@ -152,8 +153,8 @@ public class ShutdownTask implements ConsumerTask { if (localReason == ShutdownReason.SHARD_END) { log.debug("Looking for child shards of shard {}", shardInfo.shardId()); // create leases for the child shards - hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(allShards, shardDetector, leaseCoordinator.leaseRefresher(), - initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope); + hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(), + initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, allShards); log.debug("Finished checking for child shards of shard {}", shardInfo.shardId()); } @@ -205,16 +206,18 @@ public class ShutdownTask implements ConsumerTask { } private boolean isChildShardOfCurrentShard(Shard shard) { - return (shard.parentShardId() != null && shard.parentShardId().equals(shardInfo.shardId()) - || shard.adjacentParentShardId() != null && shard.adjacentParentShardId().equals(shardInfo.shardId())); + return (StringUtils.equals(shard.parentShardId(), shardInfo.shardId()) + || StringUtils.equals(shard.adjacentParentShardId(), shardInfo.shardId())); } - private void forceLoseLease() { + private void dropLease() { Collection leases = leaseCoordinator.getAssignments(); if(leases != null && !leases.isEmpty()) { for(Lease lease : leases) { if(lease.leaseKey().equals(shardInfo.shardId())) { leaseCoordinator.dropLease(lease); + log.warn("Dropped lease for shutting down ShardConsumer: " + lease.leaseKey()); + break; } } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 232d17dc..ee6fa933 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -215,8 +215,8 @@ public class HierarchicalShardSyncerTest { when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); hierarchicalShardSyncer - .checkAndCreateLeaseForNewShards(shards, shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - cleanupLeasesOfCompletedShards, false, SCOPE); + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + cleanupLeasesOfCompletedShards, false, SCOPE, shards); final Set expectedShardIds = new HashSet<>( Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); @@ -247,8 +247,8 @@ public class HierarchicalShardSyncerTest { when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); hierarchicalShardSyncer - .checkAndCreateLeaseForNewShards(new ArrayList(), shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - cleanupLeasesOfCompletedShards, false, SCOPE); + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + cleanupLeasesOfCompletedShards, false, SCOPE, new ArrayList()); final Set expectedShardIds = new HashSet<>(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 597c9ae8..cb3f42cb 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -132,9 +132,9 @@ public class ShutdownTaskTest { doAnswer((invocation) -> { throw new KinesisClientLibIOException("KinesisClientLibIOException"); }).when(hierarchicalShardSyncer) - .checkAndCreateLeaseForNewShards(shards, shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON, + .checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, - NULL_METRICS_FACTORY.createMetrics()); + NULL_METRICS_FACTORY.createMetrics(), shards); final TaskResult result = task.call(); assertNotNull(result.getException()); diff --git a/pom.xml b/pom.xml index d967ab34..190e0c7a 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ amazon-kinesis-client-pom pom Amazon Kinesis Client Library - 2.2.5 + 2.2.5-SNAPSHOT The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.