bugfix: revert update of in memory lease if DDB lease renewal throws error (#1354)
This commit is contained in:
parent
78fb42ede1
commit
715690d2c0
2 changed files with 40 additions and 1 deletions
|
|
@ -322,6 +322,7 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer {
|
|||
|
||||
long startTime = System.currentTimeMillis();
|
||||
boolean success = false;
|
||||
Lease authoritativeLeaseCopy = authoritativeLease.copy();
|
||||
try {
|
||||
log.info("Updating lease from {} to {}", authoritativeLease, lease);
|
||||
synchronized (authoritativeLease) {
|
||||
|
|
@ -358,6 +359,10 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer {
|
|||
success = true;
|
||||
return updatedLease;
|
||||
}
|
||||
} catch (ProvisionedThroughputException | InvalidStateException | DependencyException e) {
|
||||
// On failure in updating DDB, revert changes to in memory lease
|
||||
authoritativeLease.update(authoritativeLeaseCopy);
|
||||
throw e;
|
||||
} finally {
|
||||
MetricsUtil.addSuccessAndLatency(scope, "UpdateLease", success, startTime, MetricsLevel.DETAILED);
|
||||
MetricsUtil.endScope(scope);
|
||||
|
|
|
|||
|
|
@ -34,13 +34,16 @@ import software.amazon.kinesis.leases.exceptions.DependencyException;
|
|||
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
||||
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||
import software.amazon.kinesis.metrics.NullMetricsFactory;
|
||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
|
|
@ -63,7 +66,7 @@ public class DynamoDBLeaseRenewerTest {
|
|||
System.nanoTime(),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
1L,
|
||||
new HashSet<>(),
|
||||
new HashSet<>(),
|
||||
null,
|
||||
|
|
@ -134,4 +137,35 @@ public class DynamoDBLeaseRenewerTest {
|
|||
// Clear the list to avoid triggering expectation mismatch in after().
|
||||
leasesToRenew.clear();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLeaseRenewerDoesNotUpdateInMemoryLeaseIfDDBFailsUpdate()
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||
String leaseKey = "leaseToUpdate";
|
||||
Lease lease = newLease(leaseKey);
|
||||
lease.checkpoint(ExtendedSequenceNumber.LATEST);
|
||||
leasesToRenew = new ArrayList<>();
|
||||
leasesToRenew.add(lease);
|
||||
renewer.addLeasesToRenew(leasesToRenew);
|
||||
|
||||
doReturn(true).when(leaseRefresher).renewLease(lease);
|
||||
renewer.renewLeases();
|
||||
|
||||
Lease updatedLease = newLease(leaseKey);
|
||||
updatedLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
|
||||
doThrow(new DependencyException(new RuntimeException()))
|
||||
.when(leaseRefresher)
|
||||
.updateLease(updatedLease);
|
||||
|
||||
try {
|
||||
UUID concurrencyToken = renewer.getCurrentlyHeldLease(leaseKey).concurrencyToken();
|
||||
renewer.updateLease(updatedLease, concurrencyToken, "test", "dummyShardId");
|
||||
fail();
|
||||
} catch (DependencyException e) {
|
||||
// expected
|
||||
}
|
||||
assertEquals(0L, (long) lease.leaseCounter()); // leaseCounter should not be incremented due to DDB failure
|
||||
assertEquals(ExtendedSequenceNumber.LATEST, lease.checkpoint());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue