Spurious update fix (#247)
* Handle spurious lease renewal failures gracefully. If the request to conditionally update a lease counter in DynamoDB fails, it is treated as a failure to renew the lease. That is normally the right behavior, except when the request failed only because of connectivity problems. In that case the counter *did* update in DynamoDB, but the Dynamo client retried the request, and the retry then failed the update condition (since the lease counter no longer matched the expected value). To handle this gracefully, we get the lease record from Dynamo and examine the lease owner and counter. If they match what we were expecting, we consider the renewal a success.
This commit is contained in:
parent
cbcc898d71
commit
7032ea67ec
5 changed files with 82 additions and 55 deletions
|
|
@ -19,6 +19,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import com.amazonaws.services.kinesis.leases.util.DynamoUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
|
@ -386,7 +387,19 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
|
||||||
+ " because the lease counter was not " + lease.getLeaseCounter());
|
+ " because the lease counter was not " + lease.getLeaseCounter());
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
// If we had a spurious retry during the Dynamo update, then this conditional PUT failure
|
||||||
|
// might be incorrect. So, we get the item straight away and check if the lease owner + lease counter
|
||||||
|
// are what we expected.
|
||||||
|
String expectedOwner = lease.getLeaseOwner();
|
||||||
|
Long expectedCounter = lease.getLeaseCounter() + 1;
|
||||||
|
T updatedLease = getLease(lease.getLeaseKey());
|
||||||
|
if (updatedLease == null || !expectedOwner.equals(updatedLease.getLeaseOwner()) ||
|
||||||
|
!expectedCounter.equals(updatedLease.getLeaseCounter())) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Detected spurious renewal failure for lease with key " + lease.getLeaseKey()
|
||||||
|
+ ", but recovered");
|
||||||
} catch (AmazonClientException e) {
|
} catch (AmazonClientException e) {
|
||||||
throw convertAndRethrowExceptions("renew", lease.getLeaseKey(), e);
|
throw convertAndRethrowExceptions("renew", lease.getLeaseKey(), e);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,21 +1,22 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||||
*
|
*
|
||||||
* Licensed under the Amazon Software License (the "License").
|
* Licensed under the Amazon Software License (the "License").
|
||||||
* You may not use this file except in compliance with the License.
|
* You may not use this file except in compliance with the License.
|
||||||
* A copy of the License is located at
|
* A copy of the License is located at
|
||||||
*
|
*
|
||||||
* http://aws.amazon.com/asl/
|
* http://aws.amazon.com/asl/
|
||||||
*
|
*
|
||||||
* or in the "license" file accompanying this file. This file is distributed
|
* or in the "license" file accompanying this file. This file is distributed
|
||||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||||
* express or implied. See the License for the specific language governing
|
* express or implied. See the License for the specific language governing
|
||||||
* permissions and limitations under the License.
|
* permissions and limitations under the License.
|
||||||
*/
|
*/
|
||||||
package com.amazonaws.services.kinesis.leases.impl;
|
package com.amazonaws.services.kinesis.leases.impl;
|
||||||
|
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.junit.Ignore;
|
import org.junit.Ignore;
|
||||||
|
|
|
||||||
|
|
@ -1,16 +1,16 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||||
*
|
*
|
||||||
* Licensed under the Amazon Software License (the "License").
|
* Licensed under the Amazon Software License (the "License").
|
||||||
* You may not use this file except in compliance with the License.
|
* You may not use this file except in compliance with the License.
|
||||||
* A copy of the License is located at
|
* A copy of the License is located at
|
||||||
*
|
*
|
||||||
* http://aws.amazon.com/asl/
|
* http://aws.amazon.com/asl/
|
||||||
*
|
*
|
||||||
* or in the "license" file accompanying this file. This file is distributed
|
* or in the "license" file accompanying this file. This file is distributed
|
||||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||||
* express or implied. See the License for the specific language governing
|
* express or implied. See the License for the specific language governing
|
||||||
* permissions and limitations under the License.
|
* permissions and limitations under the License.
|
||||||
*/
|
*/
|
||||||
package com.amazonaws.services.kinesis.leases.impl;
|
package com.amazonaws.services.kinesis.leases.impl;
|
||||||
|
|
||||||
|
|
@ -108,7 +108,8 @@ public class LeaseManagerIntegrationTest extends LeaseIntegrationTest {
|
||||||
|
|
||||||
KinesisClientLease leaseCopy = leaseManager.getLease(lease.getLeaseKey());
|
KinesisClientLease leaseCopy = leaseManager.getLease(lease.getLeaseKey());
|
||||||
|
|
||||||
leaseManager.renewLease(lease);
|
// lose lease
|
||||||
|
leaseManager.takeLease(lease, "bar");
|
||||||
|
|
||||||
Assert.assertFalse(leaseManager.renewLease(leaseCopy));
|
Assert.assertFalse(leaseManager.renewLease(leaseCopy));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,30 +1,29 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||||
*
|
*
|
||||||
* Licensed under the Amazon Software License (the "License").
|
* Licensed under the Amazon Software License (the "License").
|
||||||
* You may not use this file except in compliance with the License.
|
* You may not use this file except in compliance with the License.
|
||||||
* A copy of the License is located at
|
* A copy of the License is located at
|
||||||
*
|
*
|
||||||
* http://aws.amazon.com/asl/
|
* http://aws.amazon.com/asl/
|
||||||
*
|
*
|
||||||
* or in the "license" file accompanying this file. This file is distributed
|
* or in the "license" file accompanying this file. This file is distributed
|
||||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||||
* express or implied. See the License for the specific language governing
|
* express or implied. See the License for the specific language governing
|
||||||
* permissions and limitations under the License.
|
* permissions and limitations under the License.
|
||||||
*/
|
*/
|
||||||
package com.amazonaws.services.kinesis.leases.impl;
|
package com.amazonaws.services.kinesis.leases.impl;
|
||||||
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||||
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
|
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
|
||||||
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer;
|
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest {
|
public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest {
|
||||||
|
|
||||||
|
|
@ -58,7 +57,9 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest {
|
||||||
builder.addLeasesToRenew(renewer, "1", "2");
|
builder.addLeasesToRenew(renewer, "1", "2");
|
||||||
KinesisClientLease renewedLease = builder.renewMutateAssert(renewer, "1", "2").get("2");
|
KinesisClientLease renewedLease = builder.renewMutateAssert(renewer, "1", "2").get("2");
|
||||||
|
|
||||||
leaseManager.updateLease(renewedLease);
|
// lose lease 2
|
||||||
|
leaseManager.takeLease(renewedLease, "bar");
|
||||||
|
|
||||||
builder.renewMutateAssert(renewer, "1");
|
builder.renewMutateAssert(renewer, "1");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -96,9 +97,9 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest {
|
||||||
public void testGetCurrentlyHeldLeases() throws LeasingException {
|
public void testGetCurrentlyHeldLeases() throws LeasingException {
|
||||||
TestHarnessBuilder builder = new TestHarnessBuilder(leaseManager);
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseManager);
|
||||||
|
|
||||||
KinesisClientLease lease2 = builder.withLease("1", "foo").withLease("2", "foo").build().get("2");
|
builder.withLease("1", "foo").withLease("2", "foo").build();
|
||||||
builder.addLeasesToRenew(renewer, "1", "2");
|
builder.addLeasesToRenew(renewer, "1", "2");
|
||||||
builder.renewMutateAssert(renewer, "1", "2");
|
KinesisClientLease lease2 = builder.renewMutateAssert(renewer, "1", "2").get("2");
|
||||||
|
|
||||||
// This should be a copy that doesn't get updated
|
// This should be a copy that doesn't get updated
|
||||||
Map<String, KinesisClientLease> heldLeases = renewer.getCurrentlyHeldLeases();
|
Map<String, KinesisClientLease> heldLeases = renewer.getCurrentlyHeldLeases();
|
||||||
|
|
@ -106,7 +107,9 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest {
|
||||||
Assert.assertEquals((Long) 1L, heldLeases.get("1").getLeaseCounter());
|
Assert.assertEquals((Long) 1L, heldLeases.get("1").getLeaseCounter());
|
||||||
Assert.assertEquals((Long) 1L, heldLeases.get("2").getLeaseCounter());
|
Assert.assertEquals((Long) 1L, heldLeases.get("2").getLeaseCounter());
|
||||||
|
|
||||||
leaseManager.updateLease(lease2); // lose lease 2
|
// lose lease 2
|
||||||
|
leaseManager.takeLease(lease2, "bar");
|
||||||
|
|
||||||
// Do another renewal and make sure the copy doesn't change
|
// Do another renewal and make sure the copy doesn't change
|
||||||
builder.renewMutateAssert(renewer, "1");
|
builder.renewMutateAssert(renewer, "1");
|
||||||
|
|
||||||
|
|
@ -176,7 +179,7 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest {
|
||||||
KinesisClientLease lease = renewer.getCurrentlyHeldLease("1");
|
KinesisClientLease lease = renewer.getCurrentlyHeldLease("1");
|
||||||
|
|
||||||
// cause lease loss such that the renewer knows the lease has been lost when update is called
|
// cause lease loss such that the renewer knows the lease has been lost when update is called
|
||||||
leaseManager.renewLease(lease);
|
leaseManager.takeLease(lease, "bar");
|
||||||
builder.renewMutateAssert(renewer);
|
builder.renewMutateAssert(renewer);
|
||||||
|
|
||||||
lease.setCheckpoint(new ExtendedSequenceNumber("new checkpoint"));
|
lease.setCheckpoint(new ExtendedSequenceNumber("new checkpoint"));
|
||||||
|
|
@ -195,7 +198,7 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest {
|
||||||
KinesisClientLease lease = renewer.getCurrentlyHeldLease("1");
|
KinesisClientLease lease = renewer.getCurrentlyHeldLease("1");
|
||||||
|
|
||||||
// cause lease loss such that the renewer knows the lease has been lost when update is called
|
// cause lease loss such that the renewer knows the lease has been lost when update is called
|
||||||
leaseManager.renewLease(lease);
|
leaseManager.takeLease(lease, "bar");
|
||||||
builder.renewMutateAssert(renewer);
|
builder.renewMutateAssert(renewer);
|
||||||
|
|
||||||
// regain the lease
|
// regain the lease
|
||||||
|
|
|
||||||
|
|
@ -1,16 +1,16 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||||
*
|
*
|
||||||
* Licensed under the Amazon Software License (the "License").
|
* Licensed under the Amazon Software License (the "License").
|
||||||
* You may not use this file except in compliance with the License.
|
* You may not use this file except in compliance with the License.
|
||||||
* A copy of the License is located at
|
* A copy of the License is located at
|
||||||
*
|
*
|
||||||
* http://aws.amazon.com/asl/
|
* http://aws.amazon.com/asl/
|
||||||
*
|
*
|
||||||
* or in the "license" file accompanying this file. This file is distributed
|
* or in the "license" file accompanying this file. This file is distributed
|
||||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||||
* express or implied. See the License for the specific language governing
|
* express or implied. See the License for the specific language governing
|
||||||
* permissions and limitations under the License.
|
* permissions and limitations under the License.
|
||||||
*/
|
*/
|
||||||
package com.amazonaws.services.kinesis.leases.impl;
|
package com.amazonaws.services.kinesis.leases.impl;
|
||||||
|
|
||||||
|
|
@ -35,6 +35,7 @@ public class TestHarnessBuilder {
|
||||||
|
|
||||||
private Map<String, KinesisClientLease> leases = new HashMap<String, KinesisClientLease>();
|
private Map<String, KinesisClientLease> leases = new HashMap<String, KinesisClientLease>();
|
||||||
private KinesisClientLeaseManager leaseManager;
|
private KinesisClientLeaseManager leaseManager;
|
||||||
|
private Map<String, KinesisClientLease> originalLeases = new HashMap<>();
|
||||||
|
|
||||||
private Callable<Long> timeProvider = new Callable<Long>() {
|
private Callable<Long> timeProvider = new Callable<Long>() {
|
||||||
|
|
||||||
|
|
@ -54,6 +55,15 @@ public class TestHarnessBuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
public TestHarnessBuilder withLease(String shardId, String owner) {
|
public TestHarnessBuilder withLease(String shardId, String owner) {
|
||||||
|
KinesisClientLease lease = createLease(shardId, owner);
|
||||||
|
KinesisClientLease originalLease = createLease(shardId, owner);
|
||||||
|
|
||||||
|
leases.put(shardId, lease);
|
||||||
|
originalLeases.put(shardId, originalLease);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private KinesisClientLease createLease(String shardId, String owner) {
|
||||||
KinesisClientLease lease = new KinesisClientLease();
|
KinesisClientLease lease = new KinesisClientLease();
|
||||||
lease.setCheckpoint(new ExtendedSequenceNumber("checkpoint"));
|
lease.setCheckpoint(new ExtendedSequenceNumber("checkpoint"));
|
||||||
lease.setOwnerSwitchesSinceCheckpoint(0L);
|
lease.setOwnerSwitchesSinceCheckpoint(0L);
|
||||||
|
|
@ -62,8 +72,7 @@ public class TestHarnessBuilder {
|
||||||
lease.setParentShardIds(Collections.singleton("parentShardId"));
|
lease.setParentShardIds(Collections.singleton("parentShardId"));
|
||||||
lease.setLeaseKey(shardId);
|
lease.setLeaseKey(shardId);
|
||||||
|
|
||||||
leases.put(shardId, lease);
|
return lease;
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, KinesisClientLease> build() throws LeasingException {
|
public Map<String, KinesisClientLease> build() throws LeasingException {
|
||||||
|
|
@ -147,7 +156,7 @@ public class TestHarnessBuilder {
|
||||||
Assert.assertEquals(renewedShardIds.length, heldLeases.size());
|
Assert.assertEquals(renewedShardIds.length, heldLeases.size());
|
||||||
|
|
||||||
for (String shardId : renewedShardIds) {
|
for (String shardId : renewedShardIds) {
|
||||||
KinesisClientLease original = leases.get(shardId);
|
KinesisClientLease original = originalLeases.get(shardId);
|
||||||
Assert.assertNotNull(original);
|
Assert.assertNotNull(original);
|
||||||
|
|
||||||
KinesisClientLease actual = heldLeases.get(shardId);
|
KinesisClientLease actual = heldLeases.get(shardId);
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue