diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 11807b9c..95f0f0f9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -469,7 +469,19 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } } catch (ConditionalCheckFailedException e) { log.debug("Did not create lease {} because it already existed", lease); - return false; + + // If we had a spurious retry during the Dynamo put, then this conditional PUT failure + // might be incorrect. So, we get the item straight away and check if the lease owner + lease + // counter are what we expected. + String expectedOwner = lease.leaseOwner(); + Long expectedCounter = lease.leaseCounter(); + final Lease updatedLease = getLease(lease.leaseKey()); + if (updatedLease == null || !expectedOwner.equals(updatedLease.leaseOwner()) + || !expectedCounter.equals(updatedLease.leaseCounter())) { + return false; + } + + log.info("Detected spurious renewal failure for lease with key {}, but recovered", lease.leaseKey()); } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("create", lease.leaseKey(), e); } @@ -593,7 +605,19 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } catch (ConditionalCheckFailedException e) { log.debug("Lease renewal failed for lease with key {} because the lease counter was not {}", lease.leaseKey(), lease.leaseCounter()); - return false; + + // If we had a spurious retry during the Dynamo update, then this conditional PUT failure + // might be incorrect. So, we get the item straight away and check if the lease owner + lease + // counter are what we expected. + String expectedOwner = lease.leaseOwner(); + Long expectedCounter = lease.leaseCounter() + 1; + final Lease updatedLease = getLease(lease.leaseKey()); + if (updatedLease == null || !expectedOwner.equals(updatedLease.leaseOwner()) + || !expectedCounter.equals(updatedLease.leaseCounter())) { + return false; + } + + log.info("Detected spurious renewal failure for lease with key {}, but recovered", lease.leaseKey()); } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("take", lease.leaseKey(), e); } @@ -638,7 +662,19 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } catch (ConditionalCheckFailedException e) { log.debug("Lease eviction failed for lease with key {} because the lease owner was not {}", lease.leaseKey(), lease.leaseOwner()); - return false; + + // If we had a spurious retry during the Dynamo update, then this conditional PUT failure + // might be incorrect. So, we get the item straight away and check if the lease owner + lease + // counter are what we expected. + String expectedOwner = lease.leaseOwner(); + Long expectedCounter = lease.leaseCounter() + 1; + final Lease updatedLease = getLease(lease.leaseKey()); + if (updatedLease == null || !expectedOwner.equals(updatedLease.leaseOwner()) + || !expectedCounter.equals(updatedLease.leaseCounter())) { + return false; + } + + log.info("Detected spurious renewal failure for lease with key {}, but recovered", lease.leaseKey()); } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("evict", lease.leaseKey(), e); } @@ -735,7 +771,19 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } catch (ConditionalCheckFailedException e) { log.debug("Lease update failed for lease with key {} because the lease counter was not {}", lease.leaseKey(), lease.leaseCounter()); - return false; + + // If we had a spurious retry during the Dynamo update, then this conditional PUT failure + // might be incorrect. So, we get the item straight away and check if the lease owner + lease + // counter are what we expected. + String expectedOwner = lease.leaseOwner(); + Long expectedCounter = lease.leaseCounter() + 1; + final Lease updatedLease = getLease(lease.leaseKey()); + if (updatedLease == null || !expectedOwner.equals(updatedLease.leaseOwner()) + || !expectedCounter.equals(updatedLease.leaseCounter())) { + return false; + } + + log.info("Detected spurious renewal failure for lease with key {}, but recovered", lease.leaseKey()); } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("update", lease.leaseKey(), e); }