diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 9d53e75c..5418f1bf 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -199,7 +199,10 @@ public class ShutdownTask implements ConsumerTask { final Lease updatedLease = currentLease.copy(); updatedLease.childShardIds(childShardIds); - leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, leaseKeyProvider.apply(shardInfo)); + final boolean updateResult = leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, leaseKeyProvider.apply(shardInfo)); + if (!updateResult) { + throw new InvalidStateException("Failed to update parent lease with child shard information for shard " + shardInfo.shardId()); + } log.info("Shard {}: Updated current lease {} with child shard information: {}", shardInfo.shardId(), currentLease.leaseKey(), childShardIds); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 7992d604..f65655db 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -127,11 +127,12 @@ public class ShutdownTaskTest { * This test is for the scenario that customer doesn't implement checkpoint in their implementation */ @Test - public final void testCallWhenApplicationDoesNotCheckpoint() { + public final void testCallWhenApplicationDoesNotCheckpoint() throws Exception { when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298")); Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId")); when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease); when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); + when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true); final TaskResult result = task.call(); assertNotNull(result.getException()); @@ -174,6 +175,7 @@ public class ShutdownTaskTest { Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId")); when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease); when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); + when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true); final TaskResult result = task.call(); assertNull(result.getException());