diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java index 5dbf6366..ff72891a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java @@ -81,6 +81,13 @@ public interface LeaseSerializer { */ Map getDynamoNonexistantExpectation(); + /** + * @return the attribute value map asserting that a lease does exist. + */ + default Map getDynamoExistantExpectation() { + throw new UnsupportedOperationException("DynamoExistantExpectation is not implemented"); + } + /** * @param lease * @return the attribute value map that increments a lease counter diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index df5746a2..98f75144 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -681,8 +681,10 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { throws DependencyException, InvalidStateException, ProvisionedThroughputException { log.debug("Updating lease without expectation {}", lease); final AWSExceptionManager exceptionManager = createExceptionManager(); + exceptionManager.add(ConditionalCheckFailedException.class, t -> t); Map updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField); UpdateItemRequest request = UpdateItemRequest.builder().tableName(table).key(serializer.getDynamoHashKey(lease)) + .expected(serializer.getDynamoExistantExpectation()) .attributeUpdates(updates).build(); try { try { @@ -692,6 +694,9 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } catch (InterruptedException e) { throw new DependencyException(e); } + } catch (ConditionalCheckFailedException e) { + log.warn("Lease update failed for lease with key {} because the lease did not exist at the time of the update", + lease.leaseKey(), e); } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("update", lease.leaseKey(), e); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index 4523bada..832118dc 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -192,6 +192,16 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { return result; } + @Override + public Map getDynamoExistantExpectation() { + Map result = new HashMap<>(); + + ExpectedAttributeValue expectedAV = ExpectedAttributeValue.builder().exists(true).build(); + result.put(LEASE_KEY_KEY, expectedAV); + + return result; + } + @Override public Map getDynamoLeaseCounterUpdate(final Lease lease) { return getDynamoLeaseCounterUpdate(lease.leaseCounter());