From 7032ea67eced2745bba8090405da9fc50859dad7 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Wed, 25 Oct 2017 08:11:20 -0700 Subject: [PATCH] Spurious update fix (#247) * Handle spurious lease renewal failures gracefully. If the request to conditionally update a lease counter in DynamoDB fails, it's considered a failure to renew the lease. This is a good thing, except if the request failure was just because of connectivity problems. In this case the counter *did* update in DynamoDB, but the Dynamo client retries the request which then fails the update condition (since the lease counter no longer matches expected value). To handle this gracefully we opt to get the lease record from Dynamo and examine the lease owner and counter. If it matches what we were expecting, then we consider renewal a success. --- .../kinesis/leases/impl/LeaseManager.java | 15 +++++- .../leases/impl/LeaseIntegrationTest.java | 19 +++---- .../impl/LeaseManagerIntegrationTest.java | 21 ++++---- .../impl/LeaseRenewerIntegrationTest.java | 49 ++++++++++--------- .../leases/impl/TestHarnessBuilder.java | 33 ++++++++----- 5 files changed, 82 insertions(+), 55 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java index a2bf33a2..9dc2a4a3 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import com.amazonaws.services.kinesis.leases.util.DynamoUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -386,7 +387,19 @@ public class LeaseManager implements ILeaseManager { + " because the lease counter was not " + lease.getLeaseCounter()); } - return false; + // If we had a spurious retry during the Dynamo update, then this conditional PUT failure + // might be incorrect. So, we get the item straight away and check if the lease owner + lease counter + // are what we expected. + String expectedOwner = lease.getLeaseOwner(); + Long expectedCounter = lease.getLeaseCounter() + 1; + T updatedLease = getLease(lease.getLeaseKey()); + if (updatedLease == null || !expectedOwner.equals(updatedLease.getLeaseOwner()) || + !expectedCounter.equals(updatedLease.getLeaseCounter())) { + return false; + } + + LOG.info("Detected spurious renewal failure for lease with key " + lease.getLeaseKey() + + ", but recovered"); } catch (AmazonClientException e) { throw convertAndRethrowExceptions("renew", lease.getLeaseKey(), e); } diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java index 57a9c99b..e7ff0ebe 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java @@ -1,21 +1,22 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.leases.impl; import java.util.logging.Logger; +import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.Ignore; diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java index 23cc9fc1..dcaedc38 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java @@ -1,16 +1,16 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.leases.impl; @@ -108,7 +108,8 @@ public class LeaseManagerIntegrationTest extends LeaseIntegrationTest { KinesisClientLease leaseCopy = leaseManager.getLease(lease.getLeaseKey()); - leaseManager.renewLease(lease); + // lose lease + leaseManager.takeLease(lease, "bar"); Assert.assertFalse(leaseManager.renewLease(leaseCopy)); } diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerIntegrationTest.java index 9792d006..8ad19d34 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerIntegrationTest.java @@ -1,30 +1,29 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.leases.impl; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.Executors; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.Executors; public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { @@ -58,7 +57,9 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { builder.addLeasesToRenew(renewer, "1", "2"); KinesisClientLease renewedLease = builder.renewMutateAssert(renewer, "1", "2").get("2"); - leaseManager.updateLease(renewedLease); + // lose lease 2 + leaseManager.takeLease(renewedLease, "bar"); + builder.renewMutateAssert(renewer, "1"); } @@ -96,9 +97,9 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { public void testGetCurrentlyHeldLeases() throws LeasingException { TestHarnessBuilder builder = new TestHarnessBuilder(leaseManager); - KinesisClientLease lease2 = builder.withLease("1", "foo").withLease("2", "foo").build().get("2"); + builder.withLease("1", "foo").withLease("2", "foo").build(); builder.addLeasesToRenew(renewer, "1", "2"); - builder.renewMutateAssert(renewer, "1", "2"); + KinesisClientLease lease2 = builder.renewMutateAssert(renewer, "1", "2").get("2"); // This should be a copy that doesn't get updated Map heldLeases = renewer.getCurrentlyHeldLeases(); @@ -106,7 +107,9 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { Assert.assertEquals((Long) 1L, heldLeases.get("1").getLeaseCounter()); Assert.assertEquals((Long) 1L, heldLeases.get("2").getLeaseCounter()); - leaseManager.updateLease(lease2); // lose lease 2 + // lose lease 2 + leaseManager.takeLease(lease2, "bar"); + // Do another renewal and make sure the copy doesn't change builder.renewMutateAssert(renewer, "1"); @@ -176,7 +179,7 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { KinesisClientLease lease = renewer.getCurrentlyHeldLease("1"); // cause lease loss such that the renewer knows the lease has been lost when update is called - leaseManager.renewLease(lease); + leaseManager.takeLease(lease, "bar"); builder.renewMutateAssert(renewer); lease.setCheckpoint(new ExtendedSequenceNumber("new checkpoint")); @@ -195,7 +198,7 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { KinesisClientLease lease = renewer.getCurrentlyHeldLease("1"); // cause lease loss such that the renewer knows the lease has been lost when update is called - leaseManager.renewLease(lease); + leaseManager.takeLease(lease, "bar"); builder.renewMutateAssert(renewer); // regain the lease diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/TestHarnessBuilder.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/TestHarnessBuilder.java index 6b6d673c..0dfbb568 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/TestHarnessBuilder.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/TestHarnessBuilder.java @@ -1,16 +1,16 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.leases.impl; @@ -35,6 +35,7 @@ public class TestHarnessBuilder { private Map leases = new HashMap(); private KinesisClientLeaseManager leaseManager; + private Map originalLeases = new HashMap<>(); private Callable timeProvider = new Callable() { @@ -54,6 +55,15 @@ public class TestHarnessBuilder { } public TestHarnessBuilder withLease(String shardId, String owner) { + KinesisClientLease lease = createLease(shardId, owner); + KinesisClientLease originalLease = createLease(shardId, owner); + + leases.put(shardId, lease); + originalLeases.put(shardId, originalLease); + return this; + } + + private KinesisClientLease createLease(String shardId, String owner) { KinesisClientLease lease = new KinesisClientLease(); lease.setCheckpoint(new ExtendedSequenceNumber("checkpoint")); lease.setOwnerSwitchesSinceCheckpoint(0L); @@ -62,8 +72,7 @@ public class TestHarnessBuilder { lease.setParentShardIds(Collections.singleton("parentShardId")); lease.setLeaseKey(shardId); - leases.put(shardId, lease); - return this; + return lease; } public Map build() throws LeasingException { @@ -147,7 +156,7 @@ public class TestHarnessBuilder { Assert.assertEquals(renewedShardIds.length, heldLeases.size()); for (String shardId : renewedShardIds) { - KinesisClientLease original = leases.get(shardId); + KinesisClientLease original = originalLeases.get(shardId); Assert.assertNotNull(original); KinesisClientLease actual = heldLeases.get(shardId);