diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml
index e404725c..bfbfd1cf 100644
--- a/amazon-kinesis-client/pom.xml
+++ b/amazon-kinesis-client/pom.xml
@@ -22,7 +22,7 @@
software.amazon.kinesis
amazon-kinesis-client-pom
- 2.2.5
+ 2.2.5-SNAPSHOT
amazon-kinesis-client
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
index 71fa0102..1f89f8c8 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
@@ -79,14 +79,14 @@ public class HierarchicalShardSyncer {
final MetricsScope scope) throws DependencyException, InvalidStateException,
ProvisionedThroughputException, KinesisClientLibIOException {
final List shards = getShardList(shardDetector);
- checkAndCreateLeaseForNewShards(shards, shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards,
- ignoreUnexpectedChildShards, scope);
+ checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards, scope, shards);
}
//Provide a pre-collcted list of shards to avoid calling ListShards API
- public synchronized void checkAndCreateLeaseForNewShards(List shards, @NonNull final ShardDetector shardDetector,
+ public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards,
- final boolean ignoreUnexpectedChildShards, final MetricsScope scope)throws DependencyException, InvalidStateException,
+ final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List shards)throws DependencyException, InvalidStateException,
ProvisionedThroughputException, KinesisClientLibIOException {
if (!CollectionUtils.isNullOrEmpty(shards)) {
log.debug("Num shards: {}", shards.size());
@@ -102,8 +102,7 @@ public class HierarchicalShardSyncer {
final List currentLeases = leaseRefresher.listLeases();
- final List newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition,
- inconsistentShardIds);
+ final List newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds);
log.debug("Num new leases to create: {}", newLeasesToCreate.size());
for (Lease lease : newLeasesToCreate) {
long startTime = System.currentTimeMillis();
@@ -115,13 +114,11 @@ public class HierarchicalShardSyncer {
MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED);
}
}
-
final List trackedLeases = new ArrayList<>(currentLeases);
trackedLeases.addAll(newLeasesToCreate);
cleanupGarbageLeases(shardDetector, shards, trackedLeases, leaseRefresher);
if (cleanupLeasesOfCompletedShards) {
- cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases,
- leaseRefresher);
+ cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, leaseRefresher);
}
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java
index d8f30f2f..5785220e 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java
@@ -20,6 +20,7 @@ import com.sun.org.apache.bcel.internal.generic.LUSHR;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@@ -101,16 +102,16 @@ public class ShutdownTask implements ConsumerTask {
List allShards = new ArrayList<>();
/*
* Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END
- * If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows other
- * shard consumer to subscribe to this shard.
+ * If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows active
+ * workers to contend for the lease of this shard.
*/
if (localReason == ShutdownReason.SHARD_END) {
allShards = shardDetector.listShards();
if (!CollectionUtils.isNullOrEmpty(allShards) && !validateShardEnd(allShards)) {
localReason = ShutdownReason.LEASE_LOST;
- forceLoseLease();
- log.debug("Force the lease to be lost before shutting down the consumer.");
+ dropLease();
+ log.info("Force the lease to be lost before shutting down the consumer for Shard: " + shardInfo.shardId());
}
}
@@ -152,8 +153,8 @@ public class ShutdownTask implements ConsumerTask {
if (localReason == ShutdownReason.SHARD_END) {
log.debug("Looking for child shards of shard {}", shardInfo.shardId());
// create leases for the child shards
- hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(allShards, shardDetector, leaseCoordinator.leaseRefresher(),
- initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope);
+ hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(),
+ initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, allShards);
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
}
@@ -205,16 +206,18 @@ public class ShutdownTask implements ConsumerTask {
}
private boolean isChildShardOfCurrentShard(Shard shard) {
- return (shard.parentShardId() != null && shard.parentShardId().equals(shardInfo.shardId())
- || shard.adjacentParentShardId() != null && shard.adjacentParentShardId().equals(shardInfo.shardId()));
+ return (StringUtils.equals(shard.parentShardId(), shardInfo.shardId())
+ || StringUtils.equals(shard.adjacentParentShardId(), shardInfo.shardId()));
}
- private void forceLoseLease() {
+ private void dropLease() {
Collection leases = leaseCoordinator.getAssignments();
if(leases != null && !leases.isEmpty()) {
for(Lease lease : leases) {
if(lease.leaseKey().equals(shardInfo.shardId())) {
leaseCoordinator.dropLease(lease);
+ log.warn("Dropped lease for shutting down ShardConsumer: " + lease.leaseKey());
+ break;
}
}
}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java
index 232d17dc..ee6fa933 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java
@@ -215,8 +215,8 @@ public class HierarchicalShardSyncerTest {
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
hierarchicalShardSyncer
- .checkAndCreateLeaseForNewShards(shards, shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
- cleanupLeasesOfCompletedShards, false, SCOPE);
+ .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
+ cleanupLeasesOfCompletedShards, false, SCOPE, shards);
final Set expectedShardIds = new HashSet<>(
Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"));
@@ -247,8 +247,8 @@ public class HierarchicalShardSyncerTest {
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
hierarchicalShardSyncer
- .checkAndCreateLeaseForNewShards(new ArrayList(), shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
- cleanupLeasesOfCompletedShards, false, SCOPE);
+ .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
+ cleanupLeasesOfCompletedShards, false, SCOPE, new ArrayList());
final Set expectedShardIds = new HashSet<>();
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java
index 597c9ae8..cb3f42cb 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java
@@ -132,9 +132,9 @@ public class ShutdownTaskTest {
doAnswer((invocation) -> {
throw new KinesisClientLibIOException("KinesisClientLibIOException");
}).when(hierarchicalShardSyncer)
- .checkAndCreateLeaseForNewShards(shards, shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON,
+ .checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
- NULL_METRICS_FACTORY.createMetrics());
+ NULL_METRICS_FACTORY.createMetrics(), shards);
final TaskResult result = task.call();
assertNotNull(result.getException());
diff --git a/pom.xml b/pom.xml
index d967ab34..190e0c7a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
amazon-kinesis-client-pom
pom
Amazon Kinesis Client Library
- 2.2.5
+ 2.2.5-SNAPSHOT
The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.