Set expectation criteria to use leaseKey (#88)
* Set expectation criteria to use leaseKey * Set exists to true * Add integration tests for leaseMetaInfo update
This commit is contained in:
parent
ca661bf30d
commit
27494ea043
4 changed files with 78 additions and 24 deletions
|
|
@ -16,8 +16,6 @@ package software.amazon.kinesis.leases;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
|
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
|
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
|
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
|
||||||
|
|
@ -82,9 +80,10 @@ public interface LeaseSerializer {
|
||||||
Map<String, ExpectedAttributeValue> getDynamoNonexistantExpectation();
|
Map<String, ExpectedAttributeValue> getDynamoNonexistantExpectation();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @param leaseKey
|
||||||
* @return the attribute value map asserting that a lease does exist.
|
* @return the attribute value map asserting that a lease does exist.
|
||||||
*/
|
*/
|
||||||
default Map<String, ExpectedAttributeValue> getDynamoExistantExpectation() {
|
default Map<String, ExpectedAttributeValue> getDynamoExistentExpectation(String leaseKey) {
|
||||||
throw new UnsupportedOperationException("DynamoExistantExpectation is not implemented");
|
throw new UnsupportedOperationException("DynamoExistantExpectation is not implemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,8 @@
|
||||||
*/
|
*/
|
||||||
package software.amazon.kinesis.leases.dynamodb;
|
package software.amazon.kinesis.leases.dynamodb;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
@ -21,12 +23,30 @@ import java.util.Map;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableMap;
|
|
||||||
import lombok.NonNull;
|
import lombok.NonNull;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.*;
|
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.LimitExceededException;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
|
||||||
import software.amazon.awssdk.utils.CollectionUtils;
|
import software.amazon.awssdk.utils.CollectionUtils;
|
||||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||||
import software.amazon.kinesis.common.FutureUtils;
|
import software.amazon.kinesis.common.FutureUtils;
|
||||||
|
|
@ -41,7 +61,6 @@ import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
||||||
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||||
import software.amazon.kinesis.retrieval.AWSExceptionManager;
|
import software.amazon.kinesis.retrieval.AWSExceptionManager;
|
||||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An implementation of {@link LeaseRefresher} that uses DynamoDB.
|
* An implementation of {@link LeaseRefresher} that uses DynamoDB.
|
||||||
|
|
@ -684,7 +703,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
|
||||||
exceptionManager.add(ConditionalCheckFailedException.class, t -> t);
|
exceptionManager.add(ConditionalCheckFailedException.class, t -> t);
|
||||||
Map<String, AttributeValueUpdate> updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField);
|
Map<String, AttributeValueUpdate> updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField);
|
||||||
UpdateItemRequest request = UpdateItemRequest.builder().tableName(table).key(serializer.getDynamoHashKey(lease))
|
UpdateItemRequest request = UpdateItemRequest.builder().tableName(table).key(serializer.getDynamoHashKey(lease))
|
||||||
.expected(serializer.getDynamoExistantExpectation())
|
.expected(serializer.getDynamoExistentExpectation(lease.leaseKey()))
|
||||||
.attributeUpdates(updates).build();
|
.attributeUpdates(updates).build();
|
||||||
try {
|
try {
|
||||||
try {
|
try {
|
||||||
|
|
|
||||||
|
|
@ -193,10 +193,13 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, ExpectedAttributeValue> getDynamoExistantExpectation() {
|
public Map<String, ExpectedAttributeValue> getDynamoExistentExpectation(String leaseKey) {
|
||||||
Map<String, ExpectedAttributeValue> result = new HashMap<>();
|
Map<String, ExpectedAttributeValue> result = new HashMap<>();
|
||||||
|
|
||||||
ExpectedAttributeValue expectedAV = ExpectedAttributeValue.builder().exists(true).build();
|
ExpectedAttributeValue expectedAV = ExpectedAttributeValue.builder()
|
||||||
|
.exists(true)
|
||||||
|
.value(DynamoUtils.createAttributeValue(leaseKey))
|
||||||
|
.build();
|
||||||
result.put(LEASE_KEY_KEY, expectedAV);
|
result.put(LEASE_KEY_KEY, expectedAV);
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,21 @@
|
||||||
*/
|
*/
|
||||||
package software.amazon.kinesis.leases.dynamodb;
|
package software.amazon.kinesis.leases.dynamodb;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.mockito.runners.MockitoJUnitRunner;
|
||||||
|
import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
|
||||||
|
import software.amazon.kinesis.common.HashKeyRangeForLease;
|
||||||
|
import software.amazon.kinesis.leases.Lease;
|
||||||
|
import software.amazon.kinesis.leases.LeaseIntegrationTest;
|
||||||
|
import software.amazon.kinesis.leases.UpdateField;
|
||||||
|
import software.amazon.kinesis.leases.exceptions.LeasingException;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
@ -24,20 +39,6 @@ import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Mockito.doNothing;
|
import static org.mockito.Mockito.doNothing;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.mockito.runners.MockitoJUnitRunner;
|
|
||||||
|
|
||||||
import software.amazon.kinesis.leases.Lease;
|
|
||||||
import software.amazon.kinesis.leases.LeaseIntegrationTest;
|
|
||||||
import software.amazon.kinesis.leases.exceptions.LeasingException;
|
|
||||||
|
|
||||||
@RunWith(MockitoJUnitRunner.class)
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest {
|
public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest {
|
||||||
|
|
||||||
|
|
@ -102,6 +103,38 @@ public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest
|
||||||
assertNull(actual);
|
assertNull(actual);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests leaseRefresher.updateLeaseWithMetaInfo() when the lease is deleted before updating it with meta info
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDeleteLeaseThenUpdateLeaseWithMetaInfo() throws LeasingException {
|
||||||
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
||||||
|
Lease lease = builder.withLease("1").build().get("1");
|
||||||
|
final String leaseKey = lease.leaseKey();
|
||||||
|
leaseRefresher.deleteLease(lease);
|
||||||
|
leaseRefresher.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE);
|
||||||
|
final Lease deletedLease = leaseRefresher.getLease(leaseKey);
|
||||||
|
Assert.assertNull(deletedLease);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests leaseRefresher.updateLeaseWithMetaInfo() on hashKeyRange update
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testUpdateLeaseWithMetaInfo() throws LeasingException {
|
||||||
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
||||||
|
Lease lease = builder.withLease("1").build().get("1");
|
||||||
|
final String leaseKey = lease.leaseKey();
|
||||||
|
final HashKeyRangeForLease hashKeyRangeForLease = HashKeyRangeForLease.fromHashKeyRange(HashKeyRange.builder()
|
||||||
|
.startingHashKey("1")
|
||||||
|
.endingHashKey("2")
|
||||||
|
.build());
|
||||||
|
lease.hashKeyRange(hashKeyRangeForLease);
|
||||||
|
leaseRefresher.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE);
|
||||||
|
final Lease updatedLease = leaseRefresher.getLease(leaseKey);
|
||||||
|
Assert.assertEquals(lease, updatedLease);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests leaseRefresher.holdLease's success scenario.
|
* Tests leaseRefresher.holdLease's success scenario.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue