diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 8002eacc..d1fdd61c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -430,7 +430,19 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } } catch (ConditionalCheckFailedException e) { log.debug("Did not create lease {} because it already existed", lease); - return false; + + // If we had a spurious retry during the Dynamo put, then this conditional PUT failure + // might be incorrect. So, we get the item straight away and check if the lease owner + lease + // counter are what we expected. + String expectedOwner = lease.leaseOwner(); + Long expectedCounter = lease.leaseCounter(); + final Lease updatedLease = getLease(lease.leaseKey()); + if (updatedLease == null || !expectedOwner.equals(updatedLease.leaseOwner()) + || !expectedCounter.equals(updatedLease.leaseCounter())) { + return false; + } + + log.info("Detected spurious renewal failure for lease with key {}, but recovered", lease.leaseKey()); } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("create", lease.leaseKey(), e); } @@ -552,7 +564,19 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } catch (ConditionalCheckFailedException e) { log.debug("Lease renewal failed for lease with key {} because the lease counter was not {}", lease.leaseKey(), lease.leaseCounter()); - return false; + + // If we had a spurious retry during the Dynamo update, then this conditional PUT failure + // might be incorrect. So, we get the item straight away and check if the lease owner + lease + // counter are what we expected. + String expectedOwner = lease.leaseOwner(); + Long expectedCounter = lease.leaseCounter() + 1; + final Lease updatedLease = getLease(lease.leaseKey()); + if (updatedLease == null || !expectedOwner.equals(updatedLease.leaseOwner()) + || !expectedCounter.equals(updatedLease.leaseCounter())) { + return false; + } + + log.info("Detected spurious renewal failure for lease with key {}, but recovered", lease.leaseKey()); } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("take", lease.leaseKey(), e); } @@ -595,7 +619,19 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } catch (ConditionalCheckFailedException e) { log.debug("Lease eviction failed for lease with key {} because the lease owner was not {}", lease.leaseKey(), lease.leaseOwner()); - return false; + + // If we had a spurious retry during the Dynamo update, then this conditional PUT failure + // might be incorrect. So, we get the item straight away and check if the lease owner + lease + // counter are what we expected. + String expectedOwner = lease.leaseOwner(); + Long expectedCounter = lease.leaseCounter() + 1; + final Lease updatedLease = getLease(lease.leaseKey()); + if (updatedLease == null || !expectedOwner.equals(updatedLease.leaseOwner()) + || !expectedCounter.equals(updatedLease.leaseCounter())) { + return false; + } + + log.info("Detected spurious renewal failure for lease with key {}, but recovered", lease.leaseKey()); } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("evict", lease.leaseKey(), e); } @@ -687,7 +723,19 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } catch (ConditionalCheckFailedException e) { log.debug("Lease update failed for lease with key {} because the lease counter was not {}", lease.leaseKey(), lease.leaseCounter()); - return false; + + // If we had a spurious retry during the Dynamo update, then this conditional PUT failure + // might be incorrect. So, we get the item straight away and check if the lease owner + lease + // counter are what we expected. + String expectedOwner = lease.leaseOwner(); + Long expectedCounter = lease.leaseCounter() + 1; + final Lease updatedLease = getLease(lease.leaseKey()); + if (updatedLease == null || !expectedOwner.equals(updatedLease.leaseOwner()) + || !expectedCounter.equals(updatedLease.leaseCounter())) { + return false; + } + + log.info("Detected spurious renewal failure for lease with key {}, but recovered", lease.leaseKey()); } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("update", lease.leaseKey(), e); }