Update Parent lease fix

This commit is contained in:
Chunxue Yang 2020-05-06 21:12:50 -07:00
parent d28d26c635
commit 80cdf0df6f
2 changed files with 7 additions and 2 deletions

View file

@ -199,7 +199,10 @@ public class ShutdownTask implements ConsumerTask {
final Lease updatedLease = currentLease.copy();
updatedLease.childShardIds(childShardIds);
leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, leaseKeyProvider.apply(shardInfo));
final boolean updateResult = leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, leaseKeyProvider.apply(shardInfo));
if (!updateResult) {
throw new InvalidStateException("Failed to update parent lease with child shard information for shard " + shardInfo.shardId());
}
log.info("Shard {}: Updated current lease {} with child shard information: {}", shardInfo.shardId(), currentLease.leaseKey(), childShardIds);
}

View file

@ -127,11 +127,12 @@ public class ShutdownTaskTest {
* This test is for the scenario that customer doesn't implement checkpoint in their implementation
*/
@Test
public final void testCallWhenApplicationDoesNotCheckpoint() {
public final void testCallWhenApplicationDoesNotCheckpoint() throws Exception {
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298"));
Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true);
final TaskResult result = task.call();
assertNotNull(result.getException());
@ -174,6 +175,7 @@ public class ShutdownTaskTest {
Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true);
final TaskResult result = task.call();
assertNull(result.getException());