From bdf019748f9c41f65b634c9dc30e071669eca1ec Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 27 Jul 2020 16:26:51 -0700 Subject: [PATCH 1/2] Add conditional check while updating the lease table meta info --- .../amazon/kinesis/leases/LeaseSerializer.java | 7 +++++++ .../leases/dynamodb/DynamoDBLeaseRefresher.java | 5 +++++ .../leases/dynamodb/DynamoDBLeaseSerializer.java | 10 ++++++++++ 3 files changed, 22 insertions(+) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java index 5dbf6366..09d2280e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java @@ -81,6 +81,13 @@ public interface LeaseSerializer { */ Map getDynamoNonexistantExpectation(); + /** + * @return the attribute value map asserting that a lease does exist. + */ + default Map getDynamoExistantExpectation() { + throw new UnsupportedOperationException(); + } + /** * @param lease * @return the attribute value map that increments a lease counter diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index df5746a2..98f75144 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -681,8 +681,10 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { throws DependencyException, InvalidStateException, ProvisionedThroughputException { log.debug("Updating lease without expectation {}", lease); final AWSExceptionManager exceptionManager = createExceptionManager(); + exceptionManager.add(ConditionalCheckFailedException.class, t -> t); Map updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField); UpdateItemRequest request = UpdateItemRequest.builder().tableName(table).key(serializer.getDynamoHashKey(lease)) + .expected(serializer.getDynamoExistantExpectation()) .attributeUpdates(updates).build(); try { try { @@ -692,6 +694,9 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } catch (InterruptedException e) { throw new DependencyException(e); } + } catch (ConditionalCheckFailedException e) { + log.warn("Lease update failed for lease with key {} because the lease did not exist at the time of the update", + lease.leaseKey(), e); } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("update", lease.leaseKey(), e); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index 4523bada..832118dc 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -192,6 +192,16 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { return result; } + @Override + public Map getDynamoExistantExpectation() { + Map result = new HashMap<>(); + + ExpectedAttributeValue expectedAV = ExpectedAttributeValue.builder().exists(true).build(); + result.put(LEASE_KEY_KEY, expectedAV); + + return result; + } + @Override public Map getDynamoLeaseCounterUpdate(final Lease lease) { return getDynamoLeaseCounterUpdate(lease.leaseCounter()); From e2a1d71053bb6223daea40d95662d954d8578334 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 27 Jul 2020 16:37:40 -0700 Subject: [PATCH 2/2] Update exception msg --- .../java/software/amazon/kinesis/leases/LeaseSerializer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java index 09d2280e..ff72891a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java @@ -85,7 +85,7 @@ public interface LeaseSerializer { * @return the attribute value map asserting that a lease does exist. */ default Map getDynamoExistantExpectation() { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("DynamoExistantExpectation is not implemented"); } /**