Renaming interfaces and removing I. Introducing DynamoDBCheckpointer.

This commit is contained in:
Sahil Palvia 2018-04-19 14:23:40 -07:00
parent d4a36f2b28
commit 96d2254541
46 changed files with 1232 additions and 1268 deletions

View file

@ -20,11 +20,6 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import lombok.Data; import lombok.Data;
import lombok.NonNull; import lombok.NonNull;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import software.amazon.kinesis.leases.ILeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.KinesisClientLeaseManager;
import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.metrics.IMetricsFactory; import software.amazon.kinesis.metrics.IMetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory;
@ -55,8 +50,6 @@ public class CheckpointConfig {
private long failoverTimeMillis = 10000L; private long failoverTimeMillis = 10000L;
private ILeaseManager<KinesisClientLease> leaseManager;
private int maxLeasesForWorker = Integer.MAX_VALUE; private int maxLeasesForWorker = Integer.MAX_VALUE;
private int maxLeasesToStealAtOneTime = 1; private int maxLeasesToStealAtOneTime = 1;
@ -69,33 +62,10 @@ public class CheckpointConfig {
private long epsilonMillis = 25L; private long epsilonMillis = 25L;
private LeaseCoordinator<KinesisClientLease> leaseCoordinator;
public ILeaseManager<KinesisClientLease> leaseManager() {
if (leaseManager == null) {
leaseManager = new KinesisClientLeaseManager(tableName, amazonDynamoDB, consistentReads);
}
return leaseManager;
}
public CheckpointFactory checkpointFactory() { public CheckpointFactory checkpointFactory() {
if (checkpointFactory == null) { if (checkpointFactory == null) {
checkpointFactory = new DynamoDBCheckpointFactory(leaseCoordinator(), leaseManager(), metricsFactory()); checkpointFactory = new DynamoDBCheckpointFactory(metricsFactory());
} }
return checkpointFactory; return checkpointFactory;
} }
public LeaseCoordinator<KinesisClientLease> leaseCoordinator() {
if (leaseCoordinator == null) {
leaseCoordinator = new KinesisClientLibLeaseCoordinator(leaseManager(),
workerIdentifier(),
failoverTimeMillis(),
epsilonMillis(),
maxLeasesForWorker(),
maxLeasesToStealAtOneTime(),
maxLeaseRenewalThreads(),
metricsFactory());
}
return leaseCoordinator;
}
} }

View file

@ -15,9 +15,8 @@
package software.amazon.kinesis.checkpoint; package software.amazon.kinesis.checkpoint;
import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.leases.LeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.Checkpointer;
@ -26,5 +25,5 @@ import software.amazon.kinesis.processor.Checkpointer;
*/ */
public interface CheckpointFactory { public interface CheckpointFactory {
Checkpointer createCheckpointer(LeaseCoordinator<KinesisClientLease> leaseCoordinator, Checkpointer createCheckpointer(LeaseCoordinator<KinesisClientLease> leaseCoordinator,
ILeaseManager<KinesisClientLease> leaseManager); LeaseManager<KinesisClientLease> leaseManager);
} }

View file

@ -17,7 +17,7 @@ package software.amazon.kinesis.checkpoint;
import lombok.Data; import lombok.Data;
import lombok.NonNull; import lombok.NonNull;
import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.leases.LeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.metrics.IMetricsFactory; import software.amazon.kinesis.metrics.IMetricsFactory;
@ -28,15 +28,12 @@ import software.amazon.kinesis.processor.Checkpointer;
*/ */
@Data @Data
public class DynamoDBCheckpointFactory implements CheckpointFactory { public class DynamoDBCheckpointFactory implements CheckpointFactory {
@NonNull
private final LeaseCoordinator<KinesisClientLease> leaseLeaseCoordinator;
@NonNull
private final ILeaseManager<KinesisClientLease> leaseManager;
@NonNull @NonNull
private final IMetricsFactory metricsFactory; private final IMetricsFactory metricsFactory;
@Override @Override
public Checkpointer createCheckpoint() { public Checkpointer createCheckpointer(final LeaseCoordinator<KinesisClientLease> leaseLeaseCoordinator,
final LeaseManager<KinesisClientLease> leaseManager) {
return new DynamoDBCheckpointer(leaseLeaseCoordinator, leaseManager, metricsFactory); return new DynamoDBCheckpointer(leaseLeaseCoordinator, leaseManager, metricsFactory);
} }

View file

@ -28,7 +28,7 @@ import com.google.common.annotations.VisibleForTesting;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.leases.LeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.DependencyException;
@ -47,7 +47,7 @@ public class DynamoDBCheckpointer implements Checkpointer {
@NonNull @NonNull
private final LeaseCoordinator<KinesisClientLease> leaseCoordinator; private final LeaseCoordinator<KinesisClientLease> leaseCoordinator;
@NonNull @NonNull
private final ILeaseManager<KinesisClientLease> leaseManager; private final LeaseManager<KinesisClientLease> leaseManager;
@NonNull @NonNull
private final IMetricsFactory metricsFactory; private final IMetricsFactory metricsFactory;

View file

@ -38,7 +38,7 @@ import lombok.NonNull;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.leases.LeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator; import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseManagementConfig;
@ -108,7 +108,7 @@ public class Scheduler implements Runnable {
private final String streamName; private final String streamName;
private final long listShardsBackoffTimeMillis; private final long listShardsBackoffTimeMillis;
private final int maxListShardsRetryAttempts; private final int maxListShardsRetryAttempts;
private final ILeaseManager<KinesisClientLease> leaseManager; private final LeaseManager<KinesisClientLease> leaseManager;
private final LeaseManagerProxy leaseManagerProxy; private final LeaseManagerProxy leaseManagerProxy;
private final boolean ignoreUnexpetedChildShards; private final boolean ignoreUnexpetedChildShards;

View file

@ -76,8 +76,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
} }
@Override @Override
public LeaseManager<KinesisClientLease> createLeaseManager() { public DynamoDBLeaseManager<KinesisClientLease> createLeaseManager() {
return new KinesisClientLeaseManager(tableName, amazonDynamoDB, consistentReads); return new KinesisClientDynamoDBLeaseManager(tableName, amazonDynamoDB, consistentReads);
} }
@Override @Override

View file

@ -0,0 +1,593 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.leases;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.DeleteItemRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
import com.amazonaws.services.dynamodbv2.model.GetItemRequest;
import com.amazonaws.services.dynamodbv2.model.GetItemResult;
import com.amazonaws.services.dynamodbv2.model.LimitExceededException;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.dynamodbv2.model.PutItemRequest;
import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.ScanResult;
import com.amazonaws.services.dynamodbv2.model.TableStatus;
import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import lombok.extern.slf4j.Slf4j;
/**
* An implementation of ILeaseManager that uses DynamoDB.
*/
@Slf4j
public class DynamoDBLeaseManager<T extends Lease> implements LeaseManager<T> {
protected String table;
protected AmazonDynamoDB dynamoDBClient;
protected LeaseSerializer<T> serializer;
protected boolean consistentReads;
/**
* Constructor.
*
* @param table leases table
* @param dynamoDBClient DynamoDB client to use
* @param serializer LeaseSerializer to use to convert to/from DynamoDB objects.
*/
public DynamoDBLeaseManager(String table, AmazonDynamoDB dynamoDBClient, LeaseSerializer<T> serializer) {
this(table, dynamoDBClient, serializer, false);
}
/**
* Constructor for test cases - allows control of consistent reads. Consistent reads should only be used for testing
* - our code is meant to be resilient to inconsistent reads. Using consistent reads during testing speeds up
* execution of simple tests (you don't have to wait out the consistency window). Test cases that want to experience
* eventual consistency should not set consistentReads=true.
*
* @param table leases table
* @param dynamoDBClient DynamoDB client to use
* @param serializer lease serializer to use
* @param consistentReads true if we want consistent reads for testing purposes.
*/
public DynamoDBLeaseManager(String table, AmazonDynamoDB dynamoDBClient, LeaseSerializer<T> serializer, boolean consistentReads) {
verifyNotNull(table, "Table name cannot be null");
verifyNotNull(dynamoDBClient, "dynamoDBClient cannot be null");
verifyNotNull(serializer, "ILeaseSerializer cannot be null");
this.table = table;
this.dynamoDBClient = dynamoDBClient;
this.consistentReads = consistentReads;
this.serializer = serializer;
}
/**
* {@inheritDoc}
*/
@Override
public boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity)
throws ProvisionedThroughputException, DependencyException {
verifyNotNull(readCapacity, "readCapacity cannot be null");
verifyNotNull(writeCapacity, "writeCapacity cannot be null");
try {
if (tableStatus() != null) {
return false;
}
} catch (DependencyException de) {
//
// Something went wrong with DynamoDB
//
log.error("Failed to get table status for {}", table, de);
}
CreateTableRequest request = new CreateTableRequest();
request.setTableName(table);
request.setKeySchema(serializer.getKeySchema());
request.setAttributeDefinitions(serializer.getAttributeDefinitions());
ProvisionedThroughput throughput = new ProvisionedThroughput();
throughput.setReadCapacityUnits(readCapacity);
throughput.setWriteCapacityUnits(writeCapacity);
request.setProvisionedThroughput(throughput);
try {
dynamoDBClient.createTable(request);
} catch (ResourceInUseException e) {
log.info("Table {} already exists.", table);
return false;
} catch (LimitExceededException e) {
throw new ProvisionedThroughputException("Capacity exceeded when creating table " + table, e);
} catch (AmazonClientException e) {
throw new DependencyException(e);
}
return true;
}
/**
* {@inheritDoc}
*/
@Override
public boolean leaseTableExists() throws DependencyException {
return TableStatus.ACTIVE == tableStatus();
}
private TableStatus tableStatus() throws DependencyException {
DescribeTableRequest request = new DescribeTableRequest();
request.setTableName(table);
DescribeTableResult result;
try {
result = dynamoDBClient.describeTable(request);
} catch (ResourceNotFoundException e) {
if (log.isDebugEnabled()) {
log.debug("Got ResourceNotFoundException for table {} in leaseTableExists, returning false.", table);
}
return null;
} catch (AmazonClientException e) {
throw new DependencyException(e);
}
TableStatus tableStatus = TableStatus.fromValue(result.getTable().getTableStatus());
if (log.isDebugEnabled()) {
log.debug("Lease table exists and is in status {}", tableStatus);
}
return tableStatus;
}
@Override
public boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException {
long sleepTimeRemaining = TimeUnit.SECONDS.toMillis(timeoutSeconds);
while (!leaseTableExists()) {
if (sleepTimeRemaining <= 0) {
return false;
}
long timeToSleepMillis = Math.min(TimeUnit.SECONDS.toMillis(secondsBetweenPolls), sleepTimeRemaining);
sleepTimeRemaining -= sleep(timeToSleepMillis);
}
return true;
}
/**
* Exposed for testing purposes.
*
* @param timeToSleepMillis time to sleep in milliseconds
*
* @return actual time slept in millis
*/
long sleep(long timeToSleepMillis) {
long startTime = System.currentTimeMillis();
try {
Thread.sleep(timeToSleepMillis);
} catch (InterruptedException e) {
log.debug("Interrupted while sleeping");
}
return System.currentTimeMillis() - startTime;
}
/**
* {@inheritDoc}
*/
@Override
public List<T> listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
return list(null);
}
/**
* {@inheritDoc}
*/
@Override
public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
return list(1).isEmpty();
}
/**
* List with the given page size. Package access for integration testing.
*
* @param limit number of items to consider at a time - used by integration tests to force paging.
* @return list of leases
* @throws InvalidStateException if table does not exist
* @throws DependencyException if DynamoDB scan fail in an unexpected way
* @throws ProvisionedThroughputException if DynamoDB scan fail due to exceeded capacity
*/
List<T> list(Integer limit) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
if (log.isDebugEnabled()) {
log.debug("Listing leases from table {}", table);
}
ScanRequest scanRequest = new ScanRequest();
scanRequest.setTableName(table);
if (limit != null) {
scanRequest.setLimit(limit);
}
try {
ScanResult scanResult = dynamoDBClient.scan(scanRequest);
List<T> result = new ArrayList<T>();
while (scanResult != null) {
for (Map<String, AttributeValue> item : scanResult.getItems()) {
if (log.isDebugEnabled()) {
log.debug("Got item {} from DynamoDB.", item.toString());
}
result.add(serializer.fromDynamoRecord(item));
}
Map<String, AttributeValue> lastEvaluatedKey = scanResult.getLastEvaluatedKey();
if (lastEvaluatedKey == null) {
// Signify that we're done.
scanResult = null;
if (log.isDebugEnabled()) {
log.debug("lastEvaluatedKey was null - scan finished.");
}
} else {
// Make another request, picking up where we left off.
scanRequest.setExclusiveStartKey(lastEvaluatedKey);
if (log.isDebugEnabled()) {
log.debug("lastEvaluatedKey was {}, continuing scan.", lastEvaluatedKey);
}
scanResult = dynamoDBClient.scan(scanRequest);
}
}
if (log.isDebugEnabled()) {
log.debug("Listed {} leases from table {}", result.size(), table);
}
return result;
} catch (ResourceNotFoundException e) {
throw new InvalidStateException("Cannot scan lease table " + table + " because it does not exist.", e);
} catch (ProvisionedThroughputExceededException e) {
throw new ProvisionedThroughputException(e);
} catch (AmazonClientException e) {
throw new DependencyException(e);
}
}
/**
* {@inheritDoc}
*/
@Override
public boolean createLeaseIfNotExists(T lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
verifyNotNull(lease, "lease cannot be null");
if (log.isDebugEnabled()) {
log.debug("Creating lease {}", lease);
}
PutItemRequest request = new PutItemRequest();
request.setTableName(table);
request.setItem(serializer.toDynamoRecord(lease));
request.setExpected(serializer.getDynamoNonexistantExpectation());
try {
dynamoDBClient.putItem(request);
} catch (ConditionalCheckFailedException e) {
if (log.isDebugEnabled()) {
log.debug("Did not create lease {} because it already existed", lease);
}
return false;
} catch (AmazonClientException e) {
throw convertAndRethrowExceptions("create", lease.getLeaseKey(), e);
}
return true;
}
/**
* {@inheritDoc}
*/
@Override
public T getLease(String leaseKey)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
verifyNotNull(leaseKey, "leaseKey cannot be null");
if (log.isDebugEnabled()) {
log.debug("Getting lease with key {}", leaseKey);
}
GetItemRequest request = new GetItemRequest();
request.setTableName(table);
request.setKey(serializer.getDynamoHashKey(leaseKey));
request.setConsistentRead(consistentReads);
try {
GetItemResult result = dynamoDBClient.getItem(request);
Map<String, AttributeValue> dynamoRecord = result.getItem();
if (dynamoRecord == null) {
if (log.isDebugEnabled()) {
log.debug("No lease found with key {}, returning null.", leaseKey);
}
return null;
} else {
T lease = serializer.fromDynamoRecord(dynamoRecord);
if (log.isDebugEnabled()) {
log.debug("Got lease {}", lease);
}
return lease;
}
} catch (AmazonClientException e) {
throw convertAndRethrowExceptions("get", leaseKey, e);
}
}
/**
* {@inheritDoc}
*/
@Override
public boolean renewLease(T lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
verifyNotNull(lease, "lease cannot be null");
if (log.isDebugEnabled()) {
log.debug("Renewing lease with key {}", lease.getLeaseKey());
}
UpdateItemRequest request = new UpdateItemRequest();
request.setTableName(table);
request.setKey(serializer.getDynamoHashKey(lease));
request.setExpected(serializer.getDynamoLeaseCounterExpectation(lease));
request.setAttributeUpdates(serializer.getDynamoLeaseCounterUpdate(lease));
try {
dynamoDBClient.updateItem(request);
} catch (ConditionalCheckFailedException e) {
if (log.isDebugEnabled()) {
log.debug("Lease renewal failed for lease with key {} because the lease counter was not {}",
lease.getLeaseKey(), lease.getLeaseCounter());
}
// If we had a spurious retry during the Dynamo update, then this conditional PUT failure
// might be incorrect. So, we get the item straight away and check if the lease owner + lease counter
// are what we expected.
String expectedOwner = lease.getLeaseOwner();
Long expectedCounter = lease.getLeaseCounter() + 1;
T updatedLease = getLease(lease.getLeaseKey());
if (updatedLease == null || !expectedOwner.equals(updatedLease.getLeaseOwner()) ||
!expectedCounter.equals(updatedLease.getLeaseCounter())) {
return false;
}
log.info("Detected spurious renewal failure for lease with key {}, but recovered", lease.getLeaseKey());
} catch (AmazonClientException e) {
throw convertAndRethrowExceptions("renew", lease.getLeaseKey(), e);
}
lease.setLeaseCounter(lease.getLeaseCounter() + 1);
return true;
}
/**
* {@inheritDoc}
*/
@Override
public boolean takeLease(T lease, String owner)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
verifyNotNull(lease, "lease cannot be null");
verifyNotNull(owner, "owner cannot be null");
if (log.isDebugEnabled()) {
log.debug("Taking lease with leaseKey {} from {} to {}",
lease.getLeaseKey(),
lease.getLeaseOwner() == null ? "nobody" : lease.getLeaseOwner(),
owner);
}
UpdateItemRequest request = new UpdateItemRequest();
request.setTableName(table);
request.setKey(serializer.getDynamoHashKey(lease));
request.setExpected(serializer.getDynamoLeaseCounterExpectation(lease));
Map<String, AttributeValueUpdate> updates = serializer.getDynamoLeaseCounterUpdate(lease);
updates.putAll(serializer.getDynamoTakeLeaseUpdate(lease, owner));
request.setAttributeUpdates(updates);
try {
dynamoDBClient.updateItem(request);
} catch (ConditionalCheckFailedException e) {
if (log.isDebugEnabled()) {
log.debug("Lease renewal failed for lease with key {} because the lease counter was not {}",
lease.getLeaseKey(), lease.getLeaseCounter());
}
return false;
} catch (AmazonClientException e) {
throw convertAndRethrowExceptions("take", lease.getLeaseKey(), e);
}
lease.setLeaseCounter(lease.getLeaseCounter() + 1);
lease.setLeaseOwner(owner);
return true;
}
/**
* {@inheritDoc}
*/
@Override
public boolean evictLease(T lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
verifyNotNull(lease, "lease cannot be null");
if (log.isDebugEnabled()) {
log.debug("Evicting lease with leaseKey {} owned by {}",
lease.getLeaseKey(),
lease.getLeaseOwner());
}
UpdateItemRequest request = new UpdateItemRequest();
request.setTableName(table);
request.setKey(serializer.getDynamoHashKey(lease));
request.setExpected(serializer.getDynamoLeaseOwnerExpectation(lease));
Map<String, AttributeValueUpdate> updates = serializer.getDynamoLeaseCounterUpdate(lease);
updates.putAll(serializer.getDynamoEvictLeaseUpdate(lease));
request.setAttributeUpdates(updates);
try {
dynamoDBClient.updateItem(request);
} catch (ConditionalCheckFailedException e) {
if (log.isDebugEnabled()) {
log.debug("Lease eviction failed for lease with key {} because the lease owner was not {}",
lease.getLeaseKey(), lease.getLeaseOwner());
}
return false;
} catch (AmazonClientException e) {
throw convertAndRethrowExceptions("evict", lease.getLeaseKey(), e);
}
lease.setLeaseOwner(null);
lease.setLeaseCounter(lease.getLeaseCounter() + 1);
return true;
}
/**
* {@inheritDoc}
*/
public void deleteAll() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
List<T> allLeases = listLeases();
log.warn("Deleting {} items from table {}", allLeases.size(), table);
for (T lease : allLeases) {
DeleteItemRequest deleteRequest = new DeleteItemRequest();
deleteRequest.setTableName(table);
deleteRequest.setKey(serializer.getDynamoHashKey(lease));
dynamoDBClient.deleteItem(deleteRequest);
}
}
/**
* {@inheritDoc}
*/
@Override
public void deleteLease(T lease) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
verifyNotNull(lease, "lease cannot be null");
if (log.isDebugEnabled()) {
log.debug("Deleting lease with leaseKey {}", lease.getLeaseKey());
}
DeleteItemRequest deleteRequest = new DeleteItemRequest();
deleteRequest.setTableName(table);
deleteRequest.setKey(serializer.getDynamoHashKey(lease));
try {
dynamoDBClient.deleteItem(deleteRequest);
} catch (AmazonClientException e) {
throw convertAndRethrowExceptions("delete", lease.getLeaseKey(), e);
}
}
/**
* {@inheritDoc}
*/
@Override
public boolean updateLease(T lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
verifyNotNull(lease, "lease cannot be null");
if (log.isDebugEnabled()) {
log.debug("Updating lease {}", lease);
}
UpdateItemRequest request = new UpdateItemRequest();
request.setTableName(table);
request.setKey(serializer.getDynamoHashKey(lease));
request.setExpected(serializer.getDynamoLeaseCounterExpectation(lease));
Map<String, AttributeValueUpdate> updates = serializer.getDynamoLeaseCounterUpdate(lease);
updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease));
request.setAttributeUpdates(updates);
try {
dynamoDBClient.updateItem(request);
} catch (ConditionalCheckFailedException e) {
if (log.isDebugEnabled()) {
log.debug("Lease update failed for lease with key {} because the lease counter was not {}",
lease.getLeaseKey(), lease.getLeaseCounter());
}
return false;
} catch (AmazonClientException e) {
throw convertAndRethrowExceptions("update", lease.getLeaseKey(), e);
}
lease.setLeaseCounter(lease.getLeaseCounter() + 1);
return true;
}
/*
* This method contains boilerplate exception handling - it throws or returns something to be thrown. The
* inconsistency there exists to satisfy the compiler when this method is used at the end of non-void methods.
*/
protected DependencyException convertAndRethrowExceptions(String operation, String leaseKey, AmazonClientException e)
throws ProvisionedThroughputException, InvalidStateException {
if (e instanceof ProvisionedThroughputExceededException) {
log.warn("Provisioned Throughput on the lease table has been exceeded. It's recommended that you increase the IOPs on the table. Failure to increase the IOPs may cause the application to not make progress.");
throw new ProvisionedThroughputException(e);
} else if (e instanceof ResourceNotFoundException) {
// @formatter:on
throw new InvalidStateException(String.format("Cannot %s lease with key %s because table %s does not exist.",
operation,
leaseKey,
table),
e);
//@formatter:off
} else {
return new DependencyException(e);
}
}
private void verifyNotNull(Object object, String message) {
if (object == null) {
throw new IllegalArgumentException(message);
}
}
}

View file

@ -44,10 +44,10 @@ import lombok.extern.slf4j.Slf4j;
* An implementation of ILeaseRenewer that uses DynamoDB via LeaseManager. * An implementation of ILeaseRenewer that uses DynamoDB via LeaseManager.
*/ */
@Slf4j @Slf4j
public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> { public class DynamoDBLeaseRenewer<T extends Lease> implements LeaseRenewer<T> {
private static final int RENEWAL_RETRIES = 2; private static final int RENEWAL_RETRIES = 2;
private final ILeaseManager<T> leaseManager; private final LeaseManager<T> leaseManager;
private final ConcurrentNavigableMap<String, T> ownedLeases = new ConcurrentSkipListMap<String, T>(); private final ConcurrentNavigableMap<String, T> ownedLeases = new ConcurrentSkipListMap<String, T>();
private final String workerIdentifier; private final String workerIdentifier;
private final long leaseDurationNanos; private final long leaseDurationNanos;
@ -61,8 +61,8 @@ public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> {
* @param leaseDurationMillis duration of a lease in milliseconds * @param leaseDurationMillis duration of a lease in milliseconds
* @param executorService ExecutorService to use for renewing leases in parallel * @param executorService ExecutorService to use for renewing leases in parallel
*/ */
public LeaseRenewer(ILeaseManager<T> leaseManager, String workerIdentifier, long leaseDurationMillis, public DynamoDBLeaseRenewer(LeaseManager<T> leaseManager, String workerIdentifier, long leaseDurationMillis,
ExecutorService executorService) { ExecutorService executorService) {
this.leaseManager = leaseManager; this.leaseManager = leaseManager;
this.workerIdentifier = workerIdentifier; this.workerIdentifier = workerIdentifier;
this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis); this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis);

View file

@ -0,0 +1,194 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.leases;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
/**
* An implementation of ILeaseSerializer for basic Lease objects. Can also instantiate subclasses of Lease so that
* LeaseSerializer can be decorated by other classes if you need to add fields to leases.
*/
public class DynamoDBLeaseSerializer implements LeaseSerializer<Lease> {
public final String LEASE_KEY_KEY = "leaseKey";
public final String LEASE_OWNER_KEY = "leaseOwner";
public final String LEASE_COUNTER_KEY = "leaseCounter";
public final Class<? extends Lease> clazz;
public DynamoDBLeaseSerializer() {
this.clazz = Lease.class;
}
public DynamoDBLeaseSerializer(Class<? extends Lease> clazz) {
this.clazz = clazz;
}
@Override
public Map<String, AttributeValue> toDynamoRecord(Lease lease) {
Map<String, AttributeValue> result = new HashMap<String, AttributeValue>();
result.put(LEASE_KEY_KEY, DynamoUtils.createAttributeValue(lease.getLeaseKey()));
result.put(LEASE_COUNTER_KEY, DynamoUtils.createAttributeValue(lease.getLeaseCounter()));
if (lease.getLeaseOwner() != null) {
result.put(LEASE_OWNER_KEY, DynamoUtils.createAttributeValue(lease.getLeaseOwner()));
}
return result;
}
@Override
public Lease fromDynamoRecord(Map<String, AttributeValue> dynamoRecord) {
Lease result;
try {
result = clazz.newInstance();
} catch (InstantiationException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
result.setLeaseKey(DynamoUtils.safeGetString(dynamoRecord, LEASE_KEY_KEY));
result.setLeaseOwner(DynamoUtils.safeGetString(dynamoRecord, LEASE_OWNER_KEY));
result.setLeaseCounter(DynamoUtils.safeGetLong(dynamoRecord, LEASE_COUNTER_KEY));
return result;
}
@Override
public Map<String, AttributeValue> getDynamoHashKey(String leaseKey) {
Map<String, AttributeValue> result = new HashMap<String, AttributeValue>();
result.put(LEASE_KEY_KEY, DynamoUtils.createAttributeValue(leaseKey));
return result;
}
@Override
public Map<String, AttributeValue> getDynamoHashKey(Lease lease) {
return getDynamoHashKey(lease.getLeaseKey());
}
@Override
public Map<String, ExpectedAttributeValue> getDynamoLeaseCounterExpectation(Lease lease) {
return getDynamoLeaseCounterExpectation(lease.getLeaseCounter());
}
public Map<String, ExpectedAttributeValue> getDynamoLeaseCounterExpectation(Long leaseCounter) {
Map<String, ExpectedAttributeValue> result = new HashMap<String, ExpectedAttributeValue>();
ExpectedAttributeValue eav = new ExpectedAttributeValue(DynamoUtils.createAttributeValue(leaseCounter));
result.put(LEASE_COUNTER_KEY, eav);
return result;
}
@Override
public Map<String, ExpectedAttributeValue> getDynamoLeaseOwnerExpectation(Lease lease) {
Map<String, ExpectedAttributeValue> result = new HashMap<String, ExpectedAttributeValue>();
ExpectedAttributeValue eav = null;
if (lease.getLeaseOwner() == null) {
eav = new ExpectedAttributeValue(false);
} else {
eav = new ExpectedAttributeValue(DynamoUtils.createAttributeValue(lease.getLeaseOwner()));
}
result.put(LEASE_OWNER_KEY, eav);
return result;
}
@Override
public Map<String, ExpectedAttributeValue> getDynamoNonexistantExpectation() {
Map<String, ExpectedAttributeValue> result = new HashMap<String, ExpectedAttributeValue>();
ExpectedAttributeValue expectedAV = new ExpectedAttributeValue(false);
result.put(LEASE_KEY_KEY, expectedAV);
return result;
}
@Override
public Map<String, AttributeValueUpdate> getDynamoLeaseCounterUpdate(Lease lease) {
return getDynamoLeaseCounterUpdate(lease.getLeaseCounter());
}
public Map<String, AttributeValueUpdate> getDynamoLeaseCounterUpdate(Long leaseCounter) {
Map<String, AttributeValueUpdate> result = new HashMap<String, AttributeValueUpdate>();
AttributeValueUpdate avu =
new AttributeValueUpdate(DynamoUtils.createAttributeValue(leaseCounter + 1), AttributeAction.PUT);
result.put(LEASE_COUNTER_KEY, avu);
return result;
}
@Override
public Map<String, AttributeValueUpdate> getDynamoTakeLeaseUpdate(Lease lease, String owner) {
Map<String, AttributeValueUpdate> result = new HashMap<String, AttributeValueUpdate>();
result.put(LEASE_OWNER_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(owner),
AttributeAction.PUT));
return result;
}
@Override
public Map<String, AttributeValueUpdate> getDynamoEvictLeaseUpdate(Lease lease) {
Map<String, AttributeValueUpdate> result = new HashMap<String, AttributeValueUpdate>();
result.put(LEASE_OWNER_KEY, new AttributeValueUpdate(null, AttributeAction.DELETE));
return result;
}
@Override
public Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(Lease lease) {
// There is no application-specific data in Lease - just return a map that increments the counter.
return new HashMap<String, AttributeValueUpdate>();
}
@Override
public Collection<KeySchemaElement> getKeySchema() {
List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName(LEASE_KEY_KEY).withKeyType(KeyType.HASH));
return keySchema;
}
@Override
public Collection<AttributeDefinition> getAttributeDefinitions() {
List<AttributeDefinition> definitions = new ArrayList<AttributeDefinition>();
definitions.add(new AttributeDefinition().withAttributeName(LEASE_KEY_KEY)
.withAttributeType(ScalarAttributeType.S));
return definitions;
}
}

View file

@ -53,7 +53,7 @@ public class DynamoDBLeaseTaker<T extends Lease> implements LeaseTaker<T> {
} }
}; };
private final ILeaseManager<T> leaseManager; private final LeaseManager<T> leaseManager;
private final String workerIdentifier; private final String workerIdentifier;
private final Map<String, T> allLeases = new HashMap<String, T>(); private final Map<String, T> allLeases = new HashMap<String, T>();
private final long leaseDurationNanos; private final long leaseDurationNanos;
@ -62,7 +62,7 @@ public class DynamoDBLeaseTaker<T extends Lease> implements LeaseTaker<T> {
private long lastScanTimeNanos = 0L; private long lastScanTimeNanos = 0L;
public DynamoDBLeaseTaker(ILeaseManager<T> leaseManager, String workerIdentifier, long leaseDurationMillis) { public DynamoDBLeaseTaker(LeaseManager<T> leaseManager, String workerIdentifier, long leaseDurationMillis) {
this.leaseManager = leaseManager; this.leaseManager = leaseManager;
this.workerIdentifier = workerIdentifier; this.workerIdentifier = workerIdentifier;
this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis); this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis);

View file

@ -1,41 +0,0 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.leases;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
/**
* A decoration of ILeaseManager that adds methods to get/update checkpoints.
*/
public interface IKinesisClientLeaseManager extends ILeaseManager<KinesisClientLease> {
/**
* Gets the current checkpoint of the shard. This is useful in the resharding use case
* where we will wait for the parent shard to complete before starting on the records from a child shard.
*
* @param shardId Checkpoint of this shard will be returned
* @return Checkpoint of this shard, or null if the shard record doesn't exist.
*
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws InvalidStateException if lease table does not exist
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/
public abstract ExtendedSequenceNumber getCheckpoint(String shardId)
throws ProvisionedThroughputException, InvalidStateException, DependencyException;
}

View file

@ -1,194 +0,0 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.leases;
import java.util.List;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.leases.Lease;
/**
* Supports basic CRUD operations for Leases.
*
* @param <T> Lease subclass, possibly Lease itself.
*/
public interface ILeaseManager<T extends Lease> {
/**
* Creates the table that will store leases. Succeeds if table already exists.
*
* @param readCapacity
* @param writeCapacity
*
* @return true if we created a new table (table didn't exist before)
*
* @throws ProvisionedThroughputException if we cannot create the lease table due to per-AWS-account capacity
* restrictions.
* @throws DependencyException if DynamoDB createTable fails in an unexpected way
*/
public boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity)
throws ProvisionedThroughputException, DependencyException;
/**
* @return true if the lease table already exists.
*
* @throws DependencyException if DynamoDB describeTable fails in an unexpected way
*/
public boolean leaseTableExists() throws DependencyException;
/**
* Blocks until the lease table exists by polling leaseTableExists.
*
* @param secondsBetweenPolls time to wait between polls in seconds
* @param timeoutSeconds total time to wait in seconds
*
* @return true if table exists, false if timeout was reached
*
* @throws DependencyException if DynamoDB describeTable fails in an unexpected way
*/
public boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException;
/**
* List all objects in table synchronously.
*
* @throws DependencyException if DynamoDB scan fails in an unexpected way
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity
*
* @return list of leases
*/
public List<T> listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
* Create a new lease. Conditional on a lease not already existing with this shardId.
*
* @param lease the lease to create
*
* @return true if lease was created, false if lease already exists
*
* @throws DependencyException if DynamoDB put fails in an unexpected way
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB put fails due to lack of capacity
*/
public boolean createLeaseIfNotExists(T lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
* @param shardId Get the lease for this shardId
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB get fails due to lack of capacity
* @throws DependencyException if DynamoDB get fails in an unexpected way
*
* @return lease for the specified shardId, or null if one doesn't exist
*/
public T getLease(String shardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
* Renew a lease by incrementing the lease counter. Conditional on the leaseCounter in DynamoDB matching the leaseCounter
* of the input. Mutates the leaseCounter of the passed-in lease object after updating the record in DynamoDB.
*
* @param lease the lease to renew
*
* @return true if renewal succeeded, false otherwise
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/
public boolean renewLease(T lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
* Take a lease for the given owner by incrementing its leaseCounter and setting its owner field. Conditional on
* the leaseCounter in DynamoDB matching the leaseCounter of the input. Mutates the leaseCounter and owner of the
* passed-in lease object after updating DynamoDB.
*
* @param lease the lease to take
* @param owner the new owner
*
* @return true if lease was successfully taken, false otherwise
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/
public boolean takeLease(T lease, String owner)
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
* Evict the current owner of lease by setting owner to null. Conditional on the owner in DynamoDB matching the owner of
* the input. Mutates the lease counter and owner of the passed-in lease object after updating the record in DynamoDB.
*
* @param lease the lease to void
*
* @return true if eviction succeeded, false otherwise
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/
public boolean evictLease(T lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
* Delete the given lease from DynamoDB. Does nothing when passed a lease that does not exist in DynamoDB.
*
* @param lease the lease to delete
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB delete fails due to lack of capacity
* @throws DependencyException if DynamoDB delete fails in an unexpected way
*/
public void deleteLease(T lease) throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
* Delete all leases from DynamoDB. Useful for tools/utils and testing.
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB scan or delete fail due to lack of capacity
* @throws DependencyException if DynamoDB scan or delete fail in an unexpected way
*/
public void deleteAll() throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
* Update application-specific fields of the given lease in DynamoDB. Does not update fields managed by the leasing
* library such as leaseCounter, leaseOwner, or leaseKey. Conditional on the leaseCounter in DynamoDB matching the
* leaseCounter of the input. Increments the lease counter in DynamoDB so that updates can be contingent on other
* updates. Mutates the lease counter of the passed-in lease object.
*
* @return true if update succeeded, false otherwise
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/
public boolean updateLease(T lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
* Check (synchronously) if there are any leases in the lease table.
*
* @return true if there are no leases in the lease table
*
* @throws DependencyException if DynamoDB scan fails in an unexpected way
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity
*/
public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException;
}

View file

@ -1,116 +0,0 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.leases;
import java.util.Collection;
import java.util.Map;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import software.amazon.kinesis.leases.Lease;
/**
* Utility class that manages the mapping of Lease objects/operations to records in DynamoDB.
*
* @param <T> Lease subclass, possibly Lease itself
*/
public interface ILeaseSerializer<T extends Lease> {
/**
* Construct a DynamoDB record out of a Lease object
*
* @param lease lease object to serialize
* @return an attribute value map representing the lease object
*/
public Map<String, AttributeValue> toDynamoRecord(T lease);
/**
* Construct a Lease object out of a DynamoDB record.
*
* @param dynamoRecord attribute value map from DynamoDB
* @return a deserialized lease object representing the attribute value map
*/
public T fromDynamoRecord(Map<String, AttributeValue> dynamoRecord);
/**
* @param lease
* @return the attribute value map representing a Lease's hash key given a Lease object.
*/
public Map<String, AttributeValue> getDynamoHashKey(T lease);
/**
* Special getDynamoHashKey implementation used by ILeaseManager.getLease().
*
* @param leaseKey
* @return the attribute value map representing a Lease's hash key given a string.
*/
public Map<String, AttributeValue> getDynamoHashKey(String leaseKey);
/**
* @param lease
* @return the attribute value map asserting that a lease counter is what we expect.
*/
public Map<String, ExpectedAttributeValue> getDynamoLeaseCounterExpectation(T lease);
/**
* @param lease
* @return the attribute value map asserting that the lease owner is what we expect.
*/
public Map<String, ExpectedAttributeValue> getDynamoLeaseOwnerExpectation(T lease);
/**
* @return the attribute value map asserting that a lease does not exist.
*/
public Map<String, ExpectedAttributeValue> getDynamoNonexistantExpectation();
/**
* @param lease
* @return the attribute value map that increments a lease counter
*/
public Map<String, AttributeValueUpdate> getDynamoLeaseCounterUpdate(T lease);
/**
* @param lease
* @param newOwner
* @return the attribute value map that takes a lease for a new owner
*/
public Map<String, AttributeValueUpdate> getDynamoTakeLeaseUpdate(T lease, String newOwner);
/**
* @param lease
* @return the attribute value map that voids a lease
*/
public Map<String, AttributeValueUpdate> getDynamoEvictLeaseUpdate(T lease);
/**
* @param lease
* @return the attribute value map that updates application-specific data for a lease and increments the lease
* counter
*/
public Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(T lease);
/**
* @return the key schema for creating a DynamoDB table to store leases
*/
public Collection<KeySchemaElement> getKeySchema();
/**
* @return attribute definitions for creating a DynamoDB table to store leases
*/
public Collection<AttributeDefinition> getAttributeDefinitions();
}

View file

@ -0,0 +1,79 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.leases;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
/**
* An implementation of LeaseManager for the KinesisClientLibrary - takeLease updates the ownerSwitchesSinceCheckpoint field.
*/
public class KinesisClientDynamoDBLeaseManager extends DynamoDBLeaseManager<KinesisClientLease> implements KinesisClientLeaseManager {
/**
* Constructor.
*
* @param table Leases table
* @param dynamoDBClient DynamoDB client to use
*/
public KinesisClientDynamoDBLeaseManager(String table, AmazonDynamoDB dynamoDBClient) {
this(table, dynamoDBClient, false);
}
/**
* Constructor for integration tests - see comment on superclass for documentation on setting the consistentReads
* flag.
*
* @param table leases table
* @param dynamoDBClient DynamoDB client to use
* @param consistentReads true if we want consistent reads for testing purposes.
*/
public KinesisClientDynamoDBLeaseManager(String table, AmazonDynamoDB dynamoDBClient, boolean consistentReads) {
super(table, dynamoDBClient, new KinesisClientLeaseSerializer(), consistentReads);
}
/**
* {@inheritDoc}
*/
@Override
public boolean takeLease(KinesisClientLease lease, String newOwner)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
String oldOwner = lease.getLeaseOwner();
boolean result = super.takeLease(lease, newOwner);
if (oldOwner != null && !oldOwner.equals(newOwner)) {
lease.setOwnerSwitchesSinceCheckpoint(lease.getOwnerSwitchesSinceCheckpoint() + 1);
}
return result;
}
/**
* {@inheritDoc}
*/
@Override
public ExtendedSequenceNumber getCheckpoint(String shardId)
throws ProvisionedThroughputException, InvalidStateException, DependencyException {
ExtendedSequenceNumber checkpoint = null;
KinesisClientLease lease = getLease(shardId);
if (lease != null) {
checkpoint = lease.getCheckpoint();
}
return checkpoint;
}
}

View file

@ -14,66 +14,27 @@
*/ */
package software.amazon.kinesis.leases; package software.amazon.kinesis.leases;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/** /**
* An implementation of LeaseManager for the KinesisClientLibrary - takeLease updates the ownerSwitchesSinceCheckpoint field. * A decoration of ILeaseManager that adds methods to get/update checkpoints.
*/ */
public class KinesisClientLeaseManager extends LeaseManager<KinesisClientLease> implements IKinesisClientLeaseManager { public interface KinesisClientLeaseManager extends LeaseManager<KinesisClientLease> {
/** /**
* Constructor. * Gets the current checkpoint of the shard. This is useful in the resharding use case
* where we will wait for the parent shard to complete before starting on the records from a child shard.
* *
* @param table Leases table * @param shardId Checkpoint of this shard will be returned
* @param dynamoDBClient DynamoDB client to use * @return Checkpoint of this shard, or null if the shard record doesn't exist.
*/
public KinesisClientLeaseManager(String table, AmazonDynamoDB dynamoDBClient) {
this(table, dynamoDBClient, false);
}
/**
* Constructor for integration tests - see comment on superclass for documentation on setting the consistentReads
* flag.
* *
* @param table leases table * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @param dynamoDBClient DynamoDB client to use * @throws InvalidStateException if lease table does not exist
* @param consistentReads true if we want consistent reads for testing purposes. * @throws DependencyException if DynamoDB update fails in an unexpected way
*/ */
public KinesisClientLeaseManager(String table, AmazonDynamoDB dynamoDBClient, boolean consistentReads) { ExtendedSequenceNumber getCheckpoint(String shardId) throws ProvisionedThroughputException, InvalidStateException, DependencyException;
super(table, dynamoDBClient, new KinesisClientLeaseSerializer(), consistentReads);
}
/**
* {@inheritDoc}
*/
@Override
public boolean takeLease(KinesisClientLease lease, String newOwner)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
String oldOwner = lease.getLeaseOwner();
boolean result = super.takeLease(lease, newOwner);
if (oldOwner != null && !oldOwner.equals(newOwner)) {
lease.setOwnerSwitchesSinceCheckpoint(lease.getOwnerSwitchesSinceCheckpoint() + 1);
}
return result;
}
/**
* {@inheritDoc}
*/
@Override
public ExtendedSequenceNumber getCheckpoint(String shardId)
throws ProvisionedThroughputException, InvalidStateException, DependencyException {
ExtendedSequenceNumber checkpoint = null;
KinesisClientLease lease = getLease(shardId);
if (lease != null) {
checkpoint = lease.getCheckpoint();
}
return checkpoint;
}
} }

View file

@ -29,7 +29,7 @@ import com.google.common.base.Strings;
/** /**
* An implementation of ILeaseSerializer for KinesisClientLease objects. * An implementation of ILeaseSerializer for KinesisClientLease objects.
*/ */
public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisClientLease> { public class KinesisClientLeaseSerializer implements LeaseSerializer<KinesisClientLease> {
private static final String OWNER_SWITCHES_KEY = "ownerSwitchesSinceCheckpoint"; private static final String OWNER_SWITCHES_KEY = "ownerSwitchesSinceCheckpoint";
private static final String CHECKPOINT_SEQUENCE_NUMBER_KEY = "checkpoint"; private static final String CHECKPOINT_SEQUENCE_NUMBER_KEY = "checkpoint";
@ -38,7 +38,7 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber"; private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber";
public final String PARENT_SHARD_ID_KEY = "parentShardId"; public final String PARENT_SHARD_ID_KEY = "parentShardId";
private final LeaseSerializer baseSerializer = new LeaseSerializer(KinesisClientLease.class); private final DynamoDBLeaseSerializer baseSerializer = new DynamoDBLeaseSerializer(KinesisClientLease.class);
@Override @Override
public Map<String, AttributeValue> toDynamoRecord(KinesisClientLease lease) { public Map<String, AttributeValue> toDynamoRecord(KinesisClientLease lease) {

View file

@ -20,11 +20,7 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter; import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.DependencyException;
@ -42,12 +38,12 @@ public class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisCl
@Getter @Getter
@Accessors(fluent = true) @Accessors(fluent = true)
private final ILeaseManager<KinesisClientLease> leaseManager; private final LeaseManager<KinesisClientLease> leaseManager;
private long initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY; private long initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY;
private long initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; private long initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY;
public KinesisClientLibLeaseCoordinator(final ILeaseManager<KinesisClientLease> leaseManager, public KinesisClientLibLeaseCoordinator(final LeaseManager<KinesisClientLease> leaseManager,
final String workerIdentifier, final String workerIdentifier,
final long leaseDurationMillis, final long leaseDurationMillis,
final long epsilonMillis, final long epsilonMillis,

View file

@ -65,7 +65,7 @@ public class LeaseCoordinator<T extends Lease> {
private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder() private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder()
.setNameFormat("LeaseRenewer-%04d").setDaemon(true).build(); .setNameFormat("LeaseRenewer-%04d").setDaemon(true).build();
private final ILeaseRenewer<T> leaseRenewer; private final LeaseRenewer<T> leaseRenewer;
private final LeaseTaker<T> leaseTaker; private final LeaseTaker<T> leaseTaker;
private final long renewerIntervalMillis; private final long renewerIntervalMillis;
private final long takerIntervalMillis; private final long takerIntervalMillis;
@ -87,7 +87,7 @@ public class LeaseCoordinator<T extends Lease> {
* @param leaseDurationMillis Duration of a lease * @param leaseDurationMillis Duration of a lease
* @param epsilonMillis Allow for some variance when calculating lease expirations * @param epsilonMillis Allow for some variance when calculating lease expirations
*/ */
public LeaseCoordinator(ILeaseManager<T> leaseManager, public LeaseCoordinator(LeaseManager<T> leaseManager,
String workerIdentifier, String workerIdentifier,
long leaseDurationMillis, long leaseDurationMillis,
long epsilonMillis) { long epsilonMillis) {
@ -103,7 +103,7 @@ public class LeaseCoordinator<T extends Lease> {
* @param epsilonMillis Allow for some variance when calculating lease expirations * @param epsilonMillis Allow for some variance when calculating lease expirations
* @param metricsFactory Used to publish metrics about lease operations * @param metricsFactory Used to publish metrics about lease operations
*/ */
public LeaseCoordinator(ILeaseManager<T> leaseManager, public LeaseCoordinator(LeaseManager<T> leaseManager,
String workerIdentifier, String workerIdentifier,
long leaseDurationMillis, long leaseDurationMillis,
long epsilonMillis, long epsilonMillis,
@ -124,7 +124,7 @@ public class LeaseCoordinator<T extends Lease> {
* @param maxLeasesToStealAtOneTime Steal up to these many leases at a time (for load balancing) * @param maxLeasesToStealAtOneTime Steal up to these many leases at a time (for load balancing)
* @param metricsFactory Used to publish metrics about lease operations * @param metricsFactory Used to publish metrics about lease operations
*/ */
public LeaseCoordinator(ILeaseManager<T> leaseManager, public LeaseCoordinator(LeaseManager<T> leaseManager,
String workerIdentifier, String workerIdentifier,
long leaseDurationMillis, long leaseDurationMillis,
long epsilonMillis, long epsilonMillis,
@ -136,7 +136,7 @@ public class LeaseCoordinator<T extends Lease> {
this.leaseTaker = new DynamoDBLeaseTaker<T>(leaseManager, workerIdentifier, leaseDurationMillis) this.leaseTaker = new DynamoDBLeaseTaker<T>(leaseManager, workerIdentifier, leaseDurationMillis)
.withMaxLeasesForWorker(maxLeasesForWorker) .withMaxLeasesForWorker(maxLeasesForWorker)
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime); .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime);
this.leaseRenewer = new LeaseRenewer<T>( this.leaseRenewer = new DynamoDBLeaseRenewer<T>(
leaseManager, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool); leaseManager, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool);
this.renewerIntervalMillis = leaseDurationMillis / 3 - epsilonMillis; this.renewerIntervalMillis = leaseDurationMillis / 3 - epsilonMillis;
this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2; this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2;

View file

@ -23,7 +23,7 @@ public interface LeaseManagementFactory {
ShardSyncTaskManager createShardSyncTaskManager(); ShardSyncTaskManager createShardSyncTaskManager();
LeaseManager<KinesisClientLease> createLeaseManager(); DynamoDBLeaseManager<KinesisClientLease> createLeaseManager();
KinesisClientLibLeaseCoordinator createKinesisClientLibLeaseCoordinator(); KinesisClientLibLeaseCoordinator createKinesisClientLibLeaseCoordinator();

View file

@ -1,5 +1,5 @@
/* /*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* *
* Licensed under the Amazon Software License (the "License"). * Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License. * You may not use this file except in compliance with the License.
@ -14,580 +14,181 @@
*/ */
package software.amazon.kinesis.leases; package software.amazon.kinesis.leases;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.DeleteItemRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
import com.amazonaws.services.dynamodbv2.model.GetItemRequest;
import com.amazonaws.services.dynamodbv2.model.GetItemResult;
import com.amazonaws.services.dynamodbv2.model.LimitExceededException;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.dynamodbv2.model.PutItemRequest;
import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.ScanResult;
import com.amazonaws.services.dynamodbv2.model.TableStatus;
import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest;
import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.leases.Lease;
import lombok.extern.slf4j.Slf4j;
/** /**
* An implementation of ILeaseManager that uses DynamoDB. * Supports basic CRUD operations for Leases.
*
* @param <T> Lease subclass, possibly Lease itself.
*/ */
@Slf4j public interface LeaseManager<T extends Lease> {
public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
protected String table;
protected AmazonDynamoDB dynamoDBClient;
protected ILeaseSerializer<T> serializer;
protected boolean consistentReads;
/** /**
* Constructor. * Creates the table that will store leases. Succeeds if table already exists.
* *
* @param table leases table * @param readCapacity
* @param dynamoDBClient DynamoDB client to use * @param writeCapacity
* @param serializer LeaseSerializer to use to convert to/from DynamoDB objects.
*/
public LeaseManager(String table, AmazonDynamoDB dynamoDBClient, ILeaseSerializer<T> serializer) {
this(table, dynamoDBClient, serializer, false);
}
/**
* Constructor for test cases - allows control of consistent reads. Consistent reads should only be used for testing
* - our code is meant to be resilient to inconsistent reads. Using consistent reads during testing speeds up
* execution of simple tests (you don't have to wait out the consistency window). Test cases that want to experience
* eventual consistency should not set consistentReads=true.
* *
* @param table leases table * @return true if we created a new table (table didn't exist before)
* @param dynamoDBClient DynamoDB client to use *
* @param serializer lease serializer to use * @throws ProvisionedThroughputException if we cannot create the lease table due to per-AWS-account capacity
* @param consistentReads true if we want consistent reads for testing purposes. * restrictions.
* @throws DependencyException if DynamoDB createTable fails in an unexpected way
*/ */
public LeaseManager(String table, AmazonDynamoDB dynamoDBClient, ILeaseSerializer<T> serializer, boolean consistentReads) {
verifyNotNull(table, "Table name cannot be null");
verifyNotNull(dynamoDBClient, "dynamoDBClient cannot be null");
verifyNotNull(serializer, "ILeaseSerializer cannot be null");
this.table = table;
this.dynamoDBClient = dynamoDBClient;
this.consistentReads = consistentReads;
this.serializer = serializer;
}
/**
* {@inheritDoc}
*/
@Override
public boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity) public boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity)
throws ProvisionedThroughputException, DependencyException { throws ProvisionedThroughputException, DependencyException;
verifyNotNull(readCapacity, "readCapacity cannot be null");
verifyNotNull(writeCapacity, "writeCapacity cannot be null");
try {
if (tableStatus() != null) {
return false;
}
} catch (DependencyException de) {
//
// Something went wrong with DynamoDB
//
log.error("Failed to get table status for {}", table, de);
}
CreateTableRequest request = new CreateTableRequest();
request.setTableName(table);
request.setKeySchema(serializer.getKeySchema());
request.setAttributeDefinitions(serializer.getAttributeDefinitions());
ProvisionedThroughput throughput = new ProvisionedThroughput();
throughput.setReadCapacityUnits(readCapacity);
throughput.setWriteCapacityUnits(writeCapacity);
request.setProvisionedThroughput(throughput);
try {
dynamoDBClient.createTable(request);
} catch (ResourceInUseException e) {
log.info("Table {} already exists.", table);
return false;
} catch (LimitExceededException e) {
throw new ProvisionedThroughputException("Capacity exceeded when creating table " + table, e);
} catch (AmazonClientException e) {
throw new DependencyException(e);
}
return true;
}
/** /**
* {@inheritDoc} * @return true if the lease table already exists.
*/
@Override
public boolean leaseTableExists() throws DependencyException {
return TableStatus.ACTIVE == tableStatus();
}
private TableStatus tableStatus() throws DependencyException {
DescribeTableRequest request = new DescribeTableRequest();
request.setTableName(table);
DescribeTableResult result;
try {
result = dynamoDBClient.describeTable(request);
} catch (ResourceNotFoundException e) {
if (log.isDebugEnabled()) {
log.debug("Got ResourceNotFoundException for table {} in leaseTableExists, returning false.", table);
}
return null;
} catch (AmazonClientException e) {
throw new DependencyException(e);
}
TableStatus tableStatus = TableStatus.fromValue(result.getTable().getTableStatus());
if (log.isDebugEnabled()) {
log.debug("Lease table exists and is in status {}", tableStatus);
}
return tableStatus;
}
@Override
public boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException {
long sleepTimeRemaining = TimeUnit.SECONDS.toMillis(timeoutSeconds);
while (!leaseTableExists()) {
if (sleepTimeRemaining <= 0) {
return false;
}
long timeToSleepMillis = Math.min(TimeUnit.SECONDS.toMillis(secondsBetweenPolls), sleepTimeRemaining);
sleepTimeRemaining -= sleep(timeToSleepMillis);
}
return true;
}
/**
* Exposed for testing purposes.
* *
* @param timeToSleepMillis time to sleep in milliseconds * @throws DependencyException if DynamoDB describeTable fails in an unexpected way
*/
public boolean leaseTableExists() throws DependencyException;
/**
* Blocks until the lease table exists by polling leaseTableExists.
* *
* @return actual time slept in millis * @param secondsBetweenPolls time to wait between polls in seconds
*/ * @param timeoutSeconds total time to wait in seconds
long sleep(long timeToSleepMillis) { *
long startTime = System.currentTimeMillis(); * @return true if table exists, false if timeout was reached
*
try { * @throws DependencyException if DynamoDB describeTable fails in an unexpected way
Thread.sleep(timeToSleepMillis); */
} catch (InterruptedException e) { public boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException;
log.debug("Interrupted while sleeping");
} /**
* List all objects in table synchronously.
return System.currentTimeMillis() - startTime; *
} * @throws DependencyException if DynamoDB scan fails in an unexpected way
* @throws InvalidStateException if lease table does not exist
/** * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity
* {@inheritDoc}
*/
@Override
public List<T> listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
return list(null);
}
/**
* {@inheritDoc}
*/
@Override
public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
return list(1).isEmpty();
}
/**
* List with the given page size. Package access for integration testing.
* *
* @param limit number of items to consider at a time - used by integration tests to force paging.
* @return list of leases * @return list of leases
* @throws InvalidStateException if table does not exist
* @throws DependencyException if DynamoDB scan fail in an unexpected way
* @throws ProvisionedThroughputException if DynamoDB scan fail due to exceeded capacity
*/ */
List<T> list(Integer limit) throws DependencyException, InvalidStateException, ProvisionedThroughputException { public List<T> listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException;
if (log.isDebugEnabled()) {
log.debug("Listing leases from table {}", table);
}
ScanRequest scanRequest = new ScanRequest();
scanRequest.setTableName(table);
if (limit != null) {
scanRequest.setLimit(limit);
}
try {
ScanResult scanResult = dynamoDBClient.scan(scanRequest);
List<T> result = new ArrayList<T>();
while (scanResult != null) {
for (Map<String, AttributeValue> item : scanResult.getItems()) {
if (log.isDebugEnabled()) {
log.debug("Got item {} from DynamoDB.", item.toString());
}
result.add(serializer.fromDynamoRecord(item));
}
Map<String, AttributeValue> lastEvaluatedKey = scanResult.getLastEvaluatedKey();
if (lastEvaluatedKey == null) {
// Signify that we're done.
scanResult = null;
if (log.isDebugEnabled()) {
log.debug("lastEvaluatedKey was null - scan finished.");
}
} else {
// Make another request, picking up where we left off.
scanRequest.setExclusiveStartKey(lastEvaluatedKey);
if (log.isDebugEnabled()) {
log.debug("lastEvaluatedKey was {}, continuing scan.", lastEvaluatedKey);
}
scanResult = dynamoDBClient.scan(scanRequest);
}
}
if (log.isDebugEnabled()) {
log.debug("Listed {} leases from table {}", result.size(), table);
}
return result;
} catch (ResourceNotFoundException e) {
throw new InvalidStateException("Cannot scan lease table " + table + " because it does not exist.", e);
} catch (ProvisionedThroughputExceededException e) {
throw new ProvisionedThroughputException(e);
} catch (AmazonClientException e) {
throw new DependencyException(e);
}
}
/** /**
* {@inheritDoc} * Create a new lease. Conditional on a lease not already existing with this shardId.
*
* @param lease the lease to create
*
* @return true if lease was created, false if lease already exists
*
* @throws DependencyException if DynamoDB put fails in an unexpected way
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB put fails due to lack of capacity
*/ */
@Override
public boolean createLeaseIfNotExists(T lease) public boolean createLeaseIfNotExists(T lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException { throws DependencyException, InvalidStateException, ProvisionedThroughputException;
verifyNotNull(lease, "lease cannot be null");
if (log.isDebugEnabled()) {
log.debug("Creating lease {}", lease);
}
PutItemRequest request = new PutItemRequest();
request.setTableName(table);
request.setItem(serializer.toDynamoRecord(lease));
request.setExpected(serializer.getDynamoNonexistantExpectation());
try {
dynamoDBClient.putItem(request);
} catch (ConditionalCheckFailedException e) {
if (log.isDebugEnabled()) {
log.debug("Did not create lease {} because it already existed", lease);
}
return false;
} catch (AmazonClientException e) {
throw convertAndRethrowExceptions("create", lease.getLeaseKey(), e);
}
return true;
}
/** /**
* {@inheritDoc} * @param shardId Get the lease for this shardId
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB get fails due to lack of capacity
* @throws DependencyException if DynamoDB get fails in an unexpected way
*
* @return lease for the specified shardId, or null if one doesn't exist
*/ */
@Override public T getLease(String shardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException;
public T getLease(String leaseKey)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
verifyNotNull(leaseKey, "leaseKey cannot be null");
if (log.isDebugEnabled()) {
log.debug("Getting lease with key {}", leaseKey);
}
GetItemRequest request = new GetItemRequest();
request.setTableName(table);
request.setKey(serializer.getDynamoHashKey(leaseKey));
request.setConsistentRead(consistentReads);
try {
GetItemResult result = dynamoDBClient.getItem(request);
Map<String, AttributeValue> dynamoRecord = result.getItem();
if (dynamoRecord == null) {
if (log.isDebugEnabled()) {
log.debug("No lease found with key {}, returning null.", leaseKey);
}
return null;
} else {
T lease = serializer.fromDynamoRecord(dynamoRecord);
if (log.isDebugEnabled()) {
log.debug("Got lease {}", lease);
}
return lease;
}
} catch (AmazonClientException e) {
throw convertAndRethrowExceptions("get", leaseKey, e);
}
}
/** /**
* {@inheritDoc} * Renew a lease by incrementing the lease counter. Conditional on the leaseCounter in DynamoDB matching the leaseCounter
* of the input. Mutates the leaseCounter of the passed-in lease object after updating the record in DynamoDB.
*
* @param lease the lease to renew
*
* @return true if renewal succeeded, false otherwise
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/ */
@Override
public boolean renewLease(T lease) public boolean renewLease(T lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException { throws DependencyException, InvalidStateException, ProvisionedThroughputException;
verifyNotNull(lease, "lease cannot be null");
if (log.isDebugEnabled()) {
log.debug("Renewing lease with key {}", lease.getLeaseKey());
}
UpdateItemRequest request = new UpdateItemRequest();
request.setTableName(table);
request.setKey(serializer.getDynamoHashKey(lease));
request.setExpected(serializer.getDynamoLeaseCounterExpectation(lease));
request.setAttributeUpdates(serializer.getDynamoLeaseCounterUpdate(lease));
try {
dynamoDBClient.updateItem(request);
} catch (ConditionalCheckFailedException e) {
if (log.isDebugEnabled()) {
log.debug("Lease renewal failed for lease with key {} because the lease counter was not {}",
lease.getLeaseKey(), lease.getLeaseCounter());
}
// If we had a spurious retry during the Dynamo update, then this conditional PUT failure
// might be incorrect. So, we get the item straight away and check if the lease owner + lease counter
// are what we expected.
String expectedOwner = lease.getLeaseOwner();
Long expectedCounter = lease.getLeaseCounter() + 1;
T updatedLease = getLease(lease.getLeaseKey());
if (updatedLease == null || !expectedOwner.equals(updatedLease.getLeaseOwner()) ||
!expectedCounter.equals(updatedLease.getLeaseCounter())) {
return false;
}
log.info("Detected spurious renewal failure for lease with key {}, but recovered", lease.getLeaseKey());
} catch (AmazonClientException e) {
throw convertAndRethrowExceptions("renew", lease.getLeaseKey(), e);
}
lease.setLeaseCounter(lease.getLeaseCounter() + 1);
return true;
}
/** /**
* {@inheritDoc} * Take a lease for the given owner by incrementing its leaseCounter and setting its owner field. Conditional on
* the leaseCounter in DynamoDB matching the leaseCounter of the input. Mutates the leaseCounter and owner of the
* passed-in lease object after updating DynamoDB.
*
* @param lease the lease to take
* @param owner the new owner
*
* @return true if lease was successfully taken, false otherwise
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/ */
@Override
public boolean takeLease(T lease, String owner) public boolean takeLease(T lease, String owner)
throws DependencyException, InvalidStateException, ProvisionedThroughputException { throws DependencyException, InvalidStateException, ProvisionedThroughputException;
verifyNotNull(lease, "lease cannot be null");
verifyNotNull(owner, "owner cannot be null");
if (log.isDebugEnabled()) {
log.debug("Taking lease with leaseKey {} from {} to {}",
lease.getLeaseKey(),
lease.getLeaseOwner() == null ? "nobody" : lease.getLeaseOwner(),
owner);
}
UpdateItemRequest request = new UpdateItemRequest();
request.setTableName(table);
request.setKey(serializer.getDynamoHashKey(lease));
request.setExpected(serializer.getDynamoLeaseCounterExpectation(lease));
Map<String, AttributeValueUpdate> updates = serializer.getDynamoLeaseCounterUpdate(lease);
updates.putAll(serializer.getDynamoTakeLeaseUpdate(lease, owner));
request.setAttributeUpdates(updates);
try {
dynamoDBClient.updateItem(request);
} catch (ConditionalCheckFailedException e) {
if (log.isDebugEnabled()) {
log.debug("Lease renewal failed for lease with key {} because the lease counter was not {}",
lease.getLeaseKey(), lease.getLeaseCounter());
}
return false;
} catch (AmazonClientException e) {
throw convertAndRethrowExceptions("take", lease.getLeaseKey(), e);
}
lease.setLeaseCounter(lease.getLeaseCounter() + 1);
lease.setLeaseOwner(owner);
return true;
}
/** /**
* {@inheritDoc} * Evict the current owner of lease by setting owner to null. Conditional on the owner in DynamoDB matching the owner of
* the input. Mutates the lease counter and owner of the passed-in lease object after updating the record in DynamoDB.
*
* @param lease the lease to void
*
* @return true if eviction succeeded, false otherwise
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/ */
@Override
public boolean evictLease(T lease) public boolean evictLease(T lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException { throws DependencyException, InvalidStateException, ProvisionedThroughputException;
verifyNotNull(lease, "lease cannot be null");
if (log.isDebugEnabled()) {
log.debug("Evicting lease with leaseKey {} owned by {}",
lease.getLeaseKey(),
lease.getLeaseOwner());
}
UpdateItemRequest request = new UpdateItemRequest();
request.setTableName(table);
request.setKey(serializer.getDynamoHashKey(lease));
request.setExpected(serializer.getDynamoLeaseOwnerExpectation(lease));
Map<String, AttributeValueUpdate> updates = serializer.getDynamoLeaseCounterUpdate(lease);
updates.putAll(serializer.getDynamoEvictLeaseUpdate(lease));
request.setAttributeUpdates(updates);
try {
dynamoDBClient.updateItem(request);
} catch (ConditionalCheckFailedException e) {
if (log.isDebugEnabled()) {
log.debug("Lease eviction failed for lease with key {} because the lease owner was not {}",
lease.getLeaseKey(), lease.getLeaseOwner());
}
return false;
} catch (AmazonClientException e) {
throw convertAndRethrowExceptions("evict", lease.getLeaseKey(), e);
}
lease.setLeaseOwner(null);
lease.setLeaseCounter(lease.getLeaseCounter() + 1);
return true;
}
/** /**
* {@inheritDoc} * Delete the given lease from DynamoDB. Does nothing when passed a lease that does not exist in DynamoDB.
*
* @param lease the lease to delete
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB delete fails due to lack of capacity
* @throws DependencyException if DynamoDB delete fails in an unexpected way
*/ */
public void deleteAll() throws DependencyException, InvalidStateException, ProvisionedThroughputException { public void deleteLease(T lease) throws DependencyException, InvalidStateException, ProvisionedThroughputException;
List<T> allLeases = listLeases();
log.warn("Deleting {} items from table {}", allLeases.size(), table);
for (T lease : allLeases) {
DeleteItemRequest deleteRequest = new DeleteItemRequest();
deleteRequest.setTableName(table);
deleteRequest.setKey(serializer.getDynamoHashKey(lease));
dynamoDBClient.deleteItem(deleteRequest);
}
}
/** /**
* {@inheritDoc} * Delete all leases from DynamoDB. Useful for tools/utils and testing.
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB scan or delete fail due to lack of capacity
* @throws DependencyException if DynamoDB scan or delete fail in an unexpected way
*/ */
@Override public void deleteAll() throws DependencyException, InvalidStateException, ProvisionedThroughputException;
public void deleteLease(T lease) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
verifyNotNull(lease, "lease cannot be null");
if (log.isDebugEnabled()) {
log.debug("Deleting lease with leaseKey {}", lease.getLeaseKey());
}
DeleteItemRequest deleteRequest = new DeleteItemRequest();
deleteRequest.setTableName(table);
deleteRequest.setKey(serializer.getDynamoHashKey(lease));
try {
dynamoDBClient.deleteItem(deleteRequest);
} catch (AmazonClientException e) {
throw convertAndRethrowExceptions("delete", lease.getLeaseKey(), e);
}
}
/** /**
* {@inheritDoc} * Update application-specific fields of the given lease in DynamoDB. Does not update fields managed by the leasing
* library such as leaseCounter, leaseOwner, or leaseKey. Conditional on the leaseCounter in DynamoDB matching the
* leaseCounter of the input. Increments the lease counter in DynamoDB so that updates can be contingent on other
* updates. Mutates the lease counter of the passed-in lease object.
*
* @return true if update succeeded, false otherwise
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/ */
@Override
public boolean updateLease(T lease) public boolean updateLease(T lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException { throws DependencyException, InvalidStateException, ProvisionedThroughputException;
verifyNotNull(lease, "lease cannot be null");
if (log.isDebugEnabled()) { /**
log.debug("Updating lease {}", lease); * Check (synchronously) if there are any leases in the lease table.
} *
* @return true if there are no leases in the lease table
UpdateItemRequest request = new UpdateItemRequest(); *
request.setTableName(table); * @throws DependencyException if DynamoDB scan fails in an unexpected way
request.setKey(serializer.getDynamoHashKey(lease)); * @throws InvalidStateException if lease table does not exist
request.setExpected(serializer.getDynamoLeaseCounterExpectation(lease)); * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity
Map<String, AttributeValueUpdate> updates = serializer.getDynamoLeaseCounterUpdate(lease);
updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease));
request.setAttributeUpdates(updates);
try {
dynamoDBClient.updateItem(request);
} catch (ConditionalCheckFailedException e) {
if (log.isDebugEnabled()) {
log.debug("Lease update failed for lease with key {} because the lease counter was not {}",
lease.getLeaseKey(), lease.getLeaseCounter());
}
return false;
} catch (AmazonClientException e) {
throw convertAndRethrowExceptions("update", lease.getLeaseKey(), e);
}
lease.setLeaseCounter(lease.getLeaseCounter() + 1);
return true;
}
/*
* This method contains boilerplate exception handling - it throws or returns something to be thrown. The
* inconsistency there exists to satisfy the compiler when this method is used at the end of non-void methods.
*/ */
protected DependencyException convertAndRethrowExceptions(String operation, String leaseKey, AmazonClientException e) public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException;
throws ProvisionedThroughputException, InvalidStateException {
if (e instanceof ProvisionedThroughputExceededException) {
log.warn("Provisioned Throughput on the lease table has been exceeded. It's recommended that you increase the IOPs on the table. Failure to increase the IOPs may cause the application to not make progress.");
throw new ProvisionedThroughputException(e);
} else if (e instanceof ResourceNotFoundException) {
// @formatter:on
throw new InvalidStateException(String.format("Cannot %s lease with key %s because table %s does not exist.",
operation,
leaseKey,
table),
e);
//@formatter:off
} else {
return new DependencyException(e);
}
}
private void verifyNotNull(Object object, String message) {
if (object == null) {
throw new IllegalArgumentException(message);
}
}
} }

View file

@ -28,7 +28,7 @@ import software.amazon.kinesis.leases.Lease;
* LeaseCoordinator instance corresponds to one worker, and uses exactly one ILeaseRenewer to manage lease renewal for * LeaseCoordinator instance corresponds to one worker, and uses exactly one ILeaseRenewer to manage lease renewal for
* that worker. * that worker.
*/ */
public interface ILeaseRenewer<T extends Lease> { public interface LeaseRenewer<T extends Lease> {
/** /**
* Bootstrap initial set of leases from the LeaseManager (e.g. upon process restart, pick up leases we own) * Bootstrap initial set of leases from the LeaseManager (e.g. upon process restart, pick up leases we own)

View file

@ -14,181 +14,103 @@
*/ */
package software.amazon.kinesis.leases; package software.amazon.kinesis.leases;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate; import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue; import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType; import software.amazon.kinesis.leases.Lease;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
/** /**
* An implementation of ILeaseSerializer for basic Lease objects. Can also instantiate subclasses of Lease so that * Utility class that manages the mapping of Lease objects/operations to records in DynamoDB.
* LeaseSerializer can be decorated by other classes if you need to add fields to leases. *
* @param <T> Lease subclass, possibly Lease itself
*/ */
public class LeaseSerializer implements ILeaseSerializer<Lease> { public interface LeaseSerializer<T extends Lease> {
public final String LEASE_KEY_KEY = "leaseKey"; /**
public final String LEASE_OWNER_KEY = "leaseOwner"; * Construct a DynamoDB record out of a Lease object
public final String LEASE_COUNTER_KEY = "leaseCounter"; *
public final Class<? extends Lease> clazz; * @param lease lease object to serialize
* @return an attribute value map representing the lease object
*/
public Map<String, AttributeValue> toDynamoRecord(T lease);
public LeaseSerializer() { /**
this.clazz = Lease.class; * Construct a Lease object out of a DynamoDB record.
} *
* @param dynamoRecord attribute value map from DynamoDB
* @return a deserialized lease object representing the attribute value map
*/
public T fromDynamoRecord(Map<String, AttributeValue> dynamoRecord);
public LeaseSerializer(Class<? extends Lease> clazz) { /**
this.clazz = clazz; * @param lease
} * @return the attribute value map representing a Lease's hash key given a Lease object.
*/
public Map<String, AttributeValue> getDynamoHashKey(T lease);
@Override /**
public Map<String, AttributeValue> toDynamoRecord(Lease lease) { * Special getDynamoHashKey implementation used by ILeaseManager.getLease().
Map<String, AttributeValue> result = new HashMap<String, AttributeValue>(); *
* @param leaseKey
* @return the attribute value map representing a Lease's hash key given a string.
*/
public Map<String, AttributeValue> getDynamoHashKey(String leaseKey);
result.put(LEASE_KEY_KEY, DynamoUtils.createAttributeValue(lease.getLeaseKey())); /**
result.put(LEASE_COUNTER_KEY, DynamoUtils.createAttributeValue(lease.getLeaseCounter())); * @param lease
* @return the attribute value map asserting that a lease counter is what we expect.
*/
public Map<String, ExpectedAttributeValue> getDynamoLeaseCounterExpectation(T lease);
if (lease.getLeaseOwner() != null) { /**
result.put(LEASE_OWNER_KEY, DynamoUtils.createAttributeValue(lease.getLeaseOwner())); * @param lease
} * @return the attribute value map asserting that the lease owner is what we expect.
*/
public Map<String, ExpectedAttributeValue> getDynamoLeaseOwnerExpectation(T lease);
return result; /**
} * @return the attribute value map asserting that a lease does not exist.
*/
public Map<String, ExpectedAttributeValue> getDynamoNonexistantExpectation();
@Override /**
public Lease fromDynamoRecord(Map<String, AttributeValue> dynamoRecord) { * @param lease
Lease result; * @return the attribute value map that increments a lease counter
try { */
result = clazz.newInstance(); public Map<String, AttributeValueUpdate> getDynamoLeaseCounterUpdate(T lease);
} catch (InstantiationException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
result.setLeaseKey(DynamoUtils.safeGetString(dynamoRecord, LEASE_KEY_KEY)); /**
result.setLeaseOwner(DynamoUtils.safeGetString(dynamoRecord, LEASE_OWNER_KEY)); * @param lease
result.setLeaseCounter(DynamoUtils.safeGetLong(dynamoRecord, LEASE_COUNTER_KEY)); * @param newOwner
* @return the attribute value map that takes a lease for a new owner
*/
public Map<String, AttributeValueUpdate> getDynamoTakeLeaseUpdate(T lease, String newOwner);
return result; /**
} * @param lease
* @return the attribute value map that voids a lease
*/
public Map<String, AttributeValueUpdate> getDynamoEvictLeaseUpdate(T lease);
@Override /**
public Map<String, AttributeValue> getDynamoHashKey(String leaseKey) { * @param lease
Map<String, AttributeValue> result = new HashMap<String, AttributeValue>(); * @return the attribute value map that updates application-specific data for a lease and increments the lease
* counter
*/
public Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(T lease);
result.put(LEASE_KEY_KEY, DynamoUtils.createAttributeValue(leaseKey)); /**
* @return the key schema for creating a DynamoDB table to store leases
*/
public Collection<KeySchemaElement> getKeySchema();
return result; /**
} * @return attribute definitions for creating a DynamoDB table to store leases
*/
@Override public Collection<AttributeDefinition> getAttributeDefinitions();
public Map<String, AttributeValue> getDynamoHashKey(Lease lease) {
return getDynamoHashKey(lease.getLeaseKey());
}
@Override
public Map<String, ExpectedAttributeValue> getDynamoLeaseCounterExpectation(Lease lease) {
return getDynamoLeaseCounterExpectation(lease.getLeaseCounter());
}
public Map<String, ExpectedAttributeValue> getDynamoLeaseCounterExpectation(Long leaseCounter) {
Map<String, ExpectedAttributeValue> result = new HashMap<String, ExpectedAttributeValue>();
ExpectedAttributeValue eav = new ExpectedAttributeValue(DynamoUtils.createAttributeValue(leaseCounter));
result.put(LEASE_COUNTER_KEY, eav);
return result;
}
@Override
public Map<String, ExpectedAttributeValue> getDynamoLeaseOwnerExpectation(Lease lease) {
Map<String, ExpectedAttributeValue> result = new HashMap<String, ExpectedAttributeValue>();
ExpectedAttributeValue eav = null;
if (lease.getLeaseOwner() == null) {
eav = new ExpectedAttributeValue(false);
} else {
eav = new ExpectedAttributeValue(DynamoUtils.createAttributeValue(lease.getLeaseOwner()));
}
result.put(LEASE_OWNER_KEY, eav);
return result;
}
@Override
public Map<String, ExpectedAttributeValue> getDynamoNonexistantExpectation() {
Map<String, ExpectedAttributeValue> result = new HashMap<String, ExpectedAttributeValue>();
ExpectedAttributeValue expectedAV = new ExpectedAttributeValue(false);
result.put(LEASE_KEY_KEY, expectedAV);
return result;
}
@Override
public Map<String, AttributeValueUpdate> getDynamoLeaseCounterUpdate(Lease lease) {
return getDynamoLeaseCounterUpdate(lease.getLeaseCounter());
}
public Map<String, AttributeValueUpdate> getDynamoLeaseCounterUpdate(Long leaseCounter) {
Map<String, AttributeValueUpdate> result = new HashMap<String, AttributeValueUpdate>();
AttributeValueUpdate avu =
new AttributeValueUpdate(DynamoUtils.createAttributeValue(leaseCounter + 1), AttributeAction.PUT);
result.put(LEASE_COUNTER_KEY, avu);
return result;
}
@Override
public Map<String, AttributeValueUpdate> getDynamoTakeLeaseUpdate(Lease lease, String owner) {
Map<String, AttributeValueUpdate> result = new HashMap<String, AttributeValueUpdate>();
result.put(LEASE_OWNER_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(owner),
AttributeAction.PUT));
return result;
}
@Override
public Map<String, AttributeValueUpdate> getDynamoEvictLeaseUpdate(Lease lease) {
Map<String, AttributeValueUpdate> result = new HashMap<String, AttributeValueUpdate>();
result.put(LEASE_OWNER_KEY, new AttributeValueUpdate(null, AttributeAction.DELETE));
return result;
}
@Override
public Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(Lease lease) {
// There is no application-specific data in Lease - just return a map that increments the counter.
return new HashMap<String, AttributeValueUpdate>();
}
@Override
public Collection<KeySchemaElement> getKeySchema() {
List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName(LEASE_KEY_KEY).withKeyType(KeyType.HASH));
return keySchema;
}
@Override
public Collection<AttributeDefinition> getAttributeDefinitions() {
List<AttributeDefinition> definitions = new ArrayList<AttributeDefinition>();
definitions.add(new AttributeDefinition().withAttributeName(LEASE_KEY_KEY)
.withAttributeType(ScalarAttributeType.S));
return definitions;
}
} }

View file

@ -36,7 +36,7 @@ public class ShardSyncTask implements ITask {
@NonNull @NonNull
private final LeaseManagerProxy leaseManagerProxy; private final LeaseManagerProxy leaseManagerProxy;
@NonNull @NonNull
private final ILeaseManager<KinesisClientLease> leaseManager; private final LeaseManager<KinesisClientLease> leaseManager;
@NonNull @NonNull
private final InitialPositionInStreamExtended initialPosition; private final InitialPositionInStreamExtended initialPosition;
private final boolean cleanupLeasesUponShardCompletion; private final boolean cleanupLeasesUponShardCompletion;

View file

@ -41,7 +41,7 @@ public class ShardSyncTaskManager {
@NonNull @NonNull
private final LeaseManagerProxy leaseManagerProxy; private final LeaseManagerProxy leaseManagerProxy;
@NonNull @NonNull
private final ILeaseManager<KinesisClientLease> leaseManager; private final LeaseManager<KinesisClientLease> leaseManager;
@NonNull @NonNull
private final InitialPositionInStreamExtended initialPositionInStream; private final InitialPositionInStreamExtended initialPositionInStream;
private final boolean cleanupLeasesUponShardCompletion; private final boolean cleanupLeasesUponShardCompletion;

View file

@ -58,7 +58,7 @@ public class ShardSyncer {
} }
static synchronized void bootstrapShardLeases(@NonNull final LeaseManagerProxy leaseManagerProxy, static synchronized void bootstrapShardLeases(@NonNull final LeaseManagerProxy leaseManagerProxy,
@NonNull final ILeaseManager<KinesisClientLease> leaseManager, @NonNull final LeaseManager<KinesisClientLease> leaseManager,
@NonNull final InitialPositionInStreamExtended initialPositionInStream, @NonNull final InitialPositionInStreamExtended initialPositionInStream,
final boolean cleanupLeasesOfCompletedShards, final boolean cleanupLeasesOfCompletedShards,
final boolean ignoreUnexpectedChildShards) final boolean ignoreUnexpectedChildShards)
@ -80,7 +80,7 @@ public class ShardSyncer {
* @throws KinesisClientLibIOException * @throws KinesisClientLibIOException
*/ */
public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final LeaseManagerProxy leaseManagerProxy, public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final LeaseManagerProxy leaseManagerProxy,
@NonNull final ILeaseManager<KinesisClientLease> leaseManager, @NonNull final LeaseManager<KinesisClientLease> leaseManager,
@NonNull final InitialPositionInStreamExtended initialPositionInStream, @NonNull final InitialPositionInStreamExtended initialPositionInStream,
final boolean cleanupLeasesOfCompletedShards, final boolean cleanupLeasesOfCompletedShards,
final boolean ignoreUnexpectedChildShards) final boolean ignoreUnexpectedChildShards)
@ -90,7 +90,7 @@ public class ShardSyncer {
} }
static synchronized void checkAndCreateLeasesForNewShards(@NonNull final LeaseManagerProxy leaseManagerProxy, static synchronized void checkAndCreateLeasesForNewShards(@NonNull final LeaseManagerProxy leaseManagerProxy,
@NonNull final ILeaseManager<KinesisClientLease> leaseManager, @NonNull final LeaseManager<KinesisClientLease> leaseManager,
@NonNull final InitialPositionInStreamExtended initialPositionInStream, @NonNull final InitialPositionInStreamExtended initialPositionInStream,
final boolean cleanupLeasesOfCompletedShards) final boolean cleanupLeasesOfCompletedShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
@ -112,7 +112,7 @@ public class ShardSyncer {
*/ */
// CHECKSTYLE:OFF CyclomaticComplexity // CHECKSTYLE:OFF CyclomaticComplexity
private static synchronized void syncShardLeases(@NonNull final LeaseManagerProxy leaseManagerProxy, private static synchronized void syncShardLeases(@NonNull final LeaseManagerProxy leaseManagerProxy,
final ILeaseManager<KinesisClientLease> leaseManager, final LeaseManager<KinesisClientLease> leaseManager,
final InitialPositionInStreamExtended initialPosition, final InitialPositionInStreamExtended initialPosition,
final boolean cleanupLeasesOfCompletedShards, final boolean cleanupLeasesOfCompletedShards,
final boolean ignoreUnexpectedChildShards) final boolean ignoreUnexpectedChildShards)
@ -593,7 +593,7 @@ public class ShardSyncer {
private static void cleanupGarbageLeases(@NonNull final LeaseManagerProxy leaseManagerProxy, private static void cleanupGarbageLeases(@NonNull final LeaseManagerProxy leaseManagerProxy,
final List<Shard> shards, final List<Shard> shards,
final List<KinesisClientLease> trackedLeases, final List<KinesisClientLease> trackedLeases,
final ILeaseManager<KinesisClientLease> leaseManager) final LeaseManager<KinesisClientLease> leaseManager)
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException { throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException {
Set<String> kinesisShards = new HashSet<>(); Set<String> kinesisShards = new HashSet<>();
for (Shard shard : shards) { for (Shard shard : shards) {
@ -684,7 +684,7 @@ public class ShardSyncer {
Map<String, Shard> shardIdToShardMap, Map<String, Shard> shardIdToShardMap,
Map<String, Set<String>> shardIdToChildShardIdsMap, Map<String, Set<String>> shardIdToChildShardIdsMap,
List<KinesisClientLease> trackedLeases, List<KinesisClientLease> trackedLeases,
ILeaseManager<KinesisClientLease> leaseManager) LeaseManager<KinesisClientLease> leaseManager)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
Set<String> shardIdsOfClosedShards = new HashSet<>(); Set<String> shardIdsOfClosedShards = new HashSet<>();
List<KinesisClientLease> leasesOfClosedShards = new ArrayList<>(); List<KinesisClientLease> leasesOfClosedShards = new ArrayList<>();
@ -731,7 +731,7 @@ public class ShardSyncer {
static synchronized void cleanupLeaseForClosedShard(String closedShardId, static synchronized void cleanupLeaseForClosedShard(String closedShardId,
Set<String> childShardIds, Set<String> childShardIds,
Map<String, KinesisClientLease> trackedLeases, Map<String, KinesisClientLease> trackedLeases,
ILeaseManager<KinesisClientLease> leaseManager) LeaseManager<KinesisClientLease> leaseManager)
throws DependencyException, InvalidStateException, ProvisionedThroughputException { throws DependencyException, InvalidStateException, ProvisionedThroughputException {
KinesisClientLease leaseForClosedShard = trackedLeases.get(closedShardId); KinesisClientLease leaseForClosedShard = trackedLeases.get(closedShardId);
List<KinesisClientLease> childShardLeases = new ArrayList<>(); List<KinesisClientLease> childShardLeases = new ArrayList<>();

View file

@ -20,7 +20,7 @@ import lombok.AccessLevel;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.leases.LeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@ -39,7 +39,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
public class BlockOnParentShardTask implements ITask { public class BlockOnParentShardTask implements ITask {
@NonNull @NonNull
private final ShardInfo shardInfo; private final ShardInfo shardInfo;
private final ILeaseManager<KinesisClientLease> leaseManager; private final LeaseManager<KinesisClientLease> leaseManager;
// Sleep for this duration if the parent shards have not completed processing, or we encounter an exception. // Sleep for this duration if the parent shards have not completed processing, or we encounter an exception.
private final long parentShardPollIntervalMillis; private final long parentShardPollIntervalMillis;

View file

@ -35,7 +35,7 @@ import lombok.Synchronized;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.checkpoint.RecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.RecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.leases.LeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.LeaseManagerProxy; import software.amazon.kinesis.leases.LeaseManagerProxy;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
@ -60,7 +60,7 @@ public class ShardConsumer {
@NonNull @NonNull
private final String streamName; private final String streamName;
@NonNull @NonNull
private final ILeaseManager<KinesisClientLease> leaseManager; private final LeaseManager<KinesisClientLease> leaseManager;
@NonNull @NonNull
private final ExecutorService executorService; private final ExecutorService executorService;
@NonNull @NonNull

View file

@ -21,7 +21,7 @@ import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.checkpoint.RecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.RecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.leases.LeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.LeaseManagerProxy; import software.amazon.kinesis.leases.LeaseManagerProxy;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
@ -55,7 +55,7 @@ public class ShutdownTask implements ITask {
private final boolean cleanupLeasesOfCompletedShards; private final boolean cleanupLeasesOfCompletedShards;
private final boolean ignoreUnexpectedChildShards; private final boolean ignoreUnexpectedChildShards;
@NonNull @NonNull
private final ILeaseManager<KinesisClientLease> leaseManager; private final LeaseManager<KinesisClientLease> leaseManager;
private final long backoffTimeMillis; private final long backoffTimeMillis;
@NonNull @NonNull
private final GetRecordsCache getRecordsCache; private final GetRecordsCache getRecordsCache;

View file

@ -52,7 +52,7 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory
} }
@Override @Override
public GetRecordsCache createGetRecordsCache(final ShardInfo shardInfo) { public GetRecordsCache createGetRecordsCache(final ShardInfo shardInfo, final IMetricsFactory metricsFactory) {
return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount,
maxRecordsPerCall, createGetRecordsRetrievalStrategy(shardInfo), executorService, maxRecordsPerCall, createGetRecordsRetrievalStrategy(shardInfo), executorService,
idleMillisBetweenCalls, metricsFactory, "Prefetching", shardInfo.shardId()); idleMillisBetweenCalls, metricsFactory, "Prefetching", shardInfo.shardId());

View file

@ -50,13 +50,13 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibN
import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.CheckpointFactory; import software.amazon.kinesis.checkpoint.CheckpointFactory;
import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.leases.LeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator; import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseManagementFactory; import software.amazon.kinesis.leases.LeaseManagementFactory;
import software.amazon.kinesis.leases.LeaseManager; import software.amazon.kinesis.leases.DynamoDBLeaseManager;
import software.amazon.kinesis.leases.LeaseManagerProxy; import software.amazon.kinesis.leases.LeaseManagerProxy;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.leases.ShardSyncTaskManager;
@ -111,7 +111,7 @@ public class SchedulerTest {
@Mock @Mock
private ShardSyncTaskManager shardSyncTaskManager; private ShardSyncTaskManager shardSyncTaskManager;
@Mock @Mock
private LeaseManager<KinesisClientLease> leaseManager; private DynamoDBLeaseManager<KinesisClientLease> dynamoDBLeaseManager;
@Mock @Mock
private LeaseManagerProxy leaseManagerProxy; private LeaseManagerProxy leaseManagerProxy;
@Mock @Mock
@ -131,7 +131,7 @@ public class SchedulerTest {
processorConfig = new ProcessorConfig(processorFactory); processorConfig = new ProcessorConfig(processorFactory);
retrievalConfig = new RetrievalConfig(streamName, amazonKinesis).retrievalFactory(retrievalFactory); retrievalConfig = new RetrievalConfig(streamName, amazonKinesis).retrievalFactory(retrievalFactory);
when(leaseCoordinator.leaseManager()).thenReturn(leaseManager); when(leaseCoordinator.leaseManager()).thenReturn(dynamoDBLeaseManager);
when(shardSyncTaskManager.leaseManagerProxy()).thenReturn(leaseManagerProxy); when(shardSyncTaskManager.leaseManagerProxy()).thenReturn(leaseManagerProxy);
when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(IMetricsFactory.class))).thenReturn(getRecordsCache); when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(IMetricsFactory.class))).thenReturn(getRecordsCache);
@ -442,8 +442,8 @@ public class SchedulerTest {
} }
@Override @Override
public LeaseManager<KinesisClientLease> createLeaseManager() { public DynamoDBLeaseManager<KinesisClientLease> createLeaseManager() {
return leaseManager; return dynamoDBLeaseManager;
} }
@Override @Override
@ -460,7 +460,7 @@ public class SchedulerTest {
private class TestKinesisCheckpointFactory implements CheckpointFactory { private class TestKinesisCheckpointFactory implements CheckpointFactory {
@Override @Override
public Checkpointer createCheckpointer(final LeaseCoordinator<KinesisClientLease> leaseCoordinator, public Checkpointer createCheckpointer(final LeaseCoordinator<KinesisClientLease> leaseCoordinator,
final ILeaseManager<KinesisClientLease> leaseManager) { final LeaseManager<KinesisClientLease> leaseManager) {
return checkpoint; return checkpoint;
} }
} }

View file

@ -24,7 +24,7 @@ import org.junit.Test;
import software.amazon.kinesis.leases.exceptions.LeasingException; import software.amazon.kinesis.leases.exceptions.LeasingException;
public class LeaseManagerIntegrationTest extends LeaseIntegrationTest { public class DynamoDBLeaseManagerIntegrationTest extends LeaseIntegrationTest {
/** /**
* Test listLeases when no records are present. * Test listLeases when no records are present.
@ -233,7 +233,7 @@ public class LeaseManagerIntegrationTest extends LeaseIntegrationTest {
@Test @Test
public void testWaitUntilLeaseTableExists() throws LeasingException { public void testWaitUntilLeaseTableExists() throws LeasingException {
KinesisClientLeaseManager manager = new KinesisClientLeaseManager("nagl_ShardProgress", ddbClient, true) { KinesisClientDynamoDBLeaseManager manager = new KinesisClientDynamoDBLeaseManager("nagl_ShardProgress", ddbClient, true) {
@Override @Override
long sleep(long timeToSleepMillis) { long sleep(long timeToSleepMillis) {
@ -252,7 +252,7 @@ public class LeaseManagerIntegrationTest extends LeaseIntegrationTest {
* Just using AtomicInteger for the indirection it provides. * Just using AtomicInteger for the indirection it provides.
*/ */
final AtomicInteger sleepCounter = new AtomicInteger(0); final AtomicInteger sleepCounter = new AtomicInteger(0);
KinesisClientLeaseManager manager = new KinesisClientLeaseManager("nonexistentTable", ddbClient, true) { KinesisClientDynamoDBLeaseManager manager = new KinesisClientDynamoDBLeaseManager("nonexistentTable", ddbClient, true) {
@Override @Override
long sleep(long timeToSleepMillis) { long sleep(long timeToSleepMillis) {

View file

@ -24,16 +24,16 @@ import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { public class DynamoDBLeaseRenewerIntegrationTest extends LeaseIntegrationTest {
// This test case's leases last 2 seconds // This test case's leases last 2 seconds
private static final long LEASE_DURATION_MILLIS = 2000L; private static final long LEASE_DURATION_MILLIS = 2000L;
private ILeaseRenewer<KinesisClientLease> renewer; private LeaseRenewer<KinesisClientLease> renewer;
@Before @Before
public void setUp() { public void setUp() {
renewer = new LeaseRenewer<KinesisClientLease>( renewer = new DynamoDBLeaseRenewer<KinesisClientLease>(
leaseManager, "foo", LEASE_DURATION_MILLIS, Executors.newCachedThreadPool()); leaseManager, "foo", LEASE_DURATION_MILLIS, Executors.newCachedThreadPool());
} }
@ -242,7 +242,7 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseManager); TestHarnessBuilder builder = new TestHarnessBuilder(leaseManager);
builder.withLease(shardId, owner); builder.withLease(shardId, owner);
Map<String, KinesisClientLease> leases = builder.build(); Map<String, KinesisClientLease> leases = builder.build();
LeaseRenewer<KinesisClientLease> renewer =new LeaseRenewer<KinesisClientLease>( DynamoDBLeaseRenewer<KinesisClientLease> renewer =new DynamoDBLeaseRenewer<KinesisClientLease>(
leaseManager, owner, 30000L, Executors.newCachedThreadPool()); leaseManager, owner, 30000L, Executors.newCachedThreadPool());
renewer.initialize(); renewer.initialize();
Map<String, KinesisClientLease> heldLeases = renewer.getCurrentlyHeldLeases(); Map<String, KinesisClientLease> heldLeases = renewer.getCurrentlyHeldLeases();

View file

@ -30,17 +30,14 @@ import org.mockito.Mockito;
import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.leases.ILeaseManager;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRenewer;
public class LeaseRenewerTest { public class DynamoDBLeaseRenewerTest {
ILeaseManager<Lease> leaseManager; LeaseManager<Lease> leaseManager;
String workerIdentifier; String workerIdentifier;
long leaseDurationMillis; long leaseDurationMillis;
ExecutorService leaseRenewalExecService; ExecutorService leaseRenewalExecService;
LeaseRenewer<Lease> renewer; DynamoDBLeaseRenewer<Lease> renewer;
List<Lease> leasesToRenew; List<Lease> leasesToRenew;
private static Lease newLease(String leaseKey, private static Lease newLease(String leaseKey,
@ -64,12 +61,12 @@ public class LeaseRenewerTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Before @Before
public void before() { public void before() {
leaseManager = Mockito.mock(ILeaseManager.class); leaseManager = Mockito.mock(LeaseManager.class);
workerIdentifier = "workerId"; workerIdentifier = "workerId";
leaseDurationMillis = 10000; leaseDurationMillis = 10000;
leaseRenewalExecService = Executors.newSingleThreadExecutor(); leaseRenewalExecService = Executors.newSingleThreadExecutor();
leasesToRenew = null; leasesToRenew = null;
renewer = new LeaseRenewer<>(leaseManager, renewer = new DynamoDBLeaseRenewer<>(leaseManager,
workerIdentifier, workerIdentifier,
leaseDurationMillis, leaseDurationMillis,
Executors.newCachedThreadPool()); Executors.newCachedThreadPool());

View file

@ -20,8 +20,6 @@ import java.util.List;
import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.ILeaseManager;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -30,7 +28,7 @@ import lombok.extern.slf4j.Slf4j;
* *
*/ */
@Slf4j @Slf4j
public class ExceptionThrowingLeaseManager implements ILeaseManager<KinesisClientLease> { public class ExceptionThrowingLeaseManager implements LeaseManager<KinesisClientLease> {
private static final Throwable EXCEPTION_MSG = new Throwable("Test Exception"); private static final Throwable EXCEPTION_MSG = new Throwable("Test Exception");
// Use array below to control in what situations we want to throw exceptions. // Use array below to control in what situations we want to throw exceptions.
@ -70,14 +68,14 @@ public class ExceptionThrowingLeaseManager implements ILeaseManager<KinesisClien
private int timeThrowingException = Integer.MAX_VALUE; private int timeThrowingException = Integer.MAX_VALUE;
// The real local lease manager which would do the real implementations. // The real local lease manager which would do the real implementations.
private final ILeaseManager<KinesisClientLease> leaseManager; private final LeaseManager<KinesisClientLease> leaseManager;
/** /**
* Constructor accepts lease manager as only argument. * Constructor accepts lease manager as only argument.
* *
* @param leaseManager which will do the real implementations * @param leaseManager which will do the real implementations
*/ */
ExceptionThrowingLeaseManager(ILeaseManager<KinesisClientLease> leaseManager) { ExceptionThrowingLeaseManager(LeaseManager<KinesisClientLease> leaseManager) {
this.leaseManager = leaseManager; this.leaseManager = leaseManager;
this.leaseManagerMethodCallingCount = new int[ExceptionThrowingLeaseManagerMethods.values().length]; this.leaseManagerMethodCallingCount = new int[ExceptionThrowingLeaseManagerMethods.values().length];
} }

View file

@ -53,7 +53,7 @@ public class KinesisClientLibLeaseCoordinatorIntegrationTest {
private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE;
private static final int MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1; private static final int MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1;
private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20; private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20;
private static KinesisClientLeaseManager leaseManager; private static KinesisClientDynamoDBLeaseManager leaseManager;
private static DynamoDBCheckpointer dynamoDBCheckpointer; private static DynamoDBCheckpointer dynamoDBCheckpointer;
private KinesisClientLibLeaseCoordinator coordinator; private KinesisClientLibLeaseCoordinator coordinator;
@ -66,7 +66,7 @@ public class KinesisClientLibLeaseCoordinatorIntegrationTest {
if (leaseManager == null) { if (leaseManager == null) {
AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain()); AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain());
leaseManager = leaseManager =
new KinesisClientLeaseManager(TABLE_NAME, ddb, useConsistentReads); new KinesisClientDynamoDBLeaseManager(TABLE_NAME, ddb, useConsistentReads);
} }
leaseManager.createLeaseTableIfNotExists(10L, 10L); leaseManager.createLeaseTableIfNotExists(10L, 10L);
leaseManager.deleteAll(); leaseManager.deleteAll();
@ -220,7 +220,7 @@ public class KinesisClientLibLeaseCoordinatorIntegrationTest {
assertEquals(original, actual); // Assert the contents of the lease assertEquals(original, actual); // Assert the contents of the lease
} }
public void addLeasesToRenew(ILeaseRenewer<KinesisClientLease> renewer, String... shardIds) public void addLeasesToRenew(LeaseRenewer<KinesisClientLease> renewer, String... shardIds)
throws DependencyException, InvalidStateException { throws DependencyException, InvalidStateException {
List<KinesisClientLease> leasesToRenew = new ArrayList<KinesisClientLease>(); List<KinesisClientLease> leasesToRenew = new ArrayList<KinesisClientLease>();
@ -233,7 +233,7 @@ public class KinesisClientLibLeaseCoordinatorIntegrationTest {
renewer.addLeasesToRenew(leasesToRenew); renewer.addLeasesToRenew(leasesToRenew);
} }
public Map<String, KinesisClientLease> renewMutateAssert(ILeaseRenewer<KinesisClientLease> renewer, public Map<String, KinesisClientLease> renewMutateAssert(LeaseRenewer<KinesisClientLease> renewer,
String... renewedShardIds) throws DependencyException, InvalidStateException { String... renewedShardIds) throws DependencyException, InvalidStateException {
renewer.renewLeases(); renewer.renewLeases();

View file

@ -44,7 +44,7 @@ public class KinesisClientLibLeaseCoordinatorTest {
private static final UUID TEST_UUID = UUID.randomUUID(); private static final UUID TEST_UUID = UUID.randomUUID();
@Mock @Mock
private ILeaseManager<KinesisClientLease> leaseManager; private LeaseManager<KinesisClientLease> leaseManager;
@Mock @Mock
private LeaseCoordinator<KinesisClientLease> leaseCoordinator; private LeaseCoordinator<KinesisClientLease> leaseCoordinator;
@Mock @Mock

View file

@ -55,7 +55,7 @@ public class LeaseCoordinatorExerciser {
new DefaultAWSCredentialsProviderChain(); new DefaultAWSCredentialsProviderChain();
AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(creds); AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(creds);
ILeaseManager<KinesisClientLease> leaseManager = new KinesisClientLeaseManager("nagl_ShardProgress", ddb); LeaseManager<KinesisClientLease> leaseManager = new KinesisClientDynamoDBLeaseManager("nagl_ShardProgress", ddb);
if (leaseManager.createLeaseTableIfNotExists(10L, 50L)) { if (leaseManager.createLeaseTableIfNotExists(10L, 50L)) {
log.info("Waiting for newly created lease table"); log.info("Waiting for newly created lease table");

View file

@ -30,7 +30,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public class LeaseIntegrationTest { public class LeaseIntegrationTest {
protected static KinesisClientLeaseManager leaseManager; protected static KinesisClientDynamoDBLeaseManager leaseManager;
protected static AmazonDynamoDBClient ddbClient = protected static AmazonDynamoDBClient ddbClient =
new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain()); new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain());
@ -42,7 +42,7 @@ public class LeaseIntegrationTest {
if (leaseManager == null) { if (leaseManager == null) {
// Do some static setup once per class. // Do some static setup once per class.
leaseManager = new KinesisClientLeaseManager("nagl_ShardProgress", ddbClient, true); leaseManager = new KinesisClientDynamoDBLeaseManager("nagl_ShardProgress", ddbClient, true);
MetricsHelper.startScope(new NullMetricsFactory()); MetricsHelper.startScope(new NullMetricsFactory());
} }

View file

@ -48,7 +48,7 @@ public class ShardSyncTaskIntegrationTest {
private static final String STREAM_NAME = "IntegrationTestStream02"; private static final String STREAM_NAME = "IntegrationTestStream02";
private static AmazonKinesis amazonKinesis; private static AmazonKinesis amazonKinesis;
private IKinesisClientLeaseManager leaseManager; private KinesisClientLeaseManager leaseManager;
private LeaseManagerProxy leaseManagerProxy; private LeaseManagerProxy leaseManagerProxy;
/** /**
@ -86,7 +86,7 @@ public class ShardSyncTaskIntegrationTest {
public void setUp() throws Exception { public void setUp() throws Exception {
boolean useConsistentReads = true; boolean useConsistentReads = true;
leaseManager = leaseManager =
new KinesisClientLeaseManager("ShardSyncTaskIntegrationTest", new KinesisClientDynamoDBLeaseManager("ShardSyncTaskIntegrationTest",
AmazonDynamoDBClientBuilder.standard().withRegion(Regions.US_EAST_1).build(), AmazonDynamoDBClientBuilder.standard().withRegion(Regions.US_EAST_1).build(),
useConsistentReads); useConsistentReads);

View file

@ -72,7 +72,7 @@ public class ShardSyncerTest {
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000L)); InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000L));
private final boolean cleanupLeasesOfCompletedShards = true; private final boolean cleanupLeasesOfCompletedShards = true;
private AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB(); private AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB();
private LeaseManager<KinesisClientLease> leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient); private DynamoDBLeaseManager<KinesisClientLease> dynamoDBLeaseManager = new KinesisClientDynamoDBLeaseManager("tempTestTable", ddbClient);
private static final int EXPONENT = 128; private static final int EXPONENT = 128;
/** /**
* Old/Obsolete max value of a sequence number (2^128 -1). * Old/Obsolete max value of a sequence number (2^128 -1).
@ -84,16 +84,16 @@ public class ShardSyncerTest {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
boolean created = leaseManager.createLeaseTableIfNotExists(1L, 1L); boolean created = dynamoDBLeaseManager.createLeaseTableIfNotExists(1L, 1L);
if (created) { if (created) {
log.info("New table created."); log.info("New table created.");
} }
leaseManager.deleteAll(); dynamoDBLeaseManager.deleteAll();
} }
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
leaseManager.deleteAll(); dynamoDBLeaseManager.deleteAll();
} }
/** /**
@ -213,9 +213,9 @@ public class ShardSyncerTest {
when(leaseManagerProxy.listShards()).thenReturn(shards); when(leaseManagerProxy.listShards()).thenReturn(shards);
ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, leaseManager, INITIAL_POSITION_LATEST, ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, dynamoDBLeaseManager, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards); cleanupLeasesOfCompletedShards);
List<KinesisClientLease> newLeases = leaseManager.listLeases(); List<KinesisClientLease> newLeases = dynamoDBLeaseManager.listLeases();
Set<String> expectedLeaseShardIds = new HashSet<>(); Set<String> expectedLeaseShardIds = new HashSet<>();
expectedLeaseShardIds.add("shardId-4"); expectedLeaseShardIds.add("shardId-4");
expectedLeaseShardIds.add("shardId-8"); expectedLeaseShardIds.add("shardId-8");
@ -245,9 +245,9 @@ public class ShardSyncerTest {
when(leaseManagerProxy.listShards()).thenReturn(shards); when(leaseManagerProxy.listShards()).thenReturn(shards);
ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON, ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, dynamoDBLeaseManager, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards); cleanupLeasesOfCompletedShards);
List<KinesisClientLease> newLeases = leaseManager.listLeases(); List<KinesisClientLease> newLeases = dynamoDBLeaseManager.listLeases();
Set<String> expectedLeaseShardIds = new HashSet<>(); Set<String> expectedLeaseShardIds = new HashSet<>();
for (int i = 0; i < 11; i++) { for (int i = 0; i < 11; i++) {
expectedLeaseShardIds.add("shardId-" + i); expectedLeaseShardIds.add("shardId-" + i);
@ -274,9 +274,9 @@ public class ShardSyncerTest {
when(leaseManagerProxy.listShards()).thenReturn(shards); when(leaseManagerProxy.listShards()).thenReturn(shards);
ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, leaseManager, INITIAL_POSITION_AT_TIMESTAMP, ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, dynamoDBLeaseManager, INITIAL_POSITION_AT_TIMESTAMP,
cleanupLeasesOfCompletedShards); cleanupLeasesOfCompletedShards);
List<KinesisClientLease> newLeases = leaseManager.listLeases(); List<KinesisClientLease> newLeases = dynamoDBLeaseManager.listLeases();
Set<String> expectedLeaseShardIds = new HashSet<>(); Set<String> expectedLeaseShardIds = new HashSet<>();
for (int i = 0; i < 11; i++) { for (int i = 0; i < 11; i++) {
expectedLeaseShardIds.add("shardId-" + i); expectedLeaseShardIds.add("shardId-" + i);
@ -306,7 +306,7 @@ public class ShardSyncerTest {
when(leaseManagerProxy.listShards()).thenReturn(shards); when(leaseManagerProxy.listShards()).thenReturn(shards);
ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON, ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, dynamoDBLeaseManager, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards); cleanupLeasesOfCompletedShards);
} }
@ -330,9 +330,9 @@ public class ShardSyncerTest {
when(leaseManagerProxy.listShards()).thenReturn(shards); when(leaseManagerProxy.listShards()).thenReturn(shards);
ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, leaseManager, INITIAL_POSITION_LATEST, ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, dynamoDBLeaseManager, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, true); cleanupLeasesOfCompletedShards, true);
List<KinesisClientLease> newLeases = leaseManager.listLeases(); List<KinesisClientLease> newLeases = dynamoDBLeaseManager.listLeases();
Set<String> expectedLeaseShardIds = new HashSet<>(); Set<String> expectedLeaseShardIds = new HashSet<>();
expectedLeaseShardIds.add("shardId-4"); expectedLeaseShardIds.add("shardId-4");
expectedLeaseShardIds.add("shardId-5"); expectedLeaseShardIds.add("shardId-5");
@ -379,7 +379,7 @@ public class ShardSyncerTest {
testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods.DELETELEASE, c, INITIAL_POSITION_TRIM_HORIZON); ExceptionThrowingLeaseManagerMethods.DELETELEASE, c, INITIAL_POSITION_TRIM_HORIZON);
// Need to clean up lease manager every time after calling ShardSyncer // Need to clean up lease manager every time after calling ShardSyncer
leaseManager.deleteAll(); dynamoDBLeaseManager.deleteAll();
} }
} }
@ -403,7 +403,7 @@ public class ShardSyncerTest {
testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods.LISTLEASES, c, INITIAL_POSITION_TRIM_HORIZON); ExceptionThrowingLeaseManagerMethods.LISTLEASES, c, INITIAL_POSITION_TRIM_HORIZON);
// Need to clean up lease manager every time after calling ShardSyncer // Need to clean up lease manager every time after calling ShardSyncer
leaseManager.deleteAll(); dynamoDBLeaseManager.deleteAll();
} }
} }
@ -427,7 +427,7 @@ public class ShardSyncerTest {
testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS, c,INITIAL_POSITION_TRIM_HORIZON); ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS, c,INITIAL_POSITION_TRIM_HORIZON);
// Need to clean up lease manager every time after calling ShardSyncer // Need to clean up lease manager every time after calling ShardSyncer
leaseManager.deleteAll(); dynamoDBLeaseManager.deleteAll();
} }
} }
@ -440,7 +440,7 @@ public class ShardSyncerTest {
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException { throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException {
if (exceptionMethod != null) { if (exceptionMethod != null) {
ExceptionThrowingLeaseManager exceptionThrowingLeaseManager = ExceptionThrowingLeaseManager exceptionThrowingLeaseManager =
new ExceptionThrowingLeaseManager(leaseManager); new ExceptionThrowingLeaseManager(dynamoDBLeaseManager);
// Set exception and throwing time for exceptionThrowingManager. // Set exception and throwing time for exceptionThrowingManager.
exceptionThrowingLeaseManager.setLeaseLeaseManagerThrowingExceptionScenario(exceptionMethod, exceptionTime); exceptionThrowingLeaseManager.setLeaseLeaseManagerThrowingExceptionScenario(exceptionMethod, exceptionTime);
// Only need to try two times. // Only need to try two times.
@ -459,7 +459,7 @@ public class ShardSyncerTest {
} }
} else { } else {
ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy,
leaseManager, dynamoDBLeaseManager,
position, position,
cleanupLeasesOfCompletedShards); cleanupLeasesOfCompletedShards);
} }
@ -501,7 +501,7 @@ public class ShardSyncerTest {
ExceptionThrowingLeaseManagerMethods.DELETELEASE, ExceptionThrowingLeaseManagerMethods.DELETELEASE,
c, INITIAL_POSITION_AT_TIMESTAMP); c, INITIAL_POSITION_AT_TIMESTAMP);
// Need to clean up lease manager every time after calling ShardSyncer // Need to clean up lease manager every time after calling ShardSyncer
leaseManager.deleteAll(); dynamoDBLeaseManager.deleteAll();
} }
} }
@ -524,7 +524,7 @@ public class ShardSyncerTest {
ExceptionThrowingLeaseManagerMethods.LISTLEASES, ExceptionThrowingLeaseManagerMethods.LISTLEASES,
c, INITIAL_POSITION_AT_TIMESTAMP); c, INITIAL_POSITION_AT_TIMESTAMP);
// Need to clean up lease manager every time after calling ShardSyncer // Need to clean up lease manager every time after calling ShardSyncer
leaseManager.deleteAll(); dynamoDBLeaseManager.deleteAll();
} }
} }
@ -547,7 +547,7 @@ public class ShardSyncerTest {
ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS, ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS,
c, INITIAL_POSITION_AT_TIMESTAMP); c, INITIAL_POSITION_AT_TIMESTAMP);
// Need to clean up lease manager every time after calling ShardSyncer // Need to clean up lease manager every time after calling ShardSyncer
leaseManager.deleteAll(); dynamoDBLeaseManager.deleteAll();
} }
} }
@ -566,7 +566,7 @@ public class ShardSyncerTest {
retryCheckAndCreateLeaseForNewShards(exceptionMethod, exceptionTime, position); retryCheckAndCreateLeaseForNewShards(exceptionMethod, exceptionTime, position);
List<KinesisClientLease> newLeases = leaseManager.listLeases(); List<KinesisClientLease> newLeases = dynamoDBLeaseManager.listLeases();
Map<String, ExtendedSequenceNumber> expectedShardIdToCheckpointMap = new HashMap<>(); Map<String, ExtendedSequenceNumber> expectedShardIdToCheckpointMap = new HashMap<>();
for (int i = 0; i < 11; i++) { for (int i = 0; i < 11; i++) {
expectedShardIdToCheckpointMap.put("shardId-" + i, extendedSequenceNumber); expectedShardIdToCheckpointMap.put("shardId-" + i, extendedSequenceNumber);
@ -578,18 +578,18 @@ public class ShardSyncerTest {
assertEquals(expectedCheckpoint, lease1.getCheckpoint()); assertEquals(expectedCheckpoint, lease1.getCheckpoint());
} }
KinesisClientLease closedShardLease = leaseManager.getLease("shardId-0"); KinesisClientLease closedShardLease = dynamoDBLeaseManager.getLease("shardId-0");
closedShardLease.setCheckpoint(ExtendedSequenceNumber.SHARD_END); closedShardLease.setCheckpoint(ExtendedSequenceNumber.SHARD_END);
leaseManager.updateLease(closedShardLease); dynamoDBLeaseManager.updateLease(closedShardLease);
expectedShardIdToCheckpointMap.remove(closedShardLease.getLeaseKey()); expectedShardIdToCheckpointMap.remove(closedShardLease.getLeaseKey());
KinesisClientLease childShardLease = leaseManager.getLease("shardId-6"); KinesisClientLease childShardLease = dynamoDBLeaseManager.getLease("shardId-6");
childShardLease.setCheckpoint(new ExtendedSequenceNumber("34290")); childShardLease.setCheckpoint(new ExtendedSequenceNumber("34290"));
leaseManager.updateLease(childShardLease); dynamoDBLeaseManager.updateLease(childShardLease);
expectedShardIdToCheckpointMap.put(childShardLease.getLeaseKey(), new ExtendedSequenceNumber("34290")); expectedShardIdToCheckpointMap.put(childShardLease.getLeaseKey(), new ExtendedSequenceNumber("34290"));
retryCheckAndCreateLeaseForNewShards(exceptionMethod, exceptionTime, position); retryCheckAndCreateLeaseForNewShards(exceptionMethod, exceptionTime, position);
newLeases = leaseManager.listLeases(); newLeases = dynamoDBLeaseManager.listLeases();
assertEquals(expectedShardIdToCheckpointMap.size(), newLeases.size()); assertEquals(expectedShardIdToCheckpointMap.size(), newLeases.size());
for (KinesisClientLease lease1 : newLeases) { for (KinesisClientLease lease1 : newLeases) {
ExtendedSequenceNumber expectedCheckpoint = expectedShardIdToCheckpointMap.get(lease1.getLeaseKey()); ExtendedSequenceNumber expectedCheckpoint = expectedShardIdToCheckpointMap.get(lease1.getLeaseKey());
@ -617,10 +617,10 @@ public class ShardSyncerTest {
null, null,
ShardObjectHelper.newSequenceNumberRange("101", null))); ShardObjectHelper.newSequenceNumberRange("101", null)));
garbageLease.setCheckpoint(new ExtendedSequenceNumber("999")); garbageLease.setCheckpoint(new ExtendedSequenceNumber("999"));
leaseManager.createLeaseIfNotExists(garbageLease); dynamoDBLeaseManager.createLeaseIfNotExists(garbageLease);
assertEquals(garbageShardId, leaseManager.getLease(garbageShardId).getLeaseKey()); assertEquals(garbageShardId, dynamoDBLeaseManager.getLease(garbageShardId).getLeaseKey());
testBootstrapShardLeasesAtStartingPosition(INITIAL_POSITION_LATEST); testBootstrapShardLeasesAtStartingPosition(INITIAL_POSITION_LATEST);
assertNull(leaseManager.getLease(garbageShardId)); assertNull(dynamoDBLeaseManager.getLease(garbageShardId));
} }
private void testBootstrapShardLeasesAtStartingPosition(InitialPositionInStreamExtended initialPosition) private void testBootstrapShardLeasesAtStartingPosition(InitialPositionInStreamExtended initialPosition)
@ -636,9 +636,9 @@ public class ShardSyncerTest {
when(leaseManagerProxy.listShards()).thenReturn(shards); when(leaseManagerProxy.listShards()).thenReturn(shards);
ShardSyncer.bootstrapShardLeases(leaseManagerProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards, ShardSyncer.bootstrapShardLeases(leaseManagerProxy, dynamoDBLeaseManager, initialPosition, cleanupLeasesOfCompletedShards,
false); false);
List<KinesisClientLease> newLeases = leaseManager.listLeases(); List<KinesisClientLease> newLeases = dynamoDBLeaseManager.listLeases();
assertEquals(2, newLeases.size()); assertEquals(2, newLeases.size());
Set<String> expectedLeaseShardIds = new HashSet<String>(); Set<String> expectedLeaseShardIds = new HashSet<String>();
expectedLeaseShardIds.add(shardId0); expectedLeaseShardIds.add(shardId0);
@ -1446,7 +1446,7 @@ public class ShardSyncerTest {
String closedShardId = "shardId-2"; String closedShardId = "shardId-2";
KinesisClientLease leaseForClosedShard = newLease(closedShardId); KinesisClientLease leaseForClosedShard = newLease(closedShardId);
leaseForClosedShard.setCheckpoint(new ExtendedSequenceNumber("1234")); leaseForClosedShard.setCheckpoint(new ExtendedSequenceNumber("1234"));
leaseManager.createLeaseIfNotExists(leaseForClosedShard); dynamoDBLeaseManager.createLeaseIfNotExists(leaseForClosedShard);
Set<String> childShardIds = new HashSet<>(); Set<String> childShardIds = new HashSet<>();
List<KinesisClientLease> trackedLeases = new ArrayList<>(); List<KinesisClientLease> trackedLeases = new ArrayList<>();
@ -1463,49 +1463,49 @@ public class ShardSyncerTest {
Map<String, KinesisClientLease> trackedLeaseMap = ShardSyncer.constructShardIdToKCLLeaseMap(trackedLeases); Map<String, KinesisClientLease> trackedLeaseMap = ShardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
// empty list of leases // empty list of leases
ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, dynamoDBLeaseManager);
assertNotNull(leaseManager.getLease(closedShardId)); assertNotNull(dynamoDBLeaseManager.getLease(closedShardId));
// closed shard has not been fully processed yet (checkpoint != SHARD_END) // closed shard has not been fully processed yet (checkpoint != SHARD_END)
trackedLeases.add(leaseForClosedShard); trackedLeases.add(leaseForClosedShard);
trackedLeaseMap = ShardSyncer.constructShardIdToKCLLeaseMap(trackedLeases); trackedLeaseMap = ShardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, dynamoDBLeaseManager);
assertNotNull(leaseManager.getLease(closedShardId)); assertNotNull(dynamoDBLeaseManager.getLease(closedShardId));
// closed shard has been fully processed yet (checkpoint == SHARD_END) // closed shard has been fully processed yet (checkpoint == SHARD_END)
leaseForClosedShard.setCheckpoint(ExtendedSequenceNumber.SHARD_END); leaseForClosedShard.setCheckpoint(ExtendedSequenceNumber.SHARD_END);
leaseManager.updateLease(leaseForClosedShard); dynamoDBLeaseManager.updateLease(leaseForClosedShard);
ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, dynamoDBLeaseManager);
assertNull(leaseManager.getLease(closedShardId)); assertNull(dynamoDBLeaseManager.getLease(closedShardId));
// lease for only one child exists // lease for only one child exists
childShardIds.add(childShardId1); childShardIds.add(childShardId1);
childShardIds.add(childShardId2); childShardIds.add(childShardId2);
leaseManager.createLeaseIfNotExists(leaseForClosedShard); dynamoDBLeaseManager.createLeaseIfNotExists(leaseForClosedShard);
leaseManager.createLeaseIfNotExists(childLease1); dynamoDBLeaseManager.createLeaseIfNotExists(childLease1);
trackedLeases.add(childLease1); trackedLeases.add(childLease1);
trackedLeaseMap = ShardSyncer.constructShardIdToKCLLeaseMap(trackedLeases); trackedLeaseMap = ShardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, dynamoDBLeaseManager);
assertNotNull(leaseManager.getLease(closedShardId)); assertNotNull(dynamoDBLeaseManager.getLease(closedShardId));
// leases for both children exists, but they are both at TRIM_HORIZON // leases for both children exists, but they are both at TRIM_HORIZON
leaseManager.createLeaseIfNotExists(childLease2); dynamoDBLeaseManager.createLeaseIfNotExists(childLease2);
trackedLeases.add(childLease2); trackedLeases.add(childLease2);
trackedLeaseMap = ShardSyncer.constructShardIdToKCLLeaseMap(trackedLeases); trackedLeaseMap = ShardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, dynamoDBLeaseManager);
assertNotNull(leaseManager.getLease(closedShardId)); assertNotNull(dynamoDBLeaseManager.getLease(closedShardId));
// leases for both children exists, one is at TRIM_HORIZON // leases for both children exists, one is at TRIM_HORIZON
childLease1.setCheckpoint(new ExtendedSequenceNumber("34890")); childLease1.setCheckpoint(new ExtendedSequenceNumber("34890"));
leaseManager.updateLease(childLease1); dynamoDBLeaseManager.updateLease(childLease1);
ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, dynamoDBLeaseManager);
assertNotNull(leaseManager.getLease(closedShardId)); assertNotNull(dynamoDBLeaseManager.getLease(closedShardId));
// leases for both children exists, NONE of them are at TRIM_HORIZON // leases for both children exists, NONE of them are at TRIM_HORIZON
childLease2.setCheckpoint(new ExtendedSequenceNumber("43789")); childLease2.setCheckpoint(new ExtendedSequenceNumber("43789"));
leaseManager.updateLease(childLease2); dynamoDBLeaseManager.updateLease(childLease2);
ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, dynamoDBLeaseManager);
assertNull(leaseManager.getLease(closedShardId)); assertNull(dynamoDBLeaseManager.getLease(closedShardId));
} }
/** /**

View file

@ -33,7 +33,7 @@ public class TestHarnessBuilder {
private long currentTimeNanos; private long currentTimeNanos;
private Map<String, KinesisClientLease> leases = new HashMap<String, KinesisClientLease>(); private Map<String, KinesisClientLease> leases = new HashMap<String, KinesisClientLease>();
private KinesisClientLeaseManager leaseManager; private KinesisClientDynamoDBLeaseManager leaseManager;
private Map<String, KinesisClientLease> originalLeases = new HashMap<>(); private Map<String, KinesisClientLease> originalLeases = new HashMap<>();
private Callable<Long> timeProvider = new Callable<Long>() { private Callable<Long> timeProvider = new Callable<Long>() {
@ -45,7 +45,7 @@ public class TestHarnessBuilder {
}; };
public TestHarnessBuilder(KinesisClientLeaseManager leaseManager) { public TestHarnessBuilder(KinesisClientDynamoDBLeaseManager leaseManager) {
this.leaseManager = leaseManager; this.leaseManager = leaseManager;
} }
@ -134,7 +134,7 @@ public class TestHarnessBuilder {
Assert.assertEquals(original, actual); // Assert the contents of the lease Assert.assertEquals(original, actual); // Assert the contents of the lease
} }
public void addLeasesToRenew(ILeaseRenewer<KinesisClientLease> renewer, String... shardIds) public void addLeasesToRenew(LeaseRenewer<KinesisClientLease> renewer, String... shardIds)
throws DependencyException, InvalidStateException { throws DependencyException, InvalidStateException {
List<KinesisClientLease> leasesToRenew = new ArrayList<KinesisClientLease>(); List<KinesisClientLease> leasesToRenew = new ArrayList<KinesisClientLease>();
@ -147,7 +147,7 @@ public class TestHarnessBuilder {
renewer.addLeasesToRenew(leasesToRenew); renewer.addLeasesToRenew(leasesToRenew);
} }
public Map<String, KinesisClientLease> renewMutateAssert(ILeaseRenewer<KinesisClientLease> renewer, String... renewedShardIds) public Map<String, KinesisClientLease> renewMutateAssert(LeaseRenewer<KinesisClientLease> renewer, String... renewedShardIds)
throws DependencyException, InvalidStateException { throws DependencyException, InvalidStateException {
renewer.renewLeases(); renewer.renewLeases();

View file

@ -26,7 +26,7 @@ import java.util.List;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.leases.LeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.DependencyException;
@ -58,7 +58,7 @@ public class BlockOnParentShardTaskTest {
@Test @Test
public final void testCallNoParents() public final void testCallNoParents()
throws DependencyException, InvalidStateException, ProvisionedThroughputException { throws DependencyException, InvalidStateException, ProvisionedThroughputException {
ILeaseManager<KinesisClientLease> leaseManager = mock(ILeaseManager.class); LeaseManager<KinesisClientLease> leaseManager = mock(LeaseManager.class);
when(leaseManager.getLease(shardId)).thenReturn(null); when(leaseManager.getLease(shardId)).thenReturn(null);
BlockOnParentShardTask task = new BlockOnParentShardTask(shardInfo, leaseManager, backoffTimeInMillis); BlockOnParentShardTask task = new BlockOnParentShardTask(shardInfo, leaseManager, backoffTimeInMillis);
@ -88,7 +88,7 @@ public class BlockOnParentShardTaskTest {
KinesisClientLease parent2Lease = new KinesisClientLease(); KinesisClientLease parent2Lease = new KinesisClientLease();
parent2Lease.setCheckpoint(ExtendedSequenceNumber.SHARD_END); parent2Lease.setCheckpoint(ExtendedSequenceNumber.SHARD_END);
ILeaseManager<KinesisClientLease> leaseManager = mock(ILeaseManager.class); LeaseManager<KinesisClientLease> leaseManager = mock(LeaseManager.class);
when(leaseManager.getLease(parent1ShardId)).thenReturn(parent1Lease); when(leaseManager.getLease(parent1ShardId)).thenReturn(parent1Lease);
when(leaseManager.getLease(parent2ShardId)).thenReturn(parent2Lease); when(leaseManager.getLease(parent2ShardId)).thenReturn(parent2Lease);
@ -130,7 +130,7 @@ public class BlockOnParentShardTaskTest {
// mock a sequence number checkpoint // mock a sequence number checkpoint
parent2Lease.setCheckpoint(new ExtendedSequenceNumber("98182584034")); parent2Lease.setCheckpoint(new ExtendedSequenceNumber("98182584034"));
ILeaseManager<KinesisClientLease> leaseManager = mock(ILeaseManager.class); LeaseManager<KinesisClientLease> leaseManager = mock(LeaseManager.class);
when(leaseManager.getLease(parent1ShardId)).thenReturn(parent1Lease); when(leaseManager.getLease(parent1ShardId)).thenReturn(parent1Lease);
when(leaseManager.getLease(parent2ShardId)).thenReturn(parent2Lease); when(leaseManager.getLease(parent2ShardId)).thenReturn(parent2Lease);
@ -166,7 +166,7 @@ public class BlockOnParentShardTaskTest {
ShardInfo shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); ShardInfo shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON);
TaskResult result = null; TaskResult result = null;
KinesisClientLease parentLease = new KinesisClientLease(); KinesisClientLease parentLease = new KinesisClientLease();
ILeaseManager<KinesisClientLease> leaseManager = mock(ILeaseManager.class); LeaseManager<KinesisClientLease> leaseManager = mock(LeaseManager.class);
when(leaseManager.getLease(parentShardId)).thenReturn(parentLease); when(leaseManager.getLease(parentShardId)).thenReturn(parentLease);
// test when parent shard has not yet been fully processed // test when parent shard has not yet been fully processed

View file

@ -45,7 +45,7 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import software.amazon.kinesis.checkpoint.RecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.RecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.leases.LeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.LeaseManagerProxy; import software.amazon.kinesis.leases.LeaseManagerProxy;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
@ -72,7 +72,7 @@ public class ConsumerStatesTest {
@Mock @Mock
private ShardInfo shardInfo; private ShardInfo shardInfo;
@Mock @Mock
private ILeaseManager<KinesisClientLease> leaseManager; private LeaseManager<KinesisClientLease> leaseManager;
@Mock @Mock
private Checkpointer checkpoint; private Checkpointer checkpoint;
@Mock @Mock
@ -113,7 +113,7 @@ public class ConsumerStatesTest {
when(shardInfo.shardId()).thenReturn("shardId-000000000000"); when(shardInfo.shardId()).thenReturn("shardId-000000000000");
} }
private static final Class<ILeaseManager<KinesisClientLease>> LEASE_MANAGER_CLASS = (Class<ILeaseManager<KinesisClientLease>>) (Class<?>) ILeaseManager.class; private static final Class<LeaseManager<KinesisClientLease>> LEASE_MANAGER_CLASS = (Class<LeaseManager<KinesisClientLease>>) (Class<?>) LeaseManager.class;
@Test @Test
public void blockOnParentStateTest() { public void blockOnParentStateTest() {

View file

@ -75,7 +75,7 @@ import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
import software.amazon.kinesis.checkpoint.RecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.RecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.leases.LeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.LeaseManagerProxy; import software.amazon.kinesis.leases.LeaseManagerProxy;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
@ -139,7 +139,7 @@ public class ShardConsumerTest {
@Mock @Mock
private KinesisClientLibConfiguration config; private KinesisClientLibConfiguration config;
@Mock @Mock
private ILeaseManager<KinesisClientLease> leaseManager; private LeaseManager<KinesisClientLease> leaseManager;
@Mock @Mock
private Checkpointer checkpoint; private Checkpointer checkpoint;
@Mock @Mock

View file

@ -34,7 +34,7 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import software.amazon.kinesis.checkpoint.RecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.RecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.leases.LeaseManager;
import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.LeaseManagerProxy; import software.amazon.kinesis.leases.LeaseManagerProxy;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
@ -66,7 +66,7 @@ public class ShutdownTaskTest {
@Mock @Mock
private RecordProcessorCheckpointer checkpointer; private RecordProcessorCheckpointer checkpointer;
@Mock @Mock
private ILeaseManager<KinesisClientLease> leaseManager; private LeaseManager<KinesisClientLease> leaseManager;
@Mock @Mock
private LeaseManagerProxy leaseManagerProxy; private LeaseManagerProxy leaseManagerProxy;

View file

@ -20,6 +20,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
@ -33,6 +34,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
@ -87,6 +90,11 @@ public class PrefetchGetRecordsCacheIntegrationTest {
dataFetcher = spy(new KinesisDataFetcherForTest(amazonKinesis, streamName, shardId, MAX_RECORDS_PER_CALL)); dataFetcher = spy(new KinesisDataFetcherForTest(amazonKinesis, streamName, shardId, MAX_RECORDS_PER_CALL));
getRecordsRetrievalStrategy = spy(new SynchronousGetRecordsRetrievalStrategy(dataFetcher)); getRecordsRetrievalStrategy = spy(new SynchronousGetRecordsRetrievalStrategy(dataFetcher));
executorService = spy(Executors.newFixedThreadPool(1)); executorService = spy(Executors.newFixedThreadPool(1));
when(extendedSequenceNumber.getSequenceNumber()).thenReturn("LATEST");
when(amazonKinesis.getShardIterator(any(GetShardIteratorRequest.class)))
.thenReturn(new GetShardIteratorResult().withShardIterator("TestIterator"));
getRecordsCache = new PrefetchGetRecordsCache(MAX_SIZE, getRecordsCache = new PrefetchGetRecordsCache(MAX_SIZE,
MAX_BYTE_SIZE, MAX_BYTE_SIZE,
MAX_RECORDS_COUNT, MAX_RECORDS_COUNT,