From 4b3c717c53517a564ba9adea7a658dff4c6cd65d Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Sun, 5 Apr 2020 13:36:25 -0700 Subject: [PATCH] Quick fix --- .../amazon/kinesis/leases/HierarchicalShardSyncer.java | 1 + .../java/software/amazon/kinesis/lifecycle/ShutdownTask.java | 4 +--- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 34c17bdf..387d9155 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -173,6 +173,7 @@ public class HierarchicalShardSyncer { } else { throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.shardId() + "because parent shards cannot be found."); } + newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); newLease.ownerSwitchesSinceCheckpoint(0L); return newLease; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 18a0af63..66ed7cbc 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -71,8 +71,6 @@ public class ShutdownTask implements ConsumerTask { @NonNull private final InitialPositionInStreamExtended initialPositionInStream; private final boolean cleanupLeasesOfCompletedShards; - private final boolean garbageCollectLeases = false; - private final boolean isLeaseTableEmpty = false; private final boolean ignoreUnexpectedChildShards; @NonNull private final LeaseCoordinator leaseCoordinator; @@ -174,7 +172,7 @@ public class ShutdownTask implements ConsumerTask { private void createLeasesForChildShardsIfNotExist() throws DependencyException, InvalidStateException, ProvisionedThroughputException { for(ChildShard childShard : childShards) { - if(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId()) == null) { + if(leaseCoordinator.getCurrentlyHeldLease(childShard.shardId()) == null) { final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard); leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate); }