diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java index a2bf33a2..9dc2a4a3 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import com.amazonaws.services.kinesis.leases.util.DynamoUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -386,7 +387,19 @@ public class LeaseManager implements ILeaseManager { + " because the lease counter was not " + lease.getLeaseCounter()); } - return false; + // If we had a spurious retry during the Dynamo update, then this conditional PUT failure + // might be incorrect. So, we get the item straight away and check if the lease owner + lease counter + // are what we expected. + String expectedOwner = lease.getLeaseOwner(); + Long expectedCounter = lease.getLeaseCounter() + 1; + T updatedLease = getLease(lease.getLeaseKey()); + if (updatedLease == null || !expectedOwner.equals(updatedLease.getLeaseOwner()) || + !expectedCounter.equals(updatedLease.getLeaseCounter())) { + return false; + } + + LOG.info("Detected spurious renewal failure for lease with key " + lease.getLeaseKey() + + ", but recovered"); } catch (AmazonClientException e) { throw convertAndRethrowExceptions("renew", lease.getLeaseKey(), e); } diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java index 57a9c99b..e7ff0ebe 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java @@ -1,21 +1,22 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.leases.impl; import java.util.logging.Logger; +import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.Ignore; diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java index 23cc9fc1..dcaedc38 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java @@ -1,16 +1,16 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.leases.impl; @@ -108,7 +108,8 @@ public class LeaseManagerIntegrationTest extends LeaseIntegrationTest { KinesisClientLease leaseCopy = leaseManager.getLease(lease.getLeaseKey()); - leaseManager.renewLease(lease); + // lose lease + leaseManager.takeLease(lease, "bar"); Assert.assertFalse(leaseManager.renewLease(leaseCopy)); } diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerIntegrationTest.java index 9792d006..8ad19d34 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerIntegrationTest.java @@ -1,30 +1,29 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.leases.impl; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.Executors; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.Executors; public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { @@ -58,7 +57,9 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { builder.addLeasesToRenew(renewer, "1", "2"); KinesisClientLease renewedLease = builder.renewMutateAssert(renewer, "1", "2").get("2"); - leaseManager.updateLease(renewedLease); + // lose lease 2 + leaseManager.takeLease(renewedLease, "bar"); + builder.renewMutateAssert(renewer, "1"); } @@ -96,9 +97,9 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { public void testGetCurrentlyHeldLeases() throws LeasingException { TestHarnessBuilder builder = new TestHarnessBuilder(leaseManager); - KinesisClientLease lease2 = builder.withLease("1", "foo").withLease("2", "foo").build().get("2"); + builder.withLease("1", "foo").withLease("2", "foo").build(); builder.addLeasesToRenew(renewer, "1", "2"); - builder.renewMutateAssert(renewer, "1", "2"); + KinesisClientLease lease2 = builder.renewMutateAssert(renewer, "1", "2").get("2"); // This should be a copy that doesn't get updated Map heldLeases = renewer.getCurrentlyHeldLeases(); @@ -106,7 +107,9 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { Assert.assertEquals((Long) 1L, heldLeases.get("1").getLeaseCounter()); Assert.assertEquals((Long) 1L, heldLeases.get("2").getLeaseCounter()); - leaseManager.updateLease(lease2); // lose lease 2 + // lose lease 2 + leaseManager.takeLease(lease2, "bar"); + // Do another renewal and make sure the copy doesn't change builder.renewMutateAssert(renewer, "1"); @@ -176,7 +179,7 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { KinesisClientLease lease = renewer.getCurrentlyHeldLease("1"); // cause lease loss such that the renewer knows the lease has been lost when update is called - leaseManager.renewLease(lease); + leaseManager.takeLease(lease, "bar"); builder.renewMutateAssert(renewer); lease.setCheckpoint(new ExtendedSequenceNumber("new checkpoint")); @@ -195,7 +198,7 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { KinesisClientLease lease = renewer.getCurrentlyHeldLease("1"); // cause lease loss such that the renewer knows the lease has been lost when update is called - leaseManager.renewLease(lease); + leaseManager.takeLease(lease, "bar"); builder.renewMutateAssert(renewer); // regain the lease diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/TestHarnessBuilder.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/TestHarnessBuilder.java index 6b6d673c..0dfbb568 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/TestHarnessBuilder.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/TestHarnessBuilder.java @@ -1,16 +1,16 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.leases.impl; @@ -35,6 +35,7 @@ public class TestHarnessBuilder { private Map leases = new HashMap(); private KinesisClientLeaseManager leaseManager; + private Map originalLeases = new HashMap<>(); private Callable timeProvider = new Callable() { @@ -54,6 +55,15 @@ public class TestHarnessBuilder { } public TestHarnessBuilder withLease(String shardId, String owner) { + KinesisClientLease lease = createLease(shardId, owner); + KinesisClientLease originalLease = createLease(shardId, owner); + + leases.put(shardId, lease); + originalLeases.put(shardId, originalLease); + return this; + } + + private KinesisClientLease createLease(String shardId, String owner) { KinesisClientLease lease = new KinesisClientLease(); lease.setCheckpoint(new ExtendedSequenceNumber("checkpoint")); lease.setOwnerSwitchesSinceCheckpoint(0L); @@ -62,8 +72,7 @@ public class TestHarnessBuilder { lease.setParentShardIds(Collections.singleton("parentShardId")); lease.setLeaseKey(shardId); - leases.put(shardId, lease); - return this; + return lease; } public Map build() throws LeasingException { @@ -147,7 +156,7 @@ public class TestHarnessBuilder { Assert.assertEquals(renewedShardIds.length, heldLeases.size()); for (String shardId : renewedShardIds) { - KinesisClientLease original = leases.get(shardId); + KinesisClientLease original = originalLeases.get(shardId); Assert.assertNotNull(original); KinesisClientLease actual = heldLeases.get(shardId);