From 9cb5020022169cc368f4fcd1a56b5379d0fd25c6 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Fri, 22 Jan 2021 21:51:03 -0800 Subject: [PATCH 1/2] 1. Fix for updating HashRange directly while creating leases and including HashRange in serializing and deserializing to/from DDB 2. Fix for making LeaseCleanupManager non-singleton to avoid cross-table interference in multiple apps running in same JVM 3. Fixing updateMetaInfo method to not update other lease table fields 4. Preventing shard deletion in LeaseCleanupManager if a valid shard does not have child shards in lease table and in Kinesis Service 5. Adding childshards update support in updateMetaInfo 6. Fixing LeaseCleanupManager to call updateMetaInfo instead of update for childshard update in lease 7. Fixing unit tests to accommodate HashRange changes --- .../lib/worker/KinesisShardSyncer.java | 5 ++- .../lib/worker/ShardConsumer.java | 2 +- .../clientlibrary/lib/worker/Worker.java | 8 ++-- .../impl/KinesisClientLeaseSerializer.java | 21 ++++++++- .../leases/impl/LeaseCleanupManager.java | 29 ++++++------ .../kinesis/leases/impl/LeaseManager.java | 1 - .../lib/worker/ShardSyncerTest.java | 44 +++++++++++-------- .../clientlibrary/lib/worker/WorkerTest.java | 4 ++ .../proxies/KinesisLocalFileProxy.java | 3 ++ .../util/KinesisLocalFileDataCreator.java | 1 + 10 files changed, 77 insertions(+), 41 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java index 4235bad9..af2b7c36 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java @@ -45,6 +45,8 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import 
com.amazonaws.services.kinesis.model.Shard; +import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.fromHashKeyRange; + /** * Helper class to sync leases with shards of the Kinesis stream. * It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding). @@ -617,7 +619,7 @@ class KinesisShardSyncer implements ShardSyncer { } newLease.setParentShardIds(parentShardIds); newLease.setOwnerSwitchesSinceCheckpoint(0L); - + newLease.setHashKeyRange(fromHashKeyRange(shard.getHashKeyRange())); return newLease; } @@ -641,6 +643,7 @@ class KinesisShardSyncer implements ShardSyncer { newLease.setParentShardIds(parentShardIds); newLease.setOwnerSwitchesSinceCheckpoint(0L); newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + newLease.setHashKeyRange(fromHashKeyRange(childShard.getHashKeyRange())); return newLease; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 29a95ac4..9e73aad9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -243,7 +243,7 @@ class ShardConsumer { this(shardInfo, streamConfig, checkpoint, recordProcessor, recordProcessorCheckpointer, leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, kinesisDataFetcher, retryGetRecordsInSeconds, - maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, LeaseCleanupManager.createOrGetInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), + maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, LeaseCleanupManager.newInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), 
Executors.newSingleThreadScheduledExecutor(), metricsFactory, config.shouldCleanupLeasesUponShardCompletion(), config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(), config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords())); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index f8c66181..e7e725f8 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -96,9 +96,9 @@ public class Worker implements Runnable { // Default configs for periodic shard sync private static final int SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC = 0; private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; //Default for KCL. - static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; - static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L; - static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; + static long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; + static long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L; + static long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private static final LeaseCleanupValidator DEFAULT_LEASE_CLEANUP_VALIDATOR = new KinesisLeaseCleanupValidator(); @@ -576,7 +576,7 @@ public class Worker implements Runnable { this.workerStateChangeListener = workerStateChangeListener; workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED); createShardSyncStrategy(config.getShardSyncStrategyType(), leaderDecider, periodicShardSyncManager); - this.leaseCleanupManager = LeaseCleanupManager.createOrGetInstance(streamConfig.getStreamProxy(), 
leaseCoordinator.getLeaseManager(), + this.leaseCleanupManager = LeaseCleanupManager.newInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), Executors.newSingleThreadScheduledExecutor(), metricsFactory, cleanupLeasesUponShardCompletion, config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(), config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords()); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java index 310edb67..5a7fb9b2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java @@ -68,6 +68,11 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer implements ILeaseManager { request.setExpected(serializer.getDynamoExistentExpectation(lease.getLeaseKey())); Map updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField); - updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease)); request.setAttributeUpdates(updates); try { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java index 9e9870d3..731cfdd3 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java @@ -93,6 +93,8 @@ public class ShardSyncerTest { private LeaseManager leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient, KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE); protected static final KinesisLeaseCleanupValidator leaseCleanupValidator = new KinesisLeaseCleanupValidator(); private static final KinesisShardSyncer 
shardSyncer = new KinesisShardSyncer(leaseCleanupValidator); + private static final HashKeyRange hashKeyRange = new HashKeyRange().withStartingHashKey("0").withEndingHashKey("10"); + /** * Old/Obsolete max value of a sequence number (2^128 -1). */ @@ -154,10 +156,10 @@ public class ShardSyncerTest { SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); String shardId0 = "shardId-0"; - shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange)); String shardId1 = "shardId-1"; - shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange)); final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); @@ -183,16 +185,16 @@ public class ShardSyncerTest { SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); String shardId0 = "shardId-0"; - shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange)); String shardId1 = "shardId-1"; - shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange)); String shardId2 = "shardId-2"; - shards.add(ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange, hashKeyRange)); String shardIdWithLease = "shardId-3"; - shards.add(ShardObjectHelper.newShard(shardIdWithLease, shardIdWithLease, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardIdWithLease, shardIdWithLease, null, sequenceRange, hashKeyRange)); currentLeases.add(newLease(shardIdWithLease)); @@ -699,9 +701,9 @@ public class 
ShardSyncerTest { SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); String shardId0 = "shardId-0"; - shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange)); String shardId1 = "shardId-1"; - shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange)); File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1"); dataFile.deleteOnExit(); IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); @@ -731,10 +733,10 @@ public class ShardSyncerTest { SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); String shardId0 = "shardId-0"; - shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange)); String shardId1 = "shardId-1"; - shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange)); Set initialPositions = new HashSet(); initialPositions.add(INITIAL_POSITION_LATEST); @@ -769,17 +771,20 @@ public class ShardSyncerTest { shardsWithoutLeases.add(ShardObjectHelper.newShard("shardId-0", null, null, - ShardObjectHelper.newSequenceNumberRange("303", "404"))); + ShardObjectHelper.newSequenceNumberRange("303", "404"), + hashKeyRange)); final String lastShardId = "shardId-1"; shardsWithoutLeases.add(ShardObjectHelper.newShard(lastShardId, null, null, - ShardObjectHelper.newSequenceNumberRange("405", null))); + ShardObjectHelper.newSequenceNumberRange("405", null), + hashKeyRange)); shardsWithLeases.add(ShardObjectHelper.newShard("shardId-2", null, null, - ShardObjectHelper.newSequenceNumberRange("202", 
"302"))); + ShardObjectHelper.newSequenceNumberRange("202", "302"), + hashKeyRange)); currentLeases.add(newLease("shardId-2")); final List allShards = @@ -805,12 +810,14 @@ public class ShardSyncerTest { shards.add(ShardObjectHelper.newShard(firstShardId, null, null, - ShardObjectHelper.newSequenceNumberRange("303", "404"))); + ShardObjectHelper.newSequenceNumberRange("303", "404"), + hashKeyRange)); final String lastShardId = "shardId-1"; shards.add(ShardObjectHelper.newShard(lastShardId, null, null, - ShardObjectHelper.newSequenceNumberRange("405", null))); + ShardObjectHelper.newSequenceNumberRange("405", null), + hashKeyRange)); final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); @@ -1969,14 +1976,14 @@ public class ShardSyncerTest { Map kinesisShards = new HashMap(); String parentShardId = "shardId-parent"; - kinesisShards.put(parentShardId, ShardObjectHelper.newShard(parentShardId, null, null, null)); + kinesisShards.put(parentShardId, ShardObjectHelper.newShard(parentShardId, null, null, null, hashKeyRange)); shardIdsOfCurrentLeases.add(parentShardId); String adjacentParentShardId = "shardId-adjacentParent"; - kinesisShards.put(adjacentParentShardId, ShardObjectHelper.newShard(adjacentParentShardId, null, null, null)); + kinesisShards.put(adjacentParentShardId, ShardObjectHelper.newShard(adjacentParentShardId, null, null, null, hashKeyRange)); String shardId = "shardId-9-1"; - Shard shard = ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null); + Shard shard = ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null, hashKeyRange); kinesisShards.put(shardId, shard); final MemoizationContext memoizationContext = new MemoizationContext(); @@ -2097,6 +2104,7 @@ public class ShardSyncerTest { String adjacentParentShardId = "shardId-adjacentParent"; shard.setParentShardId(parentShardId); shard.setAdjacentParentShardId(adjacentParentShardId); + shard.setHashKeyRange(hashKeyRange); 
KinesisClientLease lease = shardSyncer.newKCLLease(shard); Assert.assertEquals(shardId, lease.getLeaseKey()); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index a7ca1151..50a3aa9b 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -204,6 +204,10 @@ public class WorkerTest { when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); when(leaseCoordinator.getLeaseManager()).thenReturn(mock(ILeaseManager.class)); when(streamConfig.getStreamProxy()).thenReturn(kinesisProxy); + Worker.MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 10; + Worker.MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 50; + Worker.LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 10; + } // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java index fe922520..1e3ff6de 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Set; import com.amazonaws.services.kinesis.model.ChildShard; +import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.ShardFilter; import com.amazonaws.util.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -408,11 +409,13 @@ public class KinesisLocalFileProxy implements IKinesisProxy { ChildShard leftChild = new ChildShard(); leftChild.setShardId("shardId-1"); leftChild.setParentShards(parentShards); + leftChild.setHashKeyRange(new 
HashKeyRange().withStartingHashKey("0").withEndingHashKey("10")); childShards.add(leftChild); ChildShard rightChild = new ChildShard(); rightChild.setShardId("shardId-2"); rightChild.setParentShards(parentShards); + rightChild.setHashKeyRange(new HashKeyRange().withStartingHashKey("11").withEndingHashKey(MAX_HASHKEY_VALUE.toString())); childShards.add(rightChild); return childShards; } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java index b13d0233..2baac1ca 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java @@ -163,6 +163,7 @@ public class KinesisLocalFileDataCreator { HashKeyRange hashKeyRange = new HashKeyRange(); hashKeyRange.setStartingHashKey(hashKeyRangeStart.toString()); hashKeyRange.setEndingHashKey(hashKeyRangeEnd.toString()); + shard.setHashKeyRange(hashKeyRange); shards.add(shard); } From 76234d172c92ac46591611cdb7b9b1d3b0d4465e Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Sun, 24 Jan 2021 16:10:25 -0800 Subject: [PATCH 2/2] Making LeaseCleanupManager more verbose about lease cleanup failures --- .../leases/impl/LeaseCleanupManager.java | 61 ++++++++++++------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java index 49a0ca36..f6ff8ee0 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java @@ -176,6 +176,7 @@ public class LeaseCleanupManager { boolean alreadyCheckedForGarbageCollection = false; 
boolean wereChildShardsPresent = false; boolean wasResourceNotFound = false; + String cleanupFailureReason = ""; try { if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) { @@ -184,53 +185,55 @@ public class LeaseCleanupManager { Set childShardKeys = leaseFromDDB.getChildShardIds(); if (CollectionUtils.isNullOrEmpty(childShardKeys)) { try { + // throws ResourceNotFoundException childShardKeys = getChildShardsFromService(shardInfo); - if (CollectionUtils.isNullOrEmpty(childShardKeys)) { LOG.error("No child shards returned from service for shard " + shardInfo.getShardId()); // If no children shard is found in DDB and from service, then do not delete the lease throw new InvalidStateException("No child shards found for this supposedly " + "closed shard in both local DDB and in service " + shardInfo.getShardId()); - } else { wereChildShardsPresent = true; updateLeaseWithChildShards(leasePendingDeletion, childShardKeys); } - } catch (ResourceNotFoundException e) { - throw e; } finally { + // We rely on resource presence in service for garbage collection. Since we already + // made a call to getChildShardsFromService we would be coming to know if the resource + // is present of not. In latter case, we would throw ResourceNotFoundException, which is + // handled in catch block. alreadyCheckedForGarbageCollection = true; } } else { wereChildShardsPresent = true; } try { - cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, childShardKeys); + final CompletedShardResult completedShardResult = cleanupLeaseForCompletedShard(lease, childShardKeys); + cleanedUpCompletedLease = completedShardResult.cleanedUp(); + cleanupFailureReason = completedShardResult.failureMsg(); } catch (Exception e) { // Suppressing the exception here, so that we can attempt for garbage cleanup. 
- LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId()); + LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId() + " due to " + e.getMessage()); } } else { LOG.info("Lease not present in lease table while cleaning the shard " + shardInfo.getShardId()); cleanedUpCompletedLease = true; } } - - if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) { - try { - wereChildShardsPresent = !CollectionUtils + if (!cleanedUpCompletedLease && !alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) { + // throws ResourceNotFoundException + wereChildShardsPresent = !CollectionUtils .isNullOrEmpty(getChildShardsFromService(shardInfo)); - } catch (ResourceNotFoundException e) { - throw e; - } } } catch (ResourceNotFoundException e) { wasResourceNotFound = true; cleanedUpGarbageLease = cleanupLeaseForGarbageShard(lease); + cleanupFailureReason = cleanedUpGarbageLease ? "" : "DDB Lease Deletion Failed"; + } catch (Exception e) { + LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId() + " : " + e.getMessage()); + cleanupFailureReason = e.getMessage(); } - return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease, wereChildShardsPresent, - wasResourceNotFound); + wasResourceNotFound, cleanupFailureReason); } private Set getChildShardsFromService(ShardInfo shardInfo) { @@ -238,12 +241,16 @@ public class LeaseCleanupManager { return kinesisProxy.get(iterator, maxRecords).getChildShards().stream().map(c -> c.getShardId()).collect(Collectors.toSet()); } - // A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the // stream (known explicitly from ResourceNotFound being thrown when processing this shard), private boolean cleanupLeaseForGarbageShard(KinesisClientLease lease) throws DependencyException, ProvisionedThroughputException, InvalidStateException { LOG.info("Deleting lease " + lease.getLeaseKey() + " as it is not present in the 
stream."); - leaseManager.deleteLease(lease); + try { + leaseManager.deleteLease(lease); + } catch (Exception e) { + LOG.warn("Lease deletion failed for " + lease.getLeaseKey() + " due to " + e.getMessage()); + return false; + } return true; } @@ -263,8 +270,9 @@ public class LeaseCleanupManager { // We should only be deleting the current shard's lease if // 1. All of its children are currently being processed, i.e their checkpoint is not TRIM_HORIZON or AT_TIMESTAMP. // 2. Its parent shard lease(s) have already been deleted. - private boolean cleanupLeaseForCompletedShard(KinesisClientLease lease, Set childShardLeaseKeys) + private CompletedShardResult cleanupLeaseForCompletedShard(KinesisClientLease lease, Set childShardLeaseKeys) throws DependencyException, ProvisionedThroughputException, InvalidStateException, IllegalStateException { + final Set processedChildShardLeaseKeys = new HashSet<>(); for (String childShardLeaseKey : childShardLeaseKeys) { @@ -280,14 +288,17 @@ public class LeaseCleanupManager { } } - if (!allParentShardLeasesDeleted(lease) || !Objects.equals(childShardLeaseKeys, processedChildShardLeaseKeys)) { - return false; + boolean parentShardsDeleted = allParentShardLeasesDeleted(lease); + boolean childrenStartedProcessing = Objects.equals(childShardLeaseKeys, processedChildShardLeaseKeys); + + if (!parentShardsDeleted || !childrenStartedProcessing) { + return new CompletedShardResult(false, !parentShardsDeleted ? 
"Parent shard(s) not deleted yet" : "Child shard(s) yet to begin processing"); } LOG.info("Deleting lease " + lease.getLeaseKey() + " as it has been completely processed and processing of child shard(s) has begun."); leaseManager.deleteLease(lease); - return true; + return new CompletedShardResult(true, ""); } private void updateLeaseWithChildShards(LeasePendingDeletion leasePendingDeletion, Set childShardKeys) @@ -363,9 +374,17 @@ public class LeaseCleanupManager { boolean cleanedUpGarbageLease; boolean wereChildShardsPresent; boolean wasResourceNotFound; + String cleanupFailureReason; public boolean leaseCleanedUp() { return cleanedUpCompletedLease | cleanedUpGarbageLease; } } + + @Value + @Accessors(fluent = true) + private static class CompletedShardResult { + boolean cleanedUp; + String failureMsg; + } }