bugfix: revert update of in memory lease if DDB lease renewal throws error
This commit is contained in:
parent
78fb42ede1
commit
380b8fc0a8
2 changed files with 40 additions and 1 deletions
|
|
@ -322,6 +322,7 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer {
|
||||||
|
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
|
Lease authoritativeLeaseCopy = authoritativeLease.copy();
|
||||||
try {
|
try {
|
||||||
log.info("Updating lease from {} to {}", authoritativeLease, lease);
|
log.info("Updating lease from {} to {}", authoritativeLease, lease);
|
||||||
synchronized (authoritativeLease) {
|
synchronized (authoritativeLease) {
|
||||||
|
|
@ -358,6 +359,10 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer {
|
||||||
success = true;
|
success = true;
|
||||||
return updatedLease;
|
return updatedLease;
|
||||||
}
|
}
|
||||||
|
} catch (ProvisionedThroughputException | InvalidStateException | DependencyException e) {
|
||||||
|
// On failure in updating DDB, revert changes to in memory lease
|
||||||
|
authoritativeLease.update(authoritativeLeaseCopy);
|
||||||
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
MetricsUtil.addSuccessAndLatency(scope, "UpdateLease", success, startTime, MetricsLevel.DETAILED);
|
MetricsUtil.addSuccessAndLatency(scope, "UpdateLease", success, startTime, MetricsLevel.DETAILED);
|
||||||
MetricsUtil.endScope(scope);
|
MetricsUtil.endScope(scope);
|
||||||
|
|
|
||||||
|
|
@ -34,13 +34,16 @@ import software.amazon.kinesis.leases.exceptions.DependencyException;
|
||||||
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
||||||
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||||
import software.amazon.kinesis.metrics.NullMetricsFactory;
|
import software.amazon.kinesis.metrics.NullMetricsFactory;
|
||||||
|
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.doThrow;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
|
|
@ -63,7 +66,7 @@ public class DynamoDBLeaseRenewerTest {
|
||||||
System.nanoTime(),
|
System.nanoTime(),
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
null,
|
1L,
|
||||||
new HashSet<>(),
|
new HashSet<>(),
|
||||||
new HashSet<>(),
|
new HashSet<>(),
|
||||||
null,
|
null,
|
||||||
|
|
@ -134,4 +137,35 @@ public class DynamoDBLeaseRenewerTest {
|
||||||
// Clear the list to avoid triggering expectation mismatch in after().
|
// Clear the list to avoid triggering expectation mismatch in after().
|
||||||
leasesToRenew.clear();
|
leasesToRenew.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLeaseRenewerDoesNotUpdateInMemoryLeaseIfDDBFailsUpdate()
|
||||||
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
|
String leaseKey = "leaseToUpdate";
|
||||||
|
Lease lease = newLease(leaseKey);
|
||||||
|
lease.checkpoint(ExtendedSequenceNumber.LATEST);
|
||||||
|
leasesToRenew = new ArrayList<>();
|
||||||
|
leasesToRenew.add(lease);
|
||||||
|
renewer.addLeasesToRenew(leasesToRenew);
|
||||||
|
|
||||||
|
doReturn(true).when(leaseRefresher).renewLease(lease);
|
||||||
|
renewer.renewLeases();
|
||||||
|
|
||||||
|
Lease updatedLease = newLease(leaseKey);
|
||||||
|
updatedLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
|
||||||
|
|
||||||
|
doThrow(new DependencyException(new RuntimeException()))
|
||||||
|
.when(leaseRefresher)
|
||||||
|
.updateLease(updatedLease);
|
||||||
|
|
||||||
|
try {
|
||||||
|
UUID concurrencyToken = renewer.getCurrentlyHeldLease(leaseKey).concurrencyToken();
|
||||||
|
renewer.updateLease(updatedLease, concurrencyToken, "test", "dummyShardId");
|
||||||
|
fail();
|
||||||
|
} catch (DependencyException e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
assertEquals(0L, (long) lease.leaseCounter()); // leaseCounter should not be incremented due to DDB failure
|
||||||
|
assertEquals(ExtendedSequenceNumber.LATEST, lease.checkpoint());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue