From 27494ea0438b9f8efb59f3ba98a5c35ae14b7dbb Mon Sep 17 00:00:00 2001 From: Renju Radhakrishnan Date: Tue, 28 Jul 2020 15:18:40 -0700 Subject: [PATCH] Set expectation criteria to use leaseKey (#88) * Set expectation criteria to use leaseKey * Set exists to true * Add integration tests for leaseMetaInfo update --- .../kinesis/leases/LeaseSerializer.java | 5 +- .../dynamodb/DynamoDBLeaseRefresher.java | 29 +++++++-- .../dynamodb/DynamoDBLeaseSerializer.java | 7 ++- ...DynamoDBLeaseRefresherIntegrationTest.java | 61 ++++++++++++++----- 4 files changed, 78 insertions(+), 24 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java index ff72891a..f36f5a66 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java @@ -16,8 +16,6 @@ package software.amazon.kinesis.leases; import java.util.Collection; import java.util.Map; - - import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate; @@ -82,9 +80,10 @@ public interface LeaseSerializer { Map getDynamoNonexistantExpectation(); /** + * @param leaseKey * @return the attribute value map asserting that a lease does exist. */ - default Map getDynamoExistantExpectation() { + default Map getDynamoExistentExpectation(String leaseKey) { throw new UnsupportedOperationException("DynamoExistantExpectation is not implemented"); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 98f75144..fb39e80f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -14,6 +14,8 @@ */ package software.amazon.kinesis.leases.dynamodb; +import com.google.common.collect.ImmutableMap; + import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -21,12 +23,30 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; - -import com.google.common.collect.ImmutableMap; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; -import software.amazon.awssdk.services.dynamodb.model.*; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; +import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; +import software.amazon.awssdk.services.dynamodb.model.DynamoDbException; +import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; +import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; +import software.amazon.awssdk.services.dynamodb.model.LimitExceededException; +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException; +import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.ScanRequest; +import software.amazon.awssdk.services.dynamodb.model.ScanResponse; +import software.amazon.awssdk.services.dynamodb.model.TableStatus; +import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.FutureUtils; @@ -41,7 +61,6 @@ import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.retrieval.AWSExceptionManager; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import software.amazon.awssdk.services.dynamodb.model.BillingMode; /** * An implementation of {@link LeaseRefresher} that uses DynamoDB. @@ -684,7 +703,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { exceptionManager.add(ConditionalCheckFailedException.class, t -> t); Map updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField); UpdateItemRequest request = UpdateItemRequest.builder().tableName(table).key(serializer.getDynamoHashKey(lease)) - .expected(serializer.getDynamoExistantExpectation()) + .expected(serializer.getDynamoExistentExpectation(lease.leaseKey())) .attributeUpdates(updates).build(); try { try { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index 832118dc..64a7840c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -193,10 +193,13 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { } @Override - public Map getDynamoExistantExpectation() { + public Map getDynamoExistentExpectation(String leaseKey) { Map result = new HashMap<>(); - ExpectedAttributeValue expectedAV = ExpectedAttributeValue.builder().exists(true).build(); + ExpectedAttributeValue expectedAV = ExpectedAttributeValue.builder() + .exists(true) + .value(DynamoUtils.createAttributeValue(leaseKey)) + .build(); result.put(LEASE_KEY_KEY, expectedAV); return result; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java index 75431866..1b2fa78a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java @@ -14,6 +14,21 @@ */ package software.amazon.kinesis.leases.dynamodb; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.awssdk.services.kinesis.model.HashKeyRange; +import software.amazon.kinesis.common.HashKeyRangeForLease; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseIntegrationTest; +import software.amazon.kinesis.leases.UpdateField; +import software.amazon.kinesis.leases.exceptions.LeasingException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -24,20 +39,6 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.verify; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.runners.MockitoJUnitRunner; - -import software.amazon.kinesis.leases.Lease; -import software.amazon.kinesis.leases.LeaseIntegrationTest; -import software.amazon.kinesis.leases.exceptions.LeasingException; - @RunWith(MockitoJUnitRunner.class) public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest { @@ -102,6 +103,38 @@ public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest assertNull(actual); } + /** + * Tests leaseRefresher.updateLeaseWithMetaInfo() when the lease is deleted before updating it with meta info + */ + @Test + public void testDeleteLeaseThenUpdateLeaseWithMetaInfo() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + Lease lease = builder.withLease("1").build().get("1"); + final String leaseKey = lease.leaseKey(); + leaseRefresher.deleteLease(lease); + leaseRefresher.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE); + final Lease deletedLease = leaseRefresher.getLease(leaseKey); + Assert.assertNull(deletedLease); + } + + /** + * Tests leaseRefresher.updateLeaseWithMetaInfo() on hashKeyRange update + */ + @Test + public void testUpdateLeaseWithMetaInfo() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + Lease lease = builder.withLease("1").build().get("1"); + final String leaseKey = lease.leaseKey(); + final HashKeyRangeForLease hashKeyRangeForLease = HashKeyRangeForLease.fromHashKeyRange(HashKeyRange.builder() + .startingHashKey("1") + .endingHashKey("2") + .build()); + lease.hashKeyRange(hashKeyRangeForLease); + leaseRefresher.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE); + final Lease updatedLease = leaseRefresher.getLease(leaseKey); + Assert.assertEquals(lease, updatedLease); + } + /** * Tests leaseRefresher.holdLease's success scenario. */