From 81939b599c0735946e165a7863ba788c8fe9b6f9 Mon Sep 17 00:00:00 2001 From: Eric Zhu Date: Mon, 25 Jan 2021 17:11:55 -0800 Subject: [PATCH] Add conditional check for spurious failures from DDB on lease put --- .../dynamodb/DynamoDBLeaseRefresher.java | 56 +++++++++++++++++-- 1 file changed, 52 insertions(+), 4 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 8002eacc..d1fdd61c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -430,7 +430,19 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } } catch (ConditionalCheckFailedException e) { log.debug("Did not create lease {} because it already existed", lease); - return false; + + // If we had a spurious retry during the Dynamo put, then this conditional PUT failure + // might be incorrect. So, we get the item straight away and check if the lease owner + lease + // counter are what we expected. + String expectedOwner = lease.leaseOwner(); + Long expectedCounter = lease.leaseCounter(); + final Lease updatedLease = getLease(lease.leaseKey()); + if (updatedLease == null || !expectedOwner.equals(updatedLease.leaseOwner()) + || !expectedCounter.equals(updatedLease.leaseCounter())) { + return false; + } + + log.info("Detected spurious renewal failure for lease with key {}, but recovered", lease.leaseKey()); } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("create", lease.leaseKey(), e); } @@ -552,7 +564,19 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } catch (ConditionalCheckFailedException e) { log.debug("Lease renewal failed for lease with key {} because the lease counter was not {}", lease.leaseKey(), lease.leaseCounter()); - return false; + + // If we had a spurious retry during the Dynamo update, then this conditional PUT failure + // might be incorrect. So, we get the item straight away and check if the lease owner + lease + // counter are what we expected. + String expectedOwner = lease.leaseOwner(); + Long expectedCounter = lease.leaseCounter() + 1; + final Lease updatedLease = getLease(lease.leaseKey()); + if (updatedLease == null || !expectedOwner.equals(updatedLease.leaseOwner()) + || !expectedCounter.equals(updatedLease.leaseCounter())) { + return false; + } + + log.info("Detected spurious renewal failure for lease with key {}, but recovered", lease.leaseKey()); } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("take", lease.leaseKey(), e); } @@ -595,7 +619,19 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } catch (ConditionalCheckFailedException e) { log.debug("Lease eviction failed for lease with key {} because the lease owner was not {}", lease.leaseKey(), lease.leaseOwner()); - return false; + + // If we had a spurious retry during the Dynamo update, then this conditional PUT failure + // might be incorrect. So, we get the item straight away and check if the lease owner + lease + // counter are what we expected. + String expectedOwner = lease.leaseOwner(); + Long expectedCounter = lease.leaseCounter() + 1; + final Lease updatedLease = getLease(lease.leaseKey()); + if (updatedLease == null || !expectedOwner.equals(updatedLease.leaseOwner()) + || !expectedCounter.equals(updatedLease.leaseCounter())) { + return false; + } + + log.info("Detected spurious renewal failure for lease with key {}, but recovered", lease.leaseKey()); } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("evict", lease.leaseKey(), e); } @@ -687,7 +723,19 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } catch (ConditionalCheckFailedException e) { log.debug("Lease update failed for lease with key {} because the lease counter was not {}", lease.leaseKey(), lease.leaseCounter()); - return false; + + // If we had a spurious retry during the Dynamo update, then this conditional PUT failure + // might be incorrect. So, we get the item straight away and check if the lease owner + lease + // counter are what we expected. + String expectedOwner = lease.leaseOwner(); + Long expectedCounter = lease.leaseCounter() + 1; + final Lease updatedLease = getLease(lease.leaseKey()); + if (updatedLease == null || !expectedOwner.equals(updatedLease.leaseOwner()) + || !expectedCounter.equals(updatedLease.leaseCounter())) { + return false; + } + + log.info("Detected spurious renewal failure for lease with key {}, but recovered", lease.leaseKey()); } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("update", lease.leaseKey(), e); }