Fixed NPE in LeaseCleanupManager. (#1061)
This commit is contained in:
parent
27b166c5aa
commit
504ea10859
2 changed files with 48 additions and 58 deletions
|
|
@ -117,8 +117,8 @@ public class LeaseCleanupManager {
|
||||||
public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) {
|
public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) {
|
||||||
final Lease lease = leasePendingDeletion.lease();
|
final Lease lease = leasePendingDeletion.lease();
|
||||||
if (lease == null) {
|
if (lease == null) {
|
||||||
log.warn("Cannot enqueue lease {} for deferred deletion - instance doesn't hold the lease for that shard.",
|
log.warn("Cannot enqueue {} for {} as instance doesn't hold the lease for that shard.",
|
||||||
lease.leaseKey());
|
leasePendingDeletion.shardInfo(), leasePendingDeletion.streamIdentifier());
|
||||||
} else {
|
} else {
|
||||||
log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey());
|
log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey());
|
||||||
if (!deletionQueue.add(leasePendingDeletion)) {
|
if (!deletionQueue.add(leasePendingDeletion)) {
|
||||||
|
|
|
||||||
|
|
@ -45,19 +45,14 @@ import static org.mockito.Mockito.when;
|
||||||
@RunWith(MockitoJUnitRunner.class)
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
public class LeaseCleanupManagerTest {
|
public class LeaseCleanupManagerTest {
|
||||||
|
|
||||||
private ShardInfo shardInfo;
|
private static final ShardInfo SHARD_INFO = new ShardInfo("shardId", "concurrencyToken",
|
||||||
private StreamIdentifier streamIdentifier;
|
Collections.emptySet(), ExtendedSequenceNumber.LATEST);
|
||||||
private String concurrencyToken = "1234";
|
|
||||||
|
|
||||||
private String shardId = "shardId";
|
private static final StreamIdentifier STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName");
|
||||||
private String splitParent = "splitParent";
|
|
||||||
private String mergeParent1 = "mergeParent-1";
|
|
||||||
private String mergeParent2 = "mergeParent-2";
|
|
||||||
|
|
||||||
private Duration maxFutureWait = Duration.ofSeconds(1);
|
private final long leaseCleanupIntervalMillis = Duration.ofSeconds(1).toMillis();
|
||||||
private long leaseCleanupIntervalMillis = Duration.ofSeconds(1).toMillis();
|
private final long completedLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis();
|
||||||
private long completedLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis();
|
private final long garbageLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis();
|
||||||
private long garbageLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis();
|
|
||||||
private boolean cleanupLeasesOfCompletedShards = true;
|
private boolean cleanupLeasesOfCompletedShards = true;
|
||||||
private LeaseCleanupManager leaseCleanupManager;
|
private LeaseCleanupManager leaseCleanupManager;
|
||||||
private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory();
|
private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory();
|
||||||
|
|
@ -73,9 +68,6 @@ public class LeaseCleanupManagerTest {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(),
|
|
||||||
ExtendedSequenceNumber.LATEST);
|
|
||||||
streamIdentifier = StreamIdentifier.singleStreamInstance("streamName");
|
|
||||||
leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, NULL_METRICS_FACTORY, deletionThreadPool,
|
leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, NULL_METRICS_FACTORY, deletionThreadPool,
|
||||||
cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis,
|
cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis,
|
||||||
garbageLeaseCleanupIntervalMillis);
|
garbageLeaseCleanupIntervalMillis);
|
||||||
|
|
@ -112,10 +104,8 @@ public class LeaseCleanupManagerTest {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public final void testParentShardLeaseDeletedSplitCase() throws Exception {
|
public final void testParentShardLeaseDeletedSplitCase() throws Exception {
|
||||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
verifyExpectedDeletedLeasesCompletedShardCase(SHARD_INFO, childShardsForSplit(),
|
||||||
ExtendedSequenceNumber.LATEST);
|
ExtendedSequenceNumber.LATEST, 1);
|
||||||
|
|
||||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(), ExtendedSequenceNumber.LATEST, 1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -124,10 +114,8 @@ public class LeaseCleanupManagerTest {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public final void testParentShardLeaseDeletedMergeCase() throws Exception {
|
public final void testParentShardLeaseDeletedMergeCase() throws Exception {
|
||||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
verifyExpectedDeletedLeasesCompletedShardCase(SHARD_INFO, childShardsForMerge(),
|
||||||
ExtendedSequenceNumber.LATEST);
|
ExtendedSequenceNumber.LATEST, 1);
|
||||||
|
|
||||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), ExtendedSequenceNumber.LATEST, 1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -136,15 +124,14 @@ public class LeaseCleanupManagerTest {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public final void testNoLeasesDeletedWhenNotEnabled() throws Exception {
|
public final void testNoLeasesDeletedWhenNotEnabled() throws Exception {
|
||||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
|
||||||
ExtendedSequenceNumber.LATEST);
|
|
||||||
cleanupLeasesOfCompletedShards = false;
|
cleanupLeasesOfCompletedShards = false;
|
||||||
|
|
||||||
leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, NULL_METRICS_FACTORY, deletionThreadPool,
|
leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, NULL_METRICS_FACTORY, deletionThreadPool,
|
||||||
cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis,
|
cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis,
|
||||||
garbageLeaseCleanupIntervalMillis);
|
garbageLeaseCleanupIntervalMillis);
|
||||||
|
|
||||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(), ExtendedSequenceNumber.LATEST, 0);
|
verifyExpectedDeletedLeasesCompletedShardCase(SHARD_INFO, childShardsForSplit(),
|
||||||
|
ExtendedSequenceNumber.LATEST, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -155,10 +142,8 @@ public class LeaseCleanupManagerTest {
|
||||||
public final void testNoCleanupWhenSomeChildShardLeasesAreNotPresent() throws Exception {
|
public final void testNoCleanupWhenSomeChildShardLeasesAreNotPresent() throws Exception {
|
||||||
List<ChildShard> childShards = childShardsForSplit();
|
List<ChildShard> childShards = childShardsForSplit();
|
||||||
|
|
||||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
verifyExpectedDeletedLeasesCompletedShardCase(SHARD_INFO, childShards,
|
||||||
ExtendedSequenceNumber.LATEST);
|
ExtendedSequenceNumber.LATEST, false, 0);
|
||||||
|
|
||||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShards, ExtendedSequenceNumber.LATEST, false, 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -179,12 +164,9 @@ public class LeaseCleanupManagerTest {
|
||||||
testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber.AT_TIMESTAMP);
|
testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber.AT_TIMESTAMP);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final void testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber extendedSequenceNumber)
|
private void testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber extendedSequenceNumber)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
verifyExpectedDeletedLeasesCompletedShardCase(SHARD_INFO, childShardsForMerge(), extendedSequenceNumber, 0);
|
||||||
ExtendedSequenceNumber.LATEST);
|
|
||||||
|
|
||||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), extendedSequenceNumber, 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -192,33 +174,38 @@ public class LeaseCleanupManagerTest {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public final void testLeaseNotDeletedWhenParentsStillPresent() throws Exception {
|
public final void testLeaseNotDeletedWhenParentsStillPresent() throws Exception {
|
||||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.singleton("parent"),
|
final ShardInfo shardInfo = new ShardInfo("shardId-0", "concurrencyToken", Collections.singleton("parent"),
|
||||||
ExtendedSequenceNumber.LATEST);
|
ExtendedSequenceNumber.LATEST);
|
||||||
|
|
||||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), ExtendedSequenceNumber.LATEST, 0);
|
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), ExtendedSequenceNumber.LATEST, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify {@link NullPointerException} is not thrown when a null lease is enqueued.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testEnqueueNullLease() {
|
||||||
|
leaseCleanupManager.enqueueForDeletion(createLeasePendingDeletion(null, SHARD_INFO));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests ResourceNotFound case for if a shard expires, that we delete the lease when shardExpired is found.
|
* Tests ResourceNotFound case for if a shard expires, that we delete the lease when shardExpired is found.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public final void testLeaseDeletedWhenShardDoesNotExist() throws Exception {
|
public final void testLeaseDeletedWhenShardDoesNotExist() throws Exception {
|
||||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
final Lease heldLease = LeaseHelper.createLease(SHARD_INFO.shardId(), "leaseOwner",
|
||||||
ExtendedSequenceNumber.LATEST);
|
Collections.singleton("parentShardId"));
|
||||||
final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId"));
|
|
||||||
|
|
||||||
testLeaseDeletedWhenShardDoesNotExist(heldLease);
|
testLeaseDeletedWhenShardDoesNotExist(heldLease);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests ResourceNotFound case when completed lease cleanup is disabled.
|
* Tests ResourceNotFound case when completed lease cleanup is disabled.
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public final void testLeaseDeletedWhenShardDoesNotExistAndCleanupCompletedLeaseDisabled() throws Exception {
|
public final void testLeaseDeletedWhenShardDoesNotExistAndCleanupCompletedLeaseDisabled() throws Exception {
|
||||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
final Lease heldLease = LeaseHelper.createLease(SHARD_INFO.shardId(), "leaseOwner",
|
||||||
ExtendedSequenceNumber.LATEST);
|
Collections.singleton("parentShardId"));
|
||||||
final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId"));
|
|
||||||
|
|
||||||
cleanupLeasesOfCompletedShards = false;
|
cleanupLeasesOfCompletedShards = false;
|
||||||
|
|
||||||
|
|
@ -229,32 +216,31 @@ public class LeaseCleanupManagerTest {
|
||||||
testLeaseDeletedWhenShardDoesNotExist(heldLease);
|
testLeaseDeletedWhenShardDoesNotExist(heldLease);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testLeaseDeletedWhenShardDoesNotExist(Lease heldLease) throws Exception {
|
private void testLeaseDeletedWhenShardDoesNotExist(Lease heldLease) throws Exception {
|
||||||
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
||||||
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease);
|
when(leaseCoordinator.getCurrentlyHeldLease(SHARD_INFO.shardId())).thenReturn(heldLease);
|
||||||
when(shardDetector.getChildShards(any(String.class))).thenThrow(ResourceNotFoundException.class);
|
when(shardDetector.getChildShards(any(String.class))).thenThrow(ResourceNotFoundException.class);
|
||||||
when(leaseRefresher.getLease(heldLease.leaseKey())).thenReturn(heldLease);
|
when(leaseRefresher.getLease(heldLease.leaseKey())).thenReturn(heldLease);
|
||||||
|
|
||||||
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo, shardDetector));
|
leaseCleanupManager.enqueueForDeletion(createLeasePendingDeletion(heldLease, SHARD_INFO));
|
||||||
leaseCleanupManager.cleanupLeases();
|
leaseCleanupManager.cleanupLeases();
|
||||||
|
|
||||||
verify(shardDetector, times(1)).getChildShards(shardInfo.shardId());
|
verify(shardDetector).getChildShards(SHARD_INFO.shardId());
|
||||||
verify(leaseRefresher, times(1)).deleteLease(heldLease);
|
verify(leaseRefresher).deleteLease(heldLease);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards,
|
private void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards,
|
||||||
ExtendedSequenceNumber extendedSequenceNumber,
|
ExtendedSequenceNumber extendedSequenceNumber,
|
||||||
int expectedDeletedLeases) throws Exception {
|
int expectedDeletedLeases) throws Exception {
|
||||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShards, extendedSequenceNumber, true, expectedDeletedLeases);
|
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShards, extendedSequenceNumber, true, expectedDeletedLeases);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards,
|
private void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards,
|
||||||
ExtendedSequenceNumber extendedSequenceNumber,
|
ExtendedSequenceNumber extendedSequenceNumber,
|
||||||
boolean childShardLeasesPresent,
|
boolean childShardLeasesPresent,
|
||||||
int expectedDeletedLeases) throws Exception {
|
int expectedDeletedLeases) throws Exception {
|
||||||
|
|
||||||
final Lease lease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", shardInfo.parentShardIds(),
|
final Lease lease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", shardInfo.parentShardIds(),
|
||||||
childShards.stream().map(c -> c.shardId()).collect(Collectors.toSet()));
|
childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet()));
|
||||||
final List<Lease> childShardLeases = childShards.stream().map(c -> LeaseHelper.createLease(
|
final List<Lease> childShardLeases = childShards.stream().map(c -> LeaseHelper.createLease(
|
||||||
ShardInfo.getLeaseKey(shardInfo, c.shardId()), "leaseOwner", Collections.singleton(shardInfo.shardId()),
|
ShardInfo.getLeaseKey(shardInfo, c.shardId()), "leaseOwner", Collections.singleton(shardInfo.shardId()),
|
||||||
Collections.emptyList(), extendedSequenceNumber)).collect(Collectors.toList());
|
Collections.emptyList(), extendedSequenceNumber)).collect(Collectors.toList());
|
||||||
|
|
@ -273,15 +259,15 @@ public class LeaseCleanupManagerTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, lease, shardInfo, shardDetector));
|
leaseCleanupManager.enqueueForDeletion(createLeasePendingDeletion(lease, shardInfo));
|
||||||
leaseCleanupManager.cleanupLeases();
|
leaseCleanupManager.cleanupLeases();
|
||||||
|
|
||||||
verify(shardDetector, times(1)).getChildShards(shardInfo.shardId());
|
verify(shardDetector).getChildShards(shardInfo.shardId());
|
||||||
verify(leaseRefresher, times(expectedDeletedLeases)).deleteLease(any(Lease.class));
|
verify(leaseRefresher, times(expectedDeletedLeases)).deleteLease(any(Lease.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<ChildShard> childShardsForSplit() {
|
private List<ChildShard> childShardsForSplit() {
|
||||||
List<String> parentShards = Arrays.asList(splitParent);
|
final List<String> parentShards = Collections.singletonList("splitParent");
|
||||||
|
|
||||||
ChildShard leftChild = ChildShard.builder()
|
ChildShard leftChild = ChildShard.builder()
|
||||||
.shardId("leftChild")
|
.shardId("leftChild")
|
||||||
|
|
@ -294,11 +280,11 @@ public class LeaseCleanupManagerTest {
|
||||||
.hashKeyRange(ShardObjectHelper.newHashKeyRange("50", "99"))
|
.hashKeyRange(ShardObjectHelper.newHashKeyRange("50", "99"))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
return Arrays.asList(leftChild, rightChild);
|
return Arrays.asList(leftChild, rightChild);
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<ChildShard> childShardsForMerge() {
|
private List<ChildShard> childShardsForMerge() {
|
||||||
List<String> parentShards = Arrays.asList(mergeParent1, mergeParent2);
|
final List<String> parentShards = Arrays.asList("mergeParent1", "mergeParent2");
|
||||||
|
|
||||||
ChildShard child = ChildShard.builder()
|
ChildShard child = ChildShard.builder()
|
||||||
.shardId("onlyChild")
|
.shardId("onlyChild")
|
||||||
|
|
@ -308,4 +294,8 @@ public class LeaseCleanupManagerTest {
|
||||||
|
|
||||||
return Collections.singletonList(child);
|
return Collections.singletonList(child);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private LeasePendingDeletion createLeasePendingDeletion(final Lease lease, final ShardInfo shardInfo) {
|
||||||
|
return new LeasePendingDeletion(STREAM_IDENTIFIER, lease, shardInfo, shardDetector);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue