Merge pull request #32 from ychunxue/ltr_1_updateLeaseFix
Update Parent lease fix
This commit is contained in:
commit
52f1be5c67
2 changed files with 7 additions and 2 deletions
|
|
@ -199,7 +199,10 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
|
|
||||||
final Lease updatedLease = currentLease.copy();
|
final Lease updatedLease = currentLease.copy();
|
||||||
updatedLease.childShardIds(childShardIds);
|
updatedLease.childShardIds(childShardIds);
|
||||||
leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, leaseKeyProvider.apply(shardInfo));
|
final boolean updateResult = leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, leaseKeyProvider.apply(shardInfo));
|
||||||
|
if (!updateResult) {
|
||||||
|
throw new InvalidStateException("Failed to update parent lease with child shard information for shard " + shardInfo.shardId());
|
||||||
|
}
|
||||||
log.info("Shard {}: Updated current lease {} with child shard information: {}", shardInfo.shardId(), currentLease.leaseKey(), childShardIds);
|
log.info("Shard {}: Updated current lease {} with child shard information: {}", shardInfo.shardId(), currentLease.leaseKey(), childShardIds);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -127,11 +127,12 @@ public class ShutdownTaskTest {
|
||||||
* This test is for the scenario that customer doesn't implement checkpoint in their implementation
|
* This test is for the scenario that customer doesn't implement checkpoint in their implementation
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public final void testCallWhenApplicationDoesNotCheckpoint() {
|
public final void testCallWhenApplicationDoesNotCheckpoint() throws Exception {
|
||||||
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298"));
|
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298"));
|
||||||
Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
|
Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
|
||||||
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
|
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
|
||||||
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
||||||
|
when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true);
|
||||||
|
|
||||||
final TaskResult result = task.call();
|
final TaskResult result = task.call();
|
||||||
assertNotNull(result.getException());
|
assertNotNull(result.getException());
|
||||||
|
|
@ -174,6 +175,7 @@ public class ShutdownTaskTest {
|
||||||
Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
|
Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
|
||||||
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
|
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
|
||||||
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
||||||
|
when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true);
|
||||||
|
|
||||||
final TaskResult result = task.call();
|
final TaskResult result = task.call();
|
||||||
assertNull(result.getException());
|
assertNull(result.getException());
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue