diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java index f9e52e1c..d62cd476 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java @@ -117,8 +117,8 @@ public class LeaseCleanupManager { public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) { final Lease lease = leasePendingDeletion.lease(); if (lease == null) { - log.warn("Cannot enqueue lease {} for deferred deletion - instance doesn't hold the lease for that shard.", - lease.leaseKey()); + log.warn("Cannot enqueue {} for {} as instance doesn't hold the lease for that shard.", + leasePendingDeletion.shardInfo(), leasePendingDeletion.streamIdentifier()); } else { log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey()); if (!deletionQueue.add(leasePendingDeletion)) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java index 2e691844..9a731f80 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java @@ -45,19 +45,14 @@ import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class LeaseCleanupManagerTest { - private ShardInfo shardInfo; - private StreamIdentifier streamIdentifier; - private String concurrencyToken = "1234"; + private static final ShardInfo SHARD_INFO = new ShardInfo("shardId", "concurrencyToken", + Collections.emptySet(), ExtendedSequenceNumber.LATEST); - private String shardId = "shardId"; - private String splitParent = "splitParent"; - private String mergeParent1 = "mergeParent-1"; - private String mergeParent2 = "mergeParent-2"; + private static final StreamIdentifier STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName"); - private Duration maxFutureWait = Duration.ofSeconds(1); - private long leaseCleanupIntervalMillis = Duration.ofSeconds(1).toMillis(); - private long completedLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis(); - private long garbageLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis(); + private final long leaseCleanupIntervalMillis = Duration.ofSeconds(1).toMillis(); + private final long completedLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis(); + private final long garbageLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis(); private boolean cleanupLeasesOfCompletedShards = true; private LeaseCleanupManager leaseCleanupManager; private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory(); @@ -73,9 +68,6 @@ public class LeaseCleanupManagerTest { @Before public void setUp() throws Exception { - shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); - streamIdentifier = StreamIdentifier.singleStreamInstance("streamName"); leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, NULL_METRICS_FACTORY, deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis); @@ -112,10 +104,8 @@ public class LeaseCleanupManagerTest { */ @Test public final void testParentShardLeaseDeletedSplitCase() throws Exception { - shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); - - verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(), ExtendedSequenceNumber.LATEST, 1); + verifyExpectedDeletedLeasesCompletedShardCase(SHARD_INFO, childShardsForSplit(), + ExtendedSequenceNumber.LATEST, 1); } /** @@ -124,10 +114,8 @@ public class LeaseCleanupManagerTest { */ @Test public final void testParentShardLeaseDeletedMergeCase() throws Exception { - shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); - - verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), ExtendedSequenceNumber.LATEST, 1); + verifyExpectedDeletedLeasesCompletedShardCase(SHARD_INFO, childShardsForMerge(), + ExtendedSequenceNumber.LATEST, 1); } /** @@ -136,15 +124,14 @@ public class LeaseCleanupManagerTest { */ @Test public final void testNoLeasesDeletedWhenNotEnabled() throws Exception { - shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); cleanupLeasesOfCompletedShards = false; leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, NULL_METRICS_FACTORY, deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis); - verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(), ExtendedSequenceNumber.LATEST, 0); + verifyExpectedDeletedLeasesCompletedShardCase(SHARD_INFO, childShardsForSplit(), + ExtendedSequenceNumber.LATEST, 0); } /** @@ -155,10 +142,8 @@ public class LeaseCleanupManagerTest { public final void testNoCleanupWhenSomeChildShardLeasesAreNotPresent() throws Exception { List childShards = childShardsForSplit(); - shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); - - verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShards, ExtendedSequenceNumber.LATEST, false, 0); + verifyExpectedDeletedLeasesCompletedShardCase(SHARD_INFO, childShards, + ExtendedSequenceNumber.LATEST, false, 0); } /** @@ -179,12 +164,9 @@ public class LeaseCleanupManagerTest { testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber.AT_TIMESTAMP); } - private final void testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber extendedSequenceNumber) + private void testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber extendedSequenceNumber) throws Exception { - shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); - - verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), extendedSequenceNumber, 0); + verifyExpectedDeletedLeasesCompletedShardCase(SHARD_INFO, childShardsForMerge(), extendedSequenceNumber, 0); } /** @@ -192,33 +174,38 @@ public class LeaseCleanupManagerTest { */ @Test public final void testLeaseNotDeletedWhenParentsStillPresent() throws Exception { - shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.singleton("parent"), + final ShardInfo shardInfo = new ShardInfo("shardId-0", "concurrencyToken", Collections.singleton("parent"), ExtendedSequenceNumber.LATEST); verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), ExtendedSequenceNumber.LATEST, 0); } + /** + * Verify {@link NullPointerException} is not thrown when a null lease is enqueued. + */ + @Test + public void testEnqueueNullLease() { + leaseCleanupManager.enqueueForDeletion(createLeasePendingDeletion(null, SHARD_INFO)); + } + /** * Tests ResourceNotFound case for if a shard expires, that we delete the lease when shardExpired is found. */ @Test public final void testLeaseDeletedWhenShardDoesNotExist() throws Exception { - shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); - final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId")); + final Lease heldLease = LeaseHelper.createLease(SHARD_INFO.shardId(), "leaseOwner", + Collections.singleton("parentShardId")); testLeaseDeletedWhenShardDoesNotExist(heldLease); } /** * Tests ResourceNotFound case when completed lease cleanup is disabled. - * @throws Exception */ @Test public final void testLeaseDeletedWhenShardDoesNotExistAndCleanupCompletedLeaseDisabled() throws Exception { - shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); - final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId")); + final Lease heldLease = LeaseHelper.createLease(SHARD_INFO.shardId(), "leaseOwner", + Collections.singleton("parentShardId")); cleanupLeasesOfCompletedShards = false; @@ -229,32 +216,31 @@ public class LeaseCleanupManagerTest { testLeaseDeletedWhenShardDoesNotExist(heldLease); } - public void testLeaseDeletedWhenShardDoesNotExist(Lease heldLease) throws Exception { + private void testLeaseDeletedWhenShardDoesNotExist(Lease heldLease) throws Exception { when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); - when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease); + when(leaseCoordinator.getCurrentlyHeldLease(SHARD_INFO.shardId())).thenReturn(heldLease); when(shardDetector.getChildShards(any(String.class))).thenThrow(ResourceNotFoundException.class); when(leaseRefresher.getLease(heldLease.leaseKey())).thenReturn(heldLease); - leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo, shardDetector)); + leaseCleanupManager.enqueueForDeletion(createLeasePendingDeletion(heldLease, SHARD_INFO)); leaseCleanupManager.cleanupLeases(); - verify(shardDetector, times(1)).getChildShards(shardInfo.shardId()); - verify(leaseRefresher, times(1)).deleteLease(heldLease); + verify(shardDetector).getChildShards(SHARD_INFO.shardId()); + verify(leaseRefresher).deleteLease(heldLease); } - private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List childShards, + private void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List childShards, ExtendedSequenceNumber extendedSequenceNumber, int expectedDeletedLeases) throws Exception { verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShards, extendedSequenceNumber, true, expectedDeletedLeases); } - private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List childShards, + private void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List childShards, ExtendedSequenceNumber extendedSequenceNumber, boolean childShardLeasesPresent, int expectedDeletedLeases) throws Exception { - final Lease lease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", shardInfo.parentShardIds(), - childShards.stream().map(c -> c.shardId()).collect(Collectors.toSet())); + childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet())); final List childShardLeases = childShards.stream().map(c -> LeaseHelper.createLease( ShardInfo.getLeaseKey(shardInfo, c.shardId()), "leaseOwner", Collections.singleton(shardInfo.shardId()), Collections.emptyList(), extendedSequenceNumber)).collect(Collectors.toList()); @@ -273,15 +259,15 @@ public class LeaseCleanupManagerTest { } } - leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, lease, shardInfo, shardDetector)); + leaseCleanupManager.enqueueForDeletion(createLeasePendingDeletion(lease, shardInfo)); leaseCleanupManager.cleanupLeases(); - verify(shardDetector, times(1)).getChildShards(shardInfo.shardId()); + verify(shardDetector).getChildShards(shardInfo.shardId()); verify(leaseRefresher, times(expectedDeletedLeases)).deleteLease(any(Lease.class)); } private List childShardsForSplit() { - List parentShards = Arrays.asList(splitParent); + final List parentShards = Collections.singletonList("splitParent"); ChildShard leftChild = ChildShard.builder() .shardId("leftChild") @@ -294,11 +280,11 @@ public class LeaseCleanupManagerTest { .hashKeyRange(ShardObjectHelper.newHashKeyRange("50", "99")) .build(); - return Arrays.asList(leftChild, rightChild); + return Arrays.asList(leftChild, rightChild); } private List childShardsForMerge() { - List parentShards = Arrays.asList(mergeParent1, mergeParent2); + final List parentShards = Arrays.asList("mergeParent1", "mergeParent2"); ChildShard child = ChildShard.builder() .shardId("onlyChild") @@ -308,4 +294,8 @@ public class LeaseCleanupManagerTest { return Collections.singletonList(child); } + + private LeasePendingDeletion createLeasePendingDeletion(final Lease lease, final ShardInfo shardInfo) { + return new LeasePendingDeletion(STREAM_IDENTIFIER, lease, shardInfo, shardDetector); + } }