diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java index 88a6f9b2..a53ec4ab 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java @@ -322,6 +322,7 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer { long startTime = System.currentTimeMillis(); boolean success = false; + Lease authoritativeLeaseCopy = authoritativeLease.copy(); try { log.info("Updating lease from {} to {}", authoritativeLease, lease); synchronized (authoritativeLease) { @@ -358,6 +359,10 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer { success = true; return updatedLease; } + } catch (ProvisionedThroughputException | InvalidStateException | DependencyException e) { + // On failure in updating DDB, revert changes to in memory lease + authoritativeLease.update(authoritativeLeaseCopy); + throw e; } finally { MetricsUtil.addSuccessAndLatency(scope, "UpdateLease", success, startTime, MetricsLevel.DETAILED); MetricsUtil.endScope(scope); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java index 8a700c19..16a443c1 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java @@ -34,13 +34,16 @@ import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.metrics.NullMetricsFactory; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -63,7 +66,7 @@ public class DynamoDBLeaseRenewerTest { System.nanoTime(), null, null, - null, + 1L, new HashSet<>(), new HashSet<>(), null, @@ -134,4 +137,35 @@ public class DynamoDBLeaseRenewerTest { // Clear the list to avoid triggering expectation mismatch in after(). leasesToRenew.clear(); } + + @Test + public void testLeaseRenewerDoesNotUpdateInMemoryLeaseIfDDBFailsUpdate() + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + String leaseKey = "leaseToUpdate"; + Lease lease = newLease(leaseKey); + lease.checkpoint(ExtendedSequenceNumber.LATEST); + leasesToRenew = new ArrayList<>(); + leasesToRenew.add(lease); + renewer.addLeasesToRenew(leasesToRenew); + + doReturn(true).when(leaseRefresher).renewLease(lease); + renewer.renewLeases(); + + Lease updatedLease = newLease(leaseKey); + updatedLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + + doThrow(new DependencyException(new RuntimeException())) + .when(leaseRefresher) + .updateLease(updatedLease); + + try { + UUID concurrencyToken = renewer.getCurrentlyHeldLease(leaseKey).concurrencyToken(); + renewer.updateLease(updatedLease, concurrencyToken, "test", "dummyShardId"); + fail(); + } catch (DependencyException e) { + // expected + } + assertEquals(0L, (long) lease.leaseCounter()); // leaseCounter should not be incremented due to DDB failure + assertEquals(ExtendedSequenceNumber.LATEST, lease.checkpoint()); + } }