diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointConfig.java index 5fda7cf5..f4457b65 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointConfig.java @@ -20,11 +20,6 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import lombok.Data; import lombok.NonNull; import lombok.experimental.Accessors; -import software.amazon.kinesis.leases.ILeaseManager; -import software.amazon.kinesis.leases.KinesisClientLease; -import software.amazon.kinesis.leases.KinesisClientLeaseManager; -import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator; -import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.metrics.IMetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory; @@ -55,8 +50,6 @@ public class CheckpointConfig { private long failoverTimeMillis = 10000L; - private ILeaseManager leaseManager; - private int maxLeasesForWorker = Integer.MAX_VALUE; private int maxLeasesToStealAtOneTime = 1; @@ -69,33 +62,10 @@ public class CheckpointConfig { private long epsilonMillis = 25L; - private LeaseCoordinator leaseCoordinator; - - public ILeaseManager leaseManager() { - if (leaseManager == null) { - leaseManager = new KinesisClientLeaseManager(tableName, amazonDynamoDB, consistentReads); - } - return leaseManager; - } - public CheckpointFactory checkpointFactory() { if (checkpointFactory == null) { - checkpointFactory = new DynamoDBCheckpointFactory(leaseCoordinator(), leaseManager(), metricsFactory()); + checkpointFactory = new DynamoDBCheckpointFactory(metricsFactory()); } return checkpointFactory; } - - public LeaseCoordinator leaseCoordinator() { - if (leaseCoordinator == null) { - leaseCoordinator = new KinesisClientLibLeaseCoordinator(leaseManager(), - workerIdentifier(), - failoverTimeMillis(), - epsilonMillis(), - maxLeasesForWorker(), - maxLeasesToStealAtOneTime(), - maxLeaseRenewalThreads(), - metricsFactory()); - } - return leaseCoordinator; - } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointFactory.java index 2d8c86d2..27205738 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointFactory.java @@ -15,9 +15,8 @@ package software.amazon.kinesis.checkpoint; -import software.amazon.kinesis.leases.ILeaseManager; +import software.amazon.kinesis.leases.LeaseManager; import software.amazon.kinesis.leases.KinesisClientLease; -import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.processor.Checkpointer; @@ -26,5 +25,5 @@ import software.amazon.kinesis.processor.Checkpointer; */ public interface CheckpointFactory { Checkpointer createCheckpointer(LeaseCoordinator leaseCoordinator, - ILeaseManager leaseManager); + LeaseManager leaseManager); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/DynamoDBCheckpointFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/DynamoDBCheckpointFactory.java index 9924d0ac..4a05d141 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/DynamoDBCheckpointFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/DynamoDBCheckpointFactory.java @@ -17,7 +17,7 @@ package software.amazon.kinesis.checkpoint; import lombok.Data; import lombok.NonNull; -import software.amazon.kinesis.leases.ILeaseManager; +import software.amazon.kinesis.leases.LeaseManager; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.metrics.IMetricsFactory; @@ -28,15 +28,12 @@ import software.amazon.kinesis.processor.Checkpointer; */ @Data public class DynamoDBCheckpointFactory implements CheckpointFactory { - @NonNull - private final LeaseCoordinator leaseLeaseCoordinator; - @NonNull - private final ILeaseManager leaseManager; @NonNull private final IMetricsFactory metricsFactory; @Override - public Checkpointer createCheckpoint() { + public Checkpointer createCheckpointer(final LeaseCoordinator leaseLeaseCoordinator, + final LeaseManager leaseManager) { return new DynamoDBCheckpointer(leaseLeaseCoordinator, leaseManager, metricsFactory); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/DynamoDBCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/DynamoDBCheckpointer.java index 59c0a21a..2c614a82 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/DynamoDBCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/DynamoDBCheckpointer.java @@ -28,7 +28,7 @@ import com.google.common.annotations.VisibleForTesting; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import software.amazon.kinesis.leases.ILeaseManager; +import software.amazon.kinesis.leases.LeaseManager; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.exceptions.DependencyException; @@ -47,7 +47,7 @@ public class DynamoDBCheckpointer implements Checkpointer { @NonNull private final LeaseCoordinator leaseCoordinator; @NonNull - private final ILeaseManager leaseManager; + private final LeaseManager leaseManager; @NonNull private final IMetricsFactory metricsFactory; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index cf11395e..4167cba2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -38,7 +38,7 @@ import lombok.NonNull; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.checkpoint.CheckpointConfig; -import software.amazon.kinesis.leases.ILeaseManager; +import software.amazon.kinesis.leases.LeaseManager; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; @@ -108,7 +108,7 @@ public class Scheduler implements Runnable { private final String streamName; private final long listShardsBackoffTimeMillis; private final int maxListShardsRetryAttempts; - private final ILeaseManager leaseManager; + private final LeaseManager leaseManager; private final LeaseManagerProxy leaseManagerProxy; private final boolean ignoreUnexpetedChildShards; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoDBLeaseManagementFactory.java index 14a06b4f..0518fd14 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoDBLeaseManagementFactory.java @@ -76,8 +76,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { } @Override - public LeaseManager createLeaseManager() { - return new KinesisClientLeaseManager(tableName, amazonDynamoDB, consistentReads); + public DynamoDBLeaseManager createLeaseManager() { + return new KinesisClientDynamoDBLeaseManager(tableName, amazonDynamoDB, consistentReads); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoDBLeaseManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoDBLeaseManager.java new file mode 100644 index 00000000..49dec77f --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoDBLeaseManager.java @@ -0,0 +1,593 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.leases; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate; +import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException; +import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; +import com.amazonaws.services.dynamodbv2.model.DeleteItemRequest; +import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest; +import com.amazonaws.services.dynamodbv2.model.DescribeTableResult; +import com.amazonaws.services.dynamodbv2.model.GetItemRequest; +import com.amazonaws.services.dynamodbv2.model.GetItemResult; +import com.amazonaws.services.dynamodbv2.model.LimitExceededException; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.dynamodbv2.model.PutItemRequest; +import com.amazonaws.services.dynamodbv2.model.ResourceInUseException; +import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException; +import com.amazonaws.services.dynamodbv2.model.ScanRequest; +import com.amazonaws.services.dynamodbv2.model.ScanResult; +import com.amazonaws.services.dynamodbv2.model.TableStatus; +import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest; +import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.InvalidStateException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; + +import lombok.extern.slf4j.Slf4j; + +/** + * An implementation of ILeaseManager that uses DynamoDB. + */ +@Slf4j +public class DynamoDBLeaseManager implements LeaseManager { + protected String table; + protected AmazonDynamoDB dynamoDBClient; + protected LeaseSerializer serializer; + protected boolean consistentReads; + + /** + * Constructor. + * + * @param table leases table + * @param dynamoDBClient DynamoDB client to use + * @param serializer LeaseSerializer to use to convert to/from DynamoDB objects. + */ + public DynamoDBLeaseManager(String table, AmazonDynamoDB dynamoDBClient, LeaseSerializer serializer) { + this(table, dynamoDBClient, serializer, false); + } + + /** + * Constructor for test cases - allows control of consistent reads. Consistent reads should only be used for testing + * - our code is meant to be resilient to inconsistent reads. Using consistent reads during testing speeds up + * execution of simple tests (you don't have to wait out the consistency window). Test cases that want to experience + * eventual consistency should not set consistentReads=true. + * + * @param table leases table + * @param dynamoDBClient DynamoDB client to use + * @param serializer lease serializer to use + * @param consistentReads true if we want consistent reads for testing purposes. + */ + public DynamoDBLeaseManager(String table, AmazonDynamoDB dynamoDBClient, LeaseSerializer serializer, boolean consistentReads) { + verifyNotNull(table, "Table name cannot be null"); + verifyNotNull(dynamoDBClient, "dynamoDBClient cannot be null"); + verifyNotNull(serializer, "ILeaseSerializer cannot be null"); + + this.table = table; + this.dynamoDBClient = dynamoDBClient; + this.consistentReads = consistentReads; + this.serializer = serializer; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity) + throws ProvisionedThroughputException, DependencyException { + verifyNotNull(readCapacity, "readCapacity cannot be null"); + verifyNotNull(writeCapacity, "writeCapacity cannot be null"); + + try { + if (tableStatus() != null) { + return false; + } + } catch (DependencyException de) { + // + // Something went wrong with DynamoDB + // + log.error("Failed to get table status for {}", table, de); + } + CreateTableRequest request = new CreateTableRequest(); + request.setTableName(table); + request.setKeySchema(serializer.getKeySchema()); + request.setAttributeDefinitions(serializer.getAttributeDefinitions()); + + ProvisionedThroughput throughput = new ProvisionedThroughput(); + throughput.setReadCapacityUnits(readCapacity); + throughput.setWriteCapacityUnits(writeCapacity); + request.setProvisionedThroughput(throughput); + + try { + dynamoDBClient.createTable(request); + } catch (ResourceInUseException e) { + log.info("Table {} already exists.", table); + return false; + } catch (LimitExceededException e) { + throw new ProvisionedThroughputException("Capacity exceeded when creating table " + table, e); + } catch (AmazonClientException e) { + throw new DependencyException(e); + } + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean leaseTableExists() throws DependencyException { + return TableStatus.ACTIVE == tableStatus(); + } + + private TableStatus tableStatus() throws DependencyException { + DescribeTableRequest request = new DescribeTableRequest(); + + request.setTableName(table); + + DescribeTableResult result; + try { + result = dynamoDBClient.describeTable(request); + } catch (ResourceNotFoundException e) { + if (log.isDebugEnabled()) { + log.debug("Got ResourceNotFoundException for table {} in leaseTableExists, returning false.", table); + } + return null; + } catch (AmazonClientException e) { + throw new DependencyException(e); + } + + TableStatus tableStatus = TableStatus.fromValue(result.getTable().getTableStatus()); + if (log.isDebugEnabled()) { + log.debug("Lease table exists and is in status {}", tableStatus); + } + + return tableStatus; + } + + @Override + public boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException { + long sleepTimeRemaining = TimeUnit.SECONDS.toMillis(timeoutSeconds); + + while (!leaseTableExists()) { + if (sleepTimeRemaining <= 0) { + return false; + } + + long timeToSleepMillis = Math.min(TimeUnit.SECONDS.toMillis(secondsBetweenPolls), sleepTimeRemaining); + + sleepTimeRemaining -= sleep(timeToSleepMillis); + } + + return true; + } + + /** + * Exposed for testing purposes. + * + * @param timeToSleepMillis time to sleep in milliseconds + * + * @return actual time slept in millis + */ + long sleep(long timeToSleepMillis) { + long startTime = System.currentTimeMillis(); + + try { + Thread.sleep(timeToSleepMillis); + } catch (InterruptedException e) { + log.debug("Interrupted while sleeping"); + } + + return System.currentTimeMillis() - startTime; + } + + /** + * {@inheritDoc} + */ + @Override + public List listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException { + return list(null); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException { + return list(1).isEmpty(); + } + + /** + * List with the given page size. Package access for integration testing. + * + * @param limit number of items to consider at a time - used by integration tests to force paging. + * @return list of leases + * @throws InvalidStateException if table does not exist + * @throws DependencyException if DynamoDB scan fail in an unexpected way + * @throws ProvisionedThroughputException if DynamoDB scan fail due to exceeded capacity + */ + List list(Integer limit) throws DependencyException, InvalidStateException, ProvisionedThroughputException { + if (log.isDebugEnabled()) { + log.debug("Listing leases from table {}", table); + } + + ScanRequest scanRequest = new ScanRequest(); + scanRequest.setTableName(table); + if (limit != null) { + scanRequest.setLimit(limit); + } + + try { + ScanResult scanResult = dynamoDBClient.scan(scanRequest); + List result = new ArrayList(); + + while (scanResult != null) { + for (Map item : scanResult.getItems()) { + if (log.isDebugEnabled()) { + log.debug("Got item {} from DynamoDB.", item.toString()); + } + + result.add(serializer.fromDynamoRecord(item)); + } + + Map lastEvaluatedKey = scanResult.getLastEvaluatedKey(); + if (lastEvaluatedKey == null) { + // Signify that we're done. + scanResult = null; + if (log.isDebugEnabled()) { + log.debug("lastEvaluatedKey was null - scan finished."); + } + } else { + // Make another request, picking up where we left off. + scanRequest.setExclusiveStartKey(lastEvaluatedKey); + + if (log.isDebugEnabled()) { + log.debug("lastEvaluatedKey was {}, continuing scan.", lastEvaluatedKey); + } + + scanResult = dynamoDBClient.scan(scanRequest); + } + } + + if (log.isDebugEnabled()) { + log.debug("Listed {} leases from table {}", result.size(), table); + } + + return result; + } catch (ResourceNotFoundException e) { + throw new InvalidStateException("Cannot scan lease table " + table + " because it does not exist.", e); + } catch (ProvisionedThroughputExceededException e) { + throw new ProvisionedThroughputException(e); + } catch (AmazonClientException e) { + throw new DependencyException(e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean createLeaseIfNotExists(T lease) + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + verifyNotNull(lease, "lease cannot be null"); + + if (log.isDebugEnabled()) { + log.debug("Creating lease {}", lease); + } + + PutItemRequest request = new PutItemRequest(); + request.setTableName(table); + request.setItem(serializer.toDynamoRecord(lease)); + request.setExpected(serializer.getDynamoNonexistantExpectation()); + + try { + dynamoDBClient.putItem(request); + } catch (ConditionalCheckFailedException e) { + if (log.isDebugEnabled()) { + log.debug("Did not create lease {} because it already existed", lease); + } + + return false; + } catch (AmazonClientException e) { + throw convertAndRethrowExceptions("create", lease.getLeaseKey(), e); + } + + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public T getLease(String leaseKey) + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + verifyNotNull(leaseKey, "leaseKey cannot be null"); + + if (log.isDebugEnabled()) { + log.debug("Getting lease with key {}", leaseKey); + } + + GetItemRequest request = new GetItemRequest(); + request.setTableName(table); + request.setKey(serializer.getDynamoHashKey(leaseKey)); + request.setConsistentRead(consistentReads); + + try { + GetItemResult result = dynamoDBClient.getItem(request); + + Map dynamoRecord = result.getItem(); + if (dynamoRecord == null) { + if (log.isDebugEnabled()) { + log.debug("No lease found with key {}, returning null.", leaseKey); + } + + return null; + } else { + T lease = serializer.fromDynamoRecord(dynamoRecord); + if (log.isDebugEnabled()) { + log.debug("Got lease {}", lease); + } + + return lease; + } + } catch (AmazonClientException e) { + throw convertAndRethrowExceptions("get", leaseKey, e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean renewLease(T lease) + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + verifyNotNull(lease, "lease cannot be null"); + + if (log.isDebugEnabled()) { + log.debug("Renewing lease with key {}", lease.getLeaseKey()); + } + + UpdateItemRequest request = new UpdateItemRequest(); + request.setTableName(table); + request.setKey(serializer.getDynamoHashKey(lease)); + request.setExpected(serializer.getDynamoLeaseCounterExpectation(lease)); + request.setAttributeUpdates(serializer.getDynamoLeaseCounterUpdate(lease)); + + try { + dynamoDBClient.updateItem(request); + } catch (ConditionalCheckFailedException e) { + if (log.isDebugEnabled()) { + log.debug("Lease renewal failed for lease with key {} because the lease counter was not {}", + lease.getLeaseKey(), lease.getLeaseCounter()); + } + + // If we had a spurious retry during the Dynamo update, then this conditional PUT failure + // might be incorrect. So, we get the item straight away and check if the lease owner + lease counter + // are what we expected. + String expectedOwner = lease.getLeaseOwner(); + Long expectedCounter = lease.getLeaseCounter() + 1; + T updatedLease = getLease(lease.getLeaseKey()); + if (updatedLease == null || !expectedOwner.equals(updatedLease.getLeaseOwner()) || + !expectedCounter.equals(updatedLease.getLeaseCounter())) { + return false; + } + + log.info("Detected spurious renewal failure for lease with key {}, but recovered", lease.getLeaseKey()); + } catch (AmazonClientException e) { + throw convertAndRethrowExceptions("renew", lease.getLeaseKey(), e); + } + + lease.setLeaseCounter(lease.getLeaseCounter() + 1); + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean takeLease(T lease, String owner) + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + verifyNotNull(lease, "lease cannot be null"); + verifyNotNull(owner, "owner cannot be null"); + + if (log.isDebugEnabled()) { + log.debug("Taking lease with leaseKey {} from {} to {}", + lease.getLeaseKey(), + lease.getLeaseOwner() == null ? "nobody" : lease.getLeaseOwner(), + owner); + } + + UpdateItemRequest request = new UpdateItemRequest(); + request.setTableName(table); + request.setKey(serializer.getDynamoHashKey(lease)); + request.setExpected(serializer.getDynamoLeaseCounterExpectation(lease)); + + Map updates = serializer.getDynamoLeaseCounterUpdate(lease); + updates.putAll(serializer.getDynamoTakeLeaseUpdate(lease, owner)); + request.setAttributeUpdates(updates); + + try { + dynamoDBClient.updateItem(request); + } catch (ConditionalCheckFailedException e) { + if (log.isDebugEnabled()) { + log.debug("Lease renewal failed for lease with key {} because the lease counter was not {}", + lease.getLeaseKey(), lease.getLeaseCounter()); + } + + return false; + } catch (AmazonClientException e) { + throw convertAndRethrowExceptions("take", lease.getLeaseKey(), e); + } + + lease.setLeaseCounter(lease.getLeaseCounter() + 1); + lease.setLeaseOwner(owner); + + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean evictLease(T lease) + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + verifyNotNull(lease, "lease cannot be null"); + + if (log.isDebugEnabled()) { + log.debug("Evicting lease with leaseKey {} owned by {}", + lease.getLeaseKey(), + lease.getLeaseOwner()); + } + + UpdateItemRequest request = new UpdateItemRequest(); + request.setTableName(table); + request.setKey(serializer.getDynamoHashKey(lease)); + request.setExpected(serializer.getDynamoLeaseOwnerExpectation(lease)); + + Map updates = serializer.getDynamoLeaseCounterUpdate(lease); + updates.putAll(serializer.getDynamoEvictLeaseUpdate(lease)); + request.setAttributeUpdates(updates); + + try { + dynamoDBClient.updateItem(request); + } catch (ConditionalCheckFailedException e) { + if (log.isDebugEnabled()) { + log.debug("Lease eviction failed for lease with key {} because the lease owner was not {}", + lease.getLeaseKey(), lease.getLeaseOwner()); + } + + return false; + } catch (AmazonClientException e) { + throw convertAndRethrowExceptions("evict", lease.getLeaseKey(), e); + } + + lease.setLeaseOwner(null); + lease.setLeaseCounter(lease.getLeaseCounter() + 1); + return true; + } + + /** + * {@inheritDoc} + */ + public void deleteAll() throws DependencyException, InvalidStateException, ProvisionedThroughputException { + List allLeases = listLeases(); + + log.warn("Deleting {} items from table {}", allLeases.size(), table); + + for (T lease : allLeases) { + DeleteItemRequest deleteRequest = new DeleteItemRequest(); + deleteRequest.setTableName(table); + deleteRequest.setKey(serializer.getDynamoHashKey(lease)); + + dynamoDBClient.deleteItem(deleteRequest); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void deleteLease(T lease) throws DependencyException, InvalidStateException, ProvisionedThroughputException { + verifyNotNull(lease, "lease cannot be null"); + + if (log.isDebugEnabled()) { + log.debug("Deleting lease with leaseKey {}", lease.getLeaseKey()); + } + + DeleteItemRequest deleteRequest = new DeleteItemRequest(); + deleteRequest.setTableName(table); + deleteRequest.setKey(serializer.getDynamoHashKey(lease)); + + try { + dynamoDBClient.deleteItem(deleteRequest); + } catch (AmazonClientException e) { + throw convertAndRethrowExceptions("delete", lease.getLeaseKey(), e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean updateLease(T lease) + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + verifyNotNull(lease, "lease cannot be null"); + + if (log.isDebugEnabled()) { + log.debug("Updating lease {}", lease); + } + + UpdateItemRequest request = new UpdateItemRequest(); + request.setTableName(table); + request.setKey(serializer.getDynamoHashKey(lease)); + request.setExpected(serializer.getDynamoLeaseCounterExpectation(lease)); + + Map updates = serializer.getDynamoLeaseCounterUpdate(lease); + updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease)); + request.setAttributeUpdates(updates); + + try { + dynamoDBClient.updateItem(request); + } catch (ConditionalCheckFailedException e) { + if (log.isDebugEnabled()) { + log.debug("Lease update failed for lease with key {} because the lease counter was not {}", + lease.getLeaseKey(), lease.getLeaseCounter()); + } + + return false; + } catch (AmazonClientException e) { + throw convertAndRethrowExceptions("update", lease.getLeaseKey(), e); + } + + lease.setLeaseCounter(lease.getLeaseCounter() + 1); + return true; + } + + /* + * This method contains boilerplate exception handling - it throws or returns something to be thrown. The + * inconsistency there exists to satisfy the compiler when this method is used at the end of non-void methods. + */ + protected DependencyException convertAndRethrowExceptions(String operation, String leaseKey, AmazonClientException e) + throws ProvisionedThroughputException, InvalidStateException { + if (e instanceof ProvisionedThroughputExceededException) { + log.warn("Provisioned Throughput on the lease table has been exceeded. It's recommended that you increase the IOPs on the table. Failure to increase the IOPs may cause the application to not make progress."); + throw new ProvisionedThroughputException(e); + } else if (e instanceof ResourceNotFoundException) { + // @formatter:on + throw new InvalidStateException(String.format("Cannot %s lease with key %s because table %s does not exist.", + operation, + leaseKey, + table), + e); + //@formatter:off + } else { + return new DependencyException(e); + } + } + + private void verifyNotNull(Object object, String message) { + if (object == null) { + throw new IllegalArgumentException(message); + } + } + +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoDBLeaseRenewer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoDBLeaseRenewer.java index 330ec30e..6ebd39a9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoDBLeaseRenewer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoDBLeaseRenewer.java @@ -44,10 +44,10 @@ import lombok.extern.slf4j.Slf4j; * An implementation of ILeaseRenewer that uses DynamoDB via LeaseManager. */ @Slf4j -public class LeaseRenewer implements ILeaseRenewer { +public class DynamoDBLeaseRenewer implements LeaseRenewer { private static final int RENEWAL_RETRIES = 2; - private final ILeaseManager leaseManager; + private final LeaseManager leaseManager; private final ConcurrentNavigableMap ownedLeases = new ConcurrentSkipListMap(); private final String workerIdentifier; private final long leaseDurationNanos; @@ -61,8 +61,8 @@ public class LeaseRenewer implements ILeaseRenewer { * @param leaseDurationMillis duration of a lease in milliseconds * @param executorService ExecutorService to use for renewing leases in parallel */ - public LeaseRenewer(ILeaseManager leaseManager, String workerIdentifier, long leaseDurationMillis, - ExecutorService executorService) { + public DynamoDBLeaseRenewer(LeaseManager leaseManager, String workerIdentifier, long leaseDurationMillis, + ExecutorService executorService) { this.leaseManager = leaseManager; this.workerIdentifier = workerIdentifier; this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoDBLeaseSerializer.java new file mode 100644 index 00000000..f832ab82 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoDBLeaseSerializer.java @@ -0,0 +1,194 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.leases; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.amazonaws.services.dynamodbv2.model.AttributeAction; +import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate; +import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue; +import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; +import com.amazonaws.services.dynamodbv2.model.KeyType; +import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType; + +/** + * An implementation of ILeaseSerializer for basic Lease objects. Can also instantiate subclasses of Lease so that + * LeaseSerializer can be decorated by other classes if you need to add fields to leases. + */ +public class DynamoDBLeaseSerializer implements LeaseSerializer { + + public final String LEASE_KEY_KEY = "leaseKey"; + public final String LEASE_OWNER_KEY = "leaseOwner"; + public final String LEASE_COUNTER_KEY = "leaseCounter"; + public final Class clazz; + + public DynamoDBLeaseSerializer() { + this.clazz = Lease.class; + } + + public DynamoDBLeaseSerializer(Class clazz) { + this.clazz = clazz; + } + + @Override + public Map toDynamoRecord(Lease lease) { + Map result = new HashMap(); + + result.put(LEASE_KEY_KEY, DynamoUtils.createAttributeValue(lease.getLeaseKey())); + result.put(LEASE_COUNTER_KEY, DynamoUtils.createAttributeValue(lease.getLeaseCounter())); + + if (lease.getLeaseOwner() != null) { + result.put(LEASE_OWNER_KEY, DynamoUtils.createAttributeValue(lease.getLeaseOwner())); + } + + return result; + } + + @Override + public Lease fromDynamoRecord(Map dynamoRecord) { + Lease result; + try { + result = clazz.newInstance(); + } catch (InstantiationException e) { + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + + result.setLeaseKey(DynamoUtils.safeGetString(dynamoRecord, LEASE_KEY_KEY)); + result.setLeaseOwner(DynamoUtils.safeGetString(dynamoRecord, LEASE_OWNER_KEY)); + result.setLeaseCounter(DynamoUtils.safeGetLong(dynamoRecord, LEASE_COUNTER_KEY)); + + return result; + } + + @Override + public Map getDynamoHashKey(String leaseKey) { + Map result = new HashMap(); + + result.put(LEASE_KEY_KEY, DynamoUtils.createAttributeValue(leaseKey)); + + return result; + } + + @Override + public Map getDynamoHashKey(Lease lease) { + return getDynamoHashKey(lease.getLeaseKey()); + } + + @Override + public Map getDynamoLeaseCounterExpectation(Lease lease) { + return getDynamoLeaseCounterExpectation(lease.getLeaseCounter()); + } + + public Map getDynamoLeaseCounterExpectation(Long leaseCounter) { + Map result = new HashMap(); + + ExpectedAttributeValue eav = new ExpectedAttributeValue(DynamoUtils.createAttributeValue(leaseCounter)); + result.put(LEASE_COUNTER_KEY, eav); + + return result; + } + + @Override + public Map getDynamoLeaseOwnerExpectation(Lease lease) { + Map result = new HashMap(); + + ExpectedAttributeValue eav = null; + + if (lease.getLeaseOwner() == null) { + eav = new ExpectedAttributeValue(false); + } else { + eav = new ExpectedAttributeValue(DynamoUtils.createAttributeValue(lease.getLeaseOwner())); + } + + result.put(LEASE_OWNER_KEY, eav); + + return result; + } + + @Override + public Map getDynamoNonexistantExpectation() { + Map result = new HashMap(); + + ExpectedAttributeValue expectedAV = new ExpectedAttributeValue(false); + result.put(LEASE_KEY_KEY, expectedAV); + + return result; + } + + @Override + public Map getDynamoLeaseCounterUpdate(Lease lease) { + return getDynamoLeaseCounterUpdate(lease.getLeaseCounter()); + } + + public Map getDynamoLeaseCounterUpdate(Long leaseCounter) { + Map result = new HashMap(); + + AttributeValueUpdate avu = + new AttributeValueUpdate(DynamoUtils.createAttributeValue(leaseCounter + 1), AttributeAction.PUT); + result.put(LEASE_COUNTER_KEY, avu); + + return result; + } + + @Override + public Map getDynamoTakeLeaseUpdate(Lease lease, String owner) { + Map result = new HashMap(); + + result.put(LEASE_OWNER_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(owner), + AttributeAction.PUT)); + + return result; + } + + @Override + public Map getDynamoEvictLeaseUpdate(Lease lease) { + Map result = new HashMap(); + + result.put(LEASE_OWNER_KEY, new AttributeValueUpdate(null, AttributeAction.DELETE)); + + return result; + } + + @Override + public Map getDynamoUpdateLeaseUpdate(Lease lease) { + // There is no application-specific data in Lease - just return a map that increments the counter. + return new HashMap(); + } + + @Override + public Collection getKeySchema() { + List keySchema = new ArrayList(); + keySchema.add(new KeySchemaElement().withAttributeName(LEASE_KEY_KEY).withKeyType(KeyType.HASH)); + + return keySchema; + } + + @Override + public Collection getAttributeDefinitions() { + List definitions = new ArrayList(); + definitions.add(new AttributeDefinition().withAttributeName(LEASE_KEY_KEY) + .withAttributeType(ScalarAttributeType.S)); + + return definitions; + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoDBLeaseTaker.java index 16a07983..138cc79a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoDBLeaseTaker.java @@ -53,7 +53,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker { } }; - private final ILeaseManager leaseManager; + private final LeaseManager leaseManager; private final String workerIdentifier; private final Map allLeases = new HashMap(); private final long leaseDurationNanos; @@ -62,7 +62,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker { private long lastScanTimeNanos = 0L; - public DynamoDBLeaseTaker(ILeaseManager leaseManager, String workerIdentifier, long leaseDurationMillis) { + public DynamoDBLeaseTaker(LeaseManager leaseManager, String workerIdentifier, long leaseDurationMillis) { this.leaseManager = leaseManager; this.workerIdentifier = workerIdentifier; this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/IKinesisClientLeaseManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/IKinesisClientLeaseManager.java deleted file mode 100644 index 503ef65a..00000000 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/IKinesisClientLeaseManager.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.kinesis.leases; - -import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import software.amazon.kinesis.leases.exceptions.DependencyException; -import software.amazon.kinesis.leases.exceptions.InvalidStateException; -import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; - -/** - * A decoration of ILeaseManager that adds methods to get/update checkpoints. - */ -public interface IKinesisClientLeaseManager extends ILeaseManager { - - /** - * Gets the current checkpoint of the shard. This is useful in the resharding use case - * where we will wait for the parent shard to complete before starting on the records from a child shard. - * - * @param shardId Checkpoint of this shard will be returned - * @return Checkpoint of this shard, or null if the shard record doesn't exist. - * - * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity - * @throws InvalidStateException if lease table does not exist - * @throws DependencyException if DynamoDB update fails in an unexpected way - */ - public abstract ExtendedSequenceNumber getCheckpoint(String shardId) - throws ProvisionedThroughputException, InvalidStateException, DependencyException; - -} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ILeaseManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ILeaseManager.java deleted file mode 100644 index 9b5088f3..00000000 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ILeaseManager.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.kinesis.leases; - -import java.util.List; - -import software.amazon.kinesis.leases.exceptions.DependencyException; -import software.amazon.kinesis.leases.exceptions.InvalidStateException; -import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; -import software.amazon.kinesis.leases.Lease; - -/** - * Supports basic CRUD operations for Leases. - * - * @param Lease subclass, possibly Lease itself. - */ -public interface ILeaseManager { - - /** - * Creates the table that will store leases. Succeeds if table already exists. - * - * @param readCapacity - * @param writeCapacity - * - * @return true if we created a new table (table didn't exist before) - * - * @throws ProvisionedThroughputException if we cannot create the lease table due to per-AWS-account capacity - * restrictions. - * @throws DependencyException if DynamoDB createTable fails in an unexpected way - */ - public boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity) - throws ProvisionedThroughputException, DependencyException; - - /** - * @return true if the lease table already exists. - * - * @throws DependencyException if DynamoDB describeTable fails in an unexpected way - */ - public boolean leaseTableExists() throws DependencyException; - - /** - * Blocks until the lease table exists by polling leaseTableExists. - * - * @param secondsBetweenPolls time to wait between polls in seconds - * @param timeoutSeconds total time to wait in seconds - * - * @return true if table exists, false if timeout was reached - * - * @throws DependencyException if DynamoDB describeTable fails in an unexpected way - */ - public boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException; - - /** - * List all objects in table synchronously. - * - * @throws DependencyException if DynamoDB scan fails in an unexpected way - * @throws InvalidStateException if lease table does not exist - * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity - * - * @return list of leases - */ - public List listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException; - - /** - * Create a new lease. Conditional on a lease not already existing with this shardId. - * - * @param lease the lease to create - * - * @return true if lease was created, false if lease already exists - * - * @throws DependencyException if DynamoDB put fails in an unexpected way - * @throws InvalidStateException if lease table does not exist - * @throws ProvisionedThroughputException if DynamoDB put fails due to lack of capacity - */ - public boolean createLeaseIfNotExists(T lease) - throws DependencyException, InvalidStateException, ProvisionedThroughputException; - - /** - * @param shardId Get the lease for this shardId - * - * @throws InvalidStateException if lease table does not exist - * @throws ProvisionedThroughputException if DynamoDB get fails due to lack of capacity - * @throws DependencyException if DynamoDB get fails in an unexpected way - * - * @return lease for the specified shardId, or null if one doesn't exist - */ - public T getLease(String shardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException; - - /** - * Renew a lease by incrementing the lease counter. Conditional on the leaseCounter in DynamoDB matching the leaseCounter - * of the input. Mutates the leaseCounter of the passed-in lease object after updating the record in DynamoDB. - * - * @param lease the lease to renew - * - * @return true if renewal succeeded, false otherwise - * - * @throws InvalidStateException if lease table does not exist - * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity - * @throws DependencyException if DynamoDB update fails in an unexpected way - */ - public boolean renewLease(T lease) - throws DependencyException, InvalidStateException, ProvisionedThroughputException; - - /** - * Take a lease for the given owner by incrementing its leaseCounter and setting its owner field. Conditional on - * the leaseCounter in DynamoDB matching the leaseCounter of the input. Mutates the leaseCounter and owner of the - * passed-in lease object after updating DynamoDB. - * - * @param lease the lease to take - * @param owner the new owner - * - * @return true if lease was successfully taken, false otherwise - * - * @throws InvalidStateException if lease table does not exist - * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity - * @throws DependencyException if DynamoDB update fails in an unexpected way - */ - public boolean takeLease(T lease, String owner) - throws DependencyException, InvalidStateException, ProvisionedThroughputException; - - /** - * Evict the current owner of lease by setting owner to null. Conditional on the owner in DynamoDB matching the owner of - * the input. Mutates the lease counter and owner of the passed-in lease object after updating the record in DynamoDB. - * - * @param lease the lease to void - * - * @return true if eviction succeeded, false otherwise - * - * @throws InvalidStateException if lease table does not exist - * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity - * @throws DependencyException if DynamoDB update fails in an unexpected way - */ - public boolean evictLease(T lease) - throws DependencyException, InvalidStateException, ProvisionedThroughputException; - - /** - * Delete the given lease from DynamoDB. Does nothing when passed a lease that does not exist in DynamoDB. - * - * @param lease the lease to delete - * - * @throws InvalidStateException if lease table does not exist - * @throws ProvisionedThroughputException if DynamoDB delete fails due to lack of capacity - * @throws DependencyException if DynamoDB delete fails in an unexpected way - */ - public void deleteLease(T lease) throws DependencyException, InvalidStateException, ProvisionedThroughputException; - - /** - * Delete all leases from DynamoDB. Useful for tools/utils and testing. - * - * @throws InvalidStateException if lease table does not exist - * @throws ProvisionedThroughputException if DynamoDB scan or delete fail due to lack of capacity - * @throws DependencyException if DynamoDB scan or delete fail in an unexpected way - */ - public void deleteAll() throws DependencyException, InvalidStateException, ProvisionedThroughputException; - - /** - * Update application-specific fields of the given lease in DynamoDB. Does not update fields managed by the leasing - * library such as leaseCounter, leaseOwner, or leaseKey. Conditional on the leaseCounter in DynamoDB matching the - * leaseCounter of the input. Increments the lease counter in DynamoDB so that updates can be contingent on other - * updates. Mutates the lease counter of the passed-in lease object. - * - * @return true if update succeeded, false otherwise - * - * @throws InvalidStateException if lease table does not exist - * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity - * @throws DependencyException if DynamoDB update fails in an unexpected way - */ - public boolean updateLease(T lease) - throws DependencyException, InvalidStateException, ProvisionedThroughputException; - - /** - * Check (synchronously) if there are any leases in the lease table. - * - * @return true if there are no leases in the lease table - * - * @throws DependencyException if DynamoDB scan fails in an unexpected way - * @throws InvalidStateException if lease table does not exist - * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity - */ - public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException; - -} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ILeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ILeaseSerializer.java deleted file mode 100644 index c01a03b1..00000000 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ILeaseSerializer.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.kinesis.leases; - -import java.util.Collection; -import java.util.Map; - -import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; -import com.amazonaws.services.dynamodbv2.model.AttributeValue; -import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate; -import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue; -import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; -import software.amazon.kinesis.leases.Lease; - -/** - * Utility class that manages the mapping of Lease objects/operations to records in DynamoDB. - * - * @param Lease subclass, possibly Lease itself - */ -public interface ILeaseSerializer { - - /** - * Construct a DynamoDB record out of a Lease object - * - * @param lease lease object to serialize - * @return an attribute value map representing the lease object - */ - public Map toDynamoRecord(T lease); - - /** - * Construct a Lease object out of a DynamoDB record. - * - * @param dynamoRecord attribute value map from DynamoDB - * @return a deserialized lease object representing the attribute value map - */ - public T fromDynamoRecord(Map dynamoRecord); - - /** - * @param lease - * @return the attribute value map representing a Lease's hash key given a Lease object. - */ - public Map getDynamoHashKey(T lease); - - /** - * Special getDynamoHashKey implementation used by ILeaseManager.getLease(). - * - * @param leaseKey - * @return the attribute value map representing a Lease's hash key given a string. - */ - public Map getDynamoHashKey(String leaseKey); - - /** - * @param lease - * @return the attribute value map asserting that a lease counter is what we expect. - */ - public Map getDynamoLeaseCounterExpectation(T lease); - - /** - * @param lease - * @return the attribute value map asserting that the lease owner is what we expect. - */ - public Map getDynamoLeaseOwnerExpectation(T lease); - - /** - * @return the attribute value map asserting that a lease does not exist. - */ - public Map getDynamoNonexistantExpectation(); - - /** - * @param lease - * @return the attribute value map that increments a lease counter - */ - public Map getDynamoLeaseCounterUpdate(T lease); - - /** - * @param lease - * @param newOwner - * @return the attribute value map that takes a lease for a new owner - */ - public Map getDynamoTakeLeaseUpdate(T lease, String newOwner); - - /** - * @param lease - * @return the attribute value map that voids a lease - */ - public Map getDynamoEvictLeaseUpdate(T lease); - - /** - * @param lease - * @return the attribute value map that updates application-specific data for a lease and increments the lease - * counter - */ - public Map getDynamoUpdateLeaseUpdate(T lease); - - /** - * @return the key schema for creating a DynamoDB table to store leases - */ - public Collection getKeySchema(); - - /** - * @return attribute definitions for creating a DynamoDB table to store leases - */ - public Collection getAttributeDefinitions(); -} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisClientDynamoDBLeaseManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisClientDynamoDBLeaseManager.java new file mode 100644 index 00000000..98c1a04d --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisClientDynamoDBLeaseManager.java @@ -0,0 +1,79 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.leases; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.InvalidStateException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; + +/** + * An implementation of LeaseManager for the KinesisClientLibrary - takeLease updates the ownerSwitchesSinceCheckpoint field. + */ +public class KinesisClientDynamoDBLeaseManager extends DynamoDBLeaseManager implements KinesisClientLeaseManager { + /** + * Constructor. + * + * @param table Leases table + * @param dynamoDBClient DynamoDB client to use + */ + public KinesisClientDynamoDBLeaseManager(String table, AmazonDynamoDB dynamoDBClient) { + this(table, dynamoDBClient, false); + } + + /** + * Constructor for integration tests - see comment on superclass for documentation on setting the consistentReads + * flag. + * + * @param table leases table + * @param dynamoDBClient DynamoDB client to use + * @param consistentReads true if we want consistent reads for testing purposes. + */ + public KinesisClientDynamoDBLeaseManager(String table, AmazonDynamoDB dynamoDBClient, boolean consistentReads) { + super(table, dynamoDBClient, new KinesisClientLeaseSerializer(), consistentReads); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean takeLease(KinesisClientLease lease, String newOwner) + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + String oldOwner = lease.getLeaseOwner(); + + boolean result = super.takeLease(lease, newOwner); + + if (oldOwner != null && !oldOwner.equals(newOwner)) { + lease.setOwnerSwitchesSinceCheckpoint(lease.getOwnerSwitchesSinceCheckpoint() + 1); + } + + return result; + } + + /** + * {@inheritDoc} + */ + @Override + public ExtendedSequenceNumber getCheckpoint(String shardId) + throws ProvisionedThroughputException, InvalidStateException, DependencyException { + ExtendedSequenceNumber checkpoint = null; + KinesisClientLease lease = getLease(shardId); + if (lease != null) { + checkpoint = lease.getCheckpoint(); + } + return checkpoint; + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisClientLeaseManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisClientLeaseManager.java index f304b525..9426ffd3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisClientLeaseManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisClientLeaseManager.java @@ -14,66 +14,27 @@ */ package software.amazon.kinesis.leases; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** - * An implementation of LeaseManager for the KinesisClientLibrary - takeLease updates the ownerSwitchesSinceCheckpoint field. + * A decoration of ILeaseManager that adds methods to get/update checkpoints. */ -public class KinesisClientLeaseManager extends LeaseManager implements IKinesisClientLeaseManager { +public interface KinesisClientLeaseManager extends LeaseManager { + /** - * Constructor. + * Gets the current checkpoint of the shard. This is useful in the resharding use case + * where we will wait for the parent shard to complete before starting on the records from a child shard. * - * @param table Leases table - * @param dynamoDBClient DynamoDB client to use - */ - public KinesisClientLeaseManager(String table, AmazonDynamoDB dynamoDBClient) { - this(table, dynamoDBClient, false); - } - - /** - * Constructor for integration tests - see comment on superclass for documentation on setting the consistentReads - * flag. + * @param shardId Checkpoint of this shard will be returned + * @return Checkpoint of this shard, or null if the shard record doesn't exist. * - * @param table leases table - * @param dynamoDBClient DynamoDB client to use - * @param consistentReads true if we want consistent reads for testing purposes. + * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity + * @throws InvalidStateException if lease table does not exist + * @throws DependencyException if DynamoDB update fails in an unexpected way */ - public KinesisClientLeaseManager(String table, AmazonDynamoDB dynamoDBClient, boolean consistentReads) { - super(table, dynamoDBClient, new KinesisClientLeaseSerializer(), consistentReads); - } + ExtendedSequenceNumber getCheckpoint(String shardId) throws ProvisionedThroughputException, InvalidStateException, DependencyException; - /** - * {@inheritDoc} - */ - @Override - public boolean takeLease(KinesisClientLease lease, String newOwner) - throws DependencyException, InvalidStateException, ProvisionedThroughputException { - String oldOwner = lease.getLeaseOwner(); - - boolean result = super.takeLease(lease, newOwner); - - if (oldOwner != null && !oldOwner.equals(newOwner)) { - lease.setOwnerSwitchesSinceCheckpoint(lease.getOwnerSwitchesSinceCheckpoint() + 1); - } - - return result; - } - - /** - * {@inheritDoc} - */ - @Override - public ExtendedSequenceNumber getCheckpoint(String shardId) - throws ProvisionedThroughputException, InvalidStateException, DependencyException { - ExtendedSequenceNumber checkpoint = null; - KinesisClientLease lease = getLease(shardId); - if (lease != null) { - checkpoint = lease.getCheckpoint(); - } - return checkpoint; - } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisClientLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisClientLeaseSerializer.java index 3fa620b4..87eaa86a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisClientLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisClientLeaseSerializer.java @@ -29,7 +29,7 @@ import com.google.common.base.Strings; /** * An implementation of ILeaseSerializer for KinesisClientLease objects. */ -public class KinesisClientLeaseSerializer implements ILeaseSerializer { +public class KinesisClientLeaseSerializer implements LeaseSerializer { private static final String OWNER_SWITCHES_KEY = "ownerSwitchesSinceCheckpoint"; private static final String CHECKPOINT_SEQUENCE_NUMBER_KEY = "checkpoint"; @@ -38,7 +38,7 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer toDynamoRecord(KinesisClientLease lease) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinator.java index 1a450a3c..031f01f0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinator.java @@ -20,11 +20,7 @@ import java.util.Collections; import java.util.List; import java.util.Set; -import lombok.Data; -import lombok.EqualsAndHashCode; import lombok.Getter; -import lombok.NonNull; -import lombok.RequiredArgsConstructor; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.leases.exceptions.DependencyException; @@ -42,12 +38,12 @@ public class KinesisClientLibLeaseCoordinator extends LeaseCoordinator leaseManager; + private final LeaseManager leaseManager; private long initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY; private long initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; - public KinesisClientLibLeaseCoordinator(final ILeaseManager leaseManager, + public KinesisClientLibLeaseCoordinator(final LeaseManager leaseManager, final String workerIdentifier, final long leaseDurationMillis, final long epsilonMillis, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java index b5d8c261..07c3394c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java @@ -65,7 +65,7 @@ public class LeaseCoordinator { private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder() .setNameFormat("LeaseRenewer-%04d").setDaemon(true).build(); - private final ILeaseRenewer leaseRenewer; + private final LeaseRenewer leaseRenewer; private final LeaseTaker leaseTaker; private final long renewerIntervalMillis; private final long takerIntervalMillis; @@ -87,7 +87,7 @@ public class LeaseCoordinator { * @param leaseDurationMillis Duration of a lease * @param epsilonMillis Allow for some variance when calculating lease expirations */ - public LeaseCoordinator(ILeaseManager leaseManager, + public LeaseCoordinator(LeaseManager leaseManager, String workerIdentifier, long leaseDurationMillis, long epsilonMillis) { @@ -103,7 +103,7 @@ public class LeaseCoordinator { * @param epsilonMillis Allow for some variance when calculating lease expirations * @param metricsFactory Used to publish metrics about lease operations */ - public LeaseCoordinator(ILeaseManager leaseManager, + public LeaseCoordinator(LeaseManager leaseManager, String workerIdentifier, long leaseDurationMillis, long epsilonMillis, @@ -124,7 +124,7 @@ public class LeaseCoordinator { * @param maxLeasesToStealAtOneTime Steal up to these many leases at a time (for load balancing) * @param metricsFactory Used to publish metrics about lease operations */ - public LeaseCoordinator(ILeaseManager leaseManager, + public LeaseCoordinator(LeaseManager leaseManager, String workerIdentifier, long leaseDurationMillis, long epsilonMillis, @@ -136,7 +136,7 @@ public class LeaseCoordinator { this.leaseTaker = new DynamoDBLeaseTaker(leaseManager, workerIdentifier, leaseDurationMillis) .withMaxLeasesForWorker(maxLeasesForWorker) .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime); - this.leaseRenewer = new LeaseRenewer( + this.leaseRenewer = new DynamoDBLeaseRenewer( leaseManager, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool); this.renewerIntervalMillis = leaseDurationMillis / 3 - epsilonMillis; this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java index 81f8ee3b..f17b0065 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java @@ -23,7 +23,7 @@ public interface LeaseManagementFactory { ShardSyncTaskManager createShardSyncTaskManager(); - LeaseManager createLeaseManager(); + DynamoDBLeaseManager createLeaseManager(); KinesisClientLibLeaseCoordinator createKinesisClientLibLeaseCoordinator(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManager.java index 41772b1f..37415108 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -14,580 +14,181 @@ */ package software.amazon.kinesis.leases; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import com.amazonaws.AmazonClientException; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.model.AttributeValue; -import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate; -import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException; -import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; -import com.amazonaws.services.dynamodbv2.model.DeleteItemRequest; -import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest; -import com.amazonaws.services.dynamodbv2.model.DescribeTableResult; -import com.amazonaws.services.dynamodbv2.model.GetItemRequest; -import com.amazonaws.services.dynamodbv2.model.GetItemResult; -import com.amazonaws.services.dynamodbv2.model.LimitExceededException; -import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; -import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException; -import com.amazonaws.services.dynamodbv2.model.PutItemRequest; -import com.amazonaws.services.dynamodbv2.model.ResourceInUseException; -import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException; -import com.amazonaws.services.dynamodbv2.model.ScanRequest; -import com.amazonaws.services.dynamodbv2.model.ScanResult; -import com.amazonaws.services.dynamodbv2.model.TableStatus; -import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; - -import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.leases.Lease; /** - * An implementation of ILeaseManager that uses DynamoDB. + * Supports basic CRUD operations for Leases. + * + * @param Lease subclass, possibly Lease itself. */ -@Slf4j -public class LeaseManager implements ILeaseManager { - protected String table; - protected AmazonDynamoDB dynamoDBClient; - protected ILeaseSerializer serializer; - protected boolean consistentReads; +public interface LeaseManager { /** - * Constructor. + * Creates the table that will store leases. Succeeds if table already exists. * - * @param table leases table - * @param dynamoDBClient DynamoDB client to use - * @param serializer LeaseSerializer to use to convert to/from DynamoDB objects. - */ - public LeaseManager(String table, AmazonDynamoDB dynamoDBClient, ILeaseSerializer serializer) { - this(table, dynamoDBClient, serializer, false); - } - - /** - * Constructor for test cases - allows control of consistent reads. Consistent reads should only be used for testing - * - our code is meant to be resilient to inconsistent reads. Using consistent reads during testing speeds up - * execution of simple tests (you don't have to wait out the consistency window). Test cases that want to experience - * eventual consistency should not set consistentReads=true. + * @param readCapacity + * @param writeCapacity * - * @param table leases table - * @param dynamoDBClient DynamoDB client to use - * @param serializer lease serializer to use - * @param consistentReads true if we want consistent reads for testing purposes. + * @return true if we created a new table (table didn't exist before) + * + * @throws ProvisionedThroughputException if we cannot create the lease table due to per-AWS-account capacity + * restrictions. + * @throws DependencyException if DynamoDB createTable fails in an unexpected way */ - public LeaseManager(String table, AmazonDynamoDB dynamoDBClient, ILeaseSerializer serializer, boolean consistentReads) { - verifyNotNull(table, "Table name cannot be null"); - verifyNotNull(dynamoDBClient, "dynamoDBClient cannot be null"); - verifyNotNull(serializer, "ILeaseSerializer cannot be null"); - - this.table = table; - this.dynamoDBClient = dynamoDBClient; - this.consistentReads = consistentReads; - this.serializer = serializer; - } - - /** - * {@inheritDoc} - */ - @Override public boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity) - throws ProvisionedThroughputException, DependencyException { - verifyNotNull(readCapacity, "readCapacity cannot be null"); - verifyNotNull(writeCapacity, "writeCapacity cannot be null"); - - try { - if (tableStatus() != null) { - return false; - } - } catch (DependencyException de) { - // - // Something went wrong with DynamoDB - // - log.error("Failed to get table status for {}", table, de); - } - CreateTableRequest request = new CreateTableRequest(); - request.setTableName(table); - request.setKeySchema(serializer.getKeySchema()); - request.setAttributeDefinitions(serializer.getAttributeDefinitions()); - - ProvisionedThroughput throughput = new ProvisionedThroughput(); - throughput.setReadCapacityUnits(readCapacity); - throughput.setWriteCapacityUnits(writeCapacity); - request.setProvisionedThroughput(throughput); - - try { - dynamoDBClient.createTable(request); - } catch (ResourceInUseException e) { - log.info("Table {} already exists.", table); - return false; - } catch (LimitExceededException e) { - throw new ProvisionedThroughputException("Capacity exceeded when creating table " + table, e); - } catch (AmazonClientException e) { - throw new DependencyException(e); - } - return true; - } + throws ProvisionedThroughputException, DependencyException; /** - * {@inheritDoc} - */ - @Override - public boolean leaseTableExists() throws DependencyException { - return TableStatus.ACTIVE == tableStatus(); - } - - private TableStatus tableStatus() throws DependencyException { - DescribeTableRequest request = new DescribeTableRequest(); - - request.setTableName(table); - - DescribeTableResult result; - try { - result = dynamoDBClient.describeTable(request); - } catch (ResourceNotFoundException e) { - if (log.isDebugEnabled()) { - log.debug("Got ResourceNotFoundException for table {} in leaseTableExists, returning false.", table); - } - return null; - } catch (AmazonClientException e) { - throw new DependencyException(e); - } - - TableStatus tableStatus = TableStatus.fromValue(result.getTable().getTableStatus()); - if (log.isDebugEnabled()) { - log.debug("Lease table exists and is in status {}", tableStatus); - } - - return tableStatus; - } - - @Override - public boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException { - long sleepTimeRemaining = TimeUnit.SECONDS.toMillis(timeoutSeconds); - - while (!leaseTableExists()) { - if (sleepTimeRemaining <= 0) { - return false; - } - - long timeToSleepMillis = Math.min(TimeUnit.SECONDS.toMillis(secondsBetweenPolls), sleepTimeRemaining); - - sleepTimeRemaining -= sleep(timeToSleepMillis); - } - - return true; - } - - /** - * Exposed for testing purposes. + * @return true if the lease table already exists. * - * @param timeToSleepMillis time to sleep in milliseconds + * @throws DependencyException if DynamoDB describeTable fails in an unexpected way + */ + public boolean leaseTableExists() throws DependencyException; + + /** + * Blocks until the lease table exists by polling leaseTableExists. * - * @return actual time slept in millis - */ - long sleep(long timeToSleepMillis) { - long startTime = System.currentTimeMillis(); - - try { - Thread.sleep(timeToSleepMillis); - } catch (InterruptedException e) { - log.debug("Interrupted while sleeping"); - } - - return System.currentTimeMillis() - startTime; - } - - /** - * {@inheritDoc} - */ - @Override - public List listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException { - return list(null); - } - - /** - * {@inheritDoc} - */ - @Override - public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException { - return list(1).isEmpty(); - } - - /** - * List with the given page size. Package access for integration testing. + * @param secondsBetweenPolls time to wait between polls in seconds + * @param timeoutSeconds total time to wait in seconds + * + * @return true if table exists, false if timeout was reached + * + * @throws DependencyException if DynamoDB describeTable fails in an unexpected way + */ + public boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException; + + /** + * List all objects in table synchronously. + * + * @throws DependencyException if DynamoDB scan fails in an unexpected way + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity * - * @param limit number of items to consider at a time - used by integration tests to force paging. * @return list of leases - * @throws InvalidStateException if table does not exist - * @throws DependencyException if DynamoDB scan fail in an unexpected way - * @throws ProvisionedThroughputException if DynamoDB scan fail due to exceeded capacity */ - List list(Integer limit) throws DependencyException, InvalidStateException, ProvisionedThroughputException { - if (log.isDebugEnabled()) { - log.debug("Listing leases from table {}", table); - } - - ScanRequest scanRequest = new ScanRequest(); - scanRequest.setTableName(table); - if (limit != null) { - scanRequest.setLimit(limit); - } - - try { - ScanResult scanResult = dynamoDBClient.scan(scanRequest); - List result = new ArrayList(); - - while (scanResult != null) { - for (Map item : scanResult.getItems()) { - if (log.isDebugEnabled()) { - log.debug("Got item {} from DynamoDB.", item.toString()); - } - - result.add(serializer.fromDynamoRecord(item)); - } - - Map lastEvaluatedKey = scanResult.getLastEvaluatedKey(); - if (lastEvaluatedKey == null) { - // Signify that we're done. - scanResult = null; - if (log.isDebugEnabled()) { - log.debug("lastEvaluatedKey was null - scan finished."); - } - } else { - // Make another request, picking up where we left off. - scanRequest.setExclusiveStartKey(lastEvaluatedKey); - - if (log.isDebugEnabled()) { - log.debug("lastEvaluatedKey was {}, continuing scan.", lastEvaluatedKey); - } - - scanResult = dynamoDBClient.scan(scanRequest); - } - } - - if (log.isDebugEnabled()) { - log.debug("Listed {} leases from table {}", result.size(), table); - } - - return result; - } catch (ResourceNotFoundException e) { - throw new InvalidStateException("Cannot scan lease table " + table + " because it does not exist.", e); - } catch (ProvisionedThroughputExceededException e) { - throw new ProvisionedThroughputException(e); - } catch (AmazonClientException e) { - throw new DependencyException(e); - } - } + public List listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException; /** - * {@inheritDoc} + * Create a new lease. Conditional on a lease not already existing with this shardId. + * + * @param lease the lease to create + * + * @return true if lease was created, false if lease already exists + * + * @throws DependencyException if DynamoDB put fails in an unexpected way + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB put fails due to lack of capacity */ - @Override public boolean createLeaseIfNotExists(T lease) - throws DependencyException, InvalidStateException, ProvisionedThroughputException { - verifyNotNull(lease, "lease cannot be null"); - - if (log.isDebugEnabled()) { - log.debug("Creating lease {}", lease); - } - - PutItemRequest request = new PutItemRequest(); - request.setTableName(table); - request.setItem(serializer.toDynamoRecord(lease)); - request.setExpected(serializer.getDynamoNonexistantExpectation()); - - try { - dynamoDBClient.putItem(request); - } catch (ConditionalCheckFailedException e) { - if (log.isDebugEnabled()) { - log.debug("Did not create lease {} because it already existed", lease); - } - - return false; - } catch (AmazonClientException e) { - throw convertAndRethrowExceptions("create", lease.getLeaseKey(), e); - } - - return true; - } + throws DependencyException, InvalidStateException, ProvisionedThroughputException; /** - * {@inheritDoc} + * @param shardId Get the lease for this shardId + * + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB get fails due to lack of capacity + * @throws DependencyException if DynamoDB get fails in an unexpected way + * + * @return lease for the specified shardId, or null if one doesn't exist */ - @Override - public T getLease(String leaseKey) - throws DependencyException, InvalidStateException, ProvisionedThroughputException { - verifyNotNull(leaseKey, "leaseKey cannot be null"); - - if (log.isDebugEnabled()) { - log.debug("Getting lease with key {}", leaseKey); - } - - GetItemRequest request = new GetItemRequest(); - request.setTableName(table); - request.setKey(serializer.getDynamoHashKey(leaseKey)); - request.setConsistentRead(consistentReads); - - try { - GetItemResult result = dynamoDBClient.getItem(request); - - Map dynamoRecord = result.getItem(); - if (dynamoRecord == null) { - if (log.isDebugEnabled()) { - log.debug("No lease found with key {}, returning null.", leaseKey); - } - - return null; - } else { - T lease = serializer.fromDynamoRecord(dynamoRecord); - if (log.isDebugEnabled()) { - log.debug("Got lease {}", lease); - } - - return lease; - } - } catch (AmazonClientException e) { - throw convertAndRethrowExceptions("get", leaseKey, e); - } - } + public T getLease(String shardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException; /** - * {@inheritDoc} + * Renew a lease by incrementing the lease counter. Conditional on the leaseCounter in DynamoDB matching the leaseCounter + * of the input. Mutates the leaseCounter of the passed-in lease object after updating the record in DynamoDB. + * + * @param lease the lease to renew + * + * @return true if renewal succeeded, false otherwise + * + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity + * @throws DependencyException if DynamoDB update fails in an unexpected way */ - @Override public boolean renewLease(T lease) - throws DependencyException, InvalidStateException, ProvisionedThroughputException { - verifyNotNull(lease, "lease cannot be null"); - - if (log.isDebugEnabled()) { - log.debug("Renewing lease with key {}", lease.getLeaseKey()); - } - - UpdateItemRequest request = new UpdateItemRequest(); - request.setTableName(table); - request.setKey(serializer.getDynamoHashKey(lease)); - request.setExpected(serializer.getDynamoLeaseCounterExpectation(lease)); - request.setAttributeUpdates(serializer.getDynamoLeaseCounterUpdate(lease)); - - try { - dynamoDBClient.updateItem(request); - } catch (ConditionalCheckFailedException e) { - if (log.isDebugEnabled()) { - log.debug("Lease renewal failed for lease with key {} because the lease counter was not {}", - lease.getLeaseKey(), lease.getLeaseCounter()); - } - - // If we had a spurious retry during the Dynamo update, then this conditional PUT failure - // might be incorrect. So, we get the item straight away and check if the lease owner + lease counter - // are what we expected. - String expectedOwner = lease.getLeaseOwner(); - Long expectedCounter = lease.getLeaseCounter() + 1; - T updatedLease = getLease(lease.getLeaseKey()); - if (updatedLease == null || !expectedOwner.equals(updatedLease.getLeaseOwner()) || - !expectedCounter.equals(updatedLease.getLeaseCounter())) { - return false; - } - - log.info("Detected spurious renewal failure for lease with key {}, but recovered", lease.getLeaseKey()); - } catch (AmazonClientException e) { - throw convertAndRethrowExceptions("renew", lease.getLeaseKey(), e); - } - - lease.setLeaseCounter(lease.getLeaseCounter() + 1); - return true; - } + throws DependencyException, InvalidStateException, ProvisionedThroughputException; /** - * {@inheritDoc} + * Take a lease for the given owner by incrementing its leaseCounter and setting its owner field. Conditional on + * the leaseCounter in DynamoDB matching the leaseCounter of the input. Mutates the leaseCounter and owner of the + * passed-in lease object after updating DynamoDB. + * + * @param lease the lease to take + * @param owner the new owner + * + * @return true if lease was successfully taken, false otherwise + * + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity + * @throws DependencyException if DynamoDB update fails in an unexpected way */ - @Override public boolean takeLease(T lease, String owner) - throws DependencyException, InvalidStateException, ProvisionedThroughputException { - verifyNotNull(lease, "lease cannot be null"); - verifyNotNull(owner, "owner cannot be null"); - - if (log.isDebugEnabled()) { - log.debug("Taking lease with leaseKey {} from {} to {}", - lease.getLeaseKey(), - lease.getLeaseOwner() == null ? "nobody" : lease.getLeaseOwner(), - owner); - } - - UpdateItemRequest request = new UpdateItemRequest(); - request.setTableName(table); - request.setKey(serializer.getDynamoHashKey(lease)); - request.setExpected(serializer.getDynamoLeaseCounterExpectation(lease)); - - Map updates = serializer.getDynamoLeaseCounterUpdate(lease); - updates.putAll(serializer.getDynamoTakeLeaseUpdate(lease, owner)); - request.setAttributeUpdates(updates); - - try { - dynamoDBClient.updateItem(request); - } catch (ConditionalCheckFailedException e) { - if (log.isDebugEnabled()) { - log.debug("Lease renewal failed for lease with key {} because the lease counter was not {}", - lease.getLeaseKey(), lease.getLeaseCounter()); - } - - return false; - } catch (AmazonClientException e) { - throw convertAndRethrowExceptions("take", lease.getLeaseKey(), e); - } - - lease.setLeaseCounter(lease.getLeaseCounter() + 1); - lease.setLeaseOwner(owner); - - return true; - } + throws DependencyException, InvalidStateException, ProvisionedThroughputException; /** - * {@inheritDoc} + * Evict the current owner of lease by setting owner to null. Conditional on the owner in DynamoDB matching the owner of + * the input. Mutates the lease counter and owner of the passed-in lease object after updating the record in DynamoDB. + * + * @param lease the lease to void + * + * @return true if eviction succeeded, false otherwise + * + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity + * @throws DependencyException if DynamoDB update fails in an unexpected way */ - @Override public boolean evictLease(T lease) - throws DependencyException, InvalidStateException, ProvisionedThroughputException { - verifyNotNull(lease, "lease cannot be null"); - - if (log.isDebugEnabled()) { - log.debug("Evicting lease with leaseKey {} owned by {}", - lease.getLeaseKey(), - lease.getLeaseOwner()); - } - - UpdateItemRequest request = new UpdateItemRequest(); - request.setTableName(table); - request.setKey(serializer.getDynamoHashKey(lease)); - request.setExpected(serializer.getDynamoLeaseOwnerExpectation(lease)); - - Map updates = serializer.getDynamoLeaseCounterUpdate(lease); - updates.putAll(serializer.getDynamoEvictLeaseUpdate(lease)); - request.setAttributeUpdates(updates); - - try { - dynamoDBClient.updateItem(request); - } catch (ConditionalCheckFailedException e) { - if (log.isDebugEnabled()) { - log.debug("Lease eviction failed for lease with key {} because the lease owner was not {}", - lease.getLeaseKey(), lease.getLeaseOwner()); - } - - return false; - } catch (AmazonClientException e) { - throw convertAndRethrowExceptions("evict", lease.getLeaseKey(), e); - } - - lease.setLeaseOwner(null); - lease.setLeaseCounter(lease.getLeaseCounter() + 1); - return true; - } + throws DependencyException, InvalidStateException, ProvisionedThroughputException; /** - * {@inheritDoc} + * Delete the given lease from DynamoDB. Does nothing when passed a lease that does not exist in DynamoDB. + * + * @param lease the lease to delete + * + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB delete fails due to lack of capacity + * @throws DependencyException if DynamoDB delete fails in an unexpected way */ - public void deleteAll() throws DependencyException, InvalidStateException, ProvisionedThroughputException { - List allLeases = listLeases(); - - log.warn("Deleting {} items from table {}", allLeases.size(), table); - - for (T lease : allLeases) { - DeleteItemRequest deleteRequest = new DeleteItemRequest(); - deleteRequest.setTableName(table); - deleteRequest.setKey(serializer.getDynamoHashKey(lease)); - - dynamoDBClient.deleteItem(deleteRequest); - } - } + public void deleteLease(T lease) throws DependencyException, InvalidStateException, ProvisionedThroughputException; /** - * {@inheritDoc} + * Delete all leases from DynamoDB. Useful for tools/utils and testing. + * + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB scan or delete fail due to lack of capacity + * @throws DependencyException if DynamoDB scan or delete fail in an unexpected way */ - @Override - public void deleteLease(T lease) throws DependencyException, InvalidStateException, ProvisionedThroughputException { - verifyNotNull(lease, "lease cannot be null"); - - if (log.isDebugEnabled()) { - log.debug("Deleting lease with leaseKey {}", lease.getLeaseKey()); - } - - DeleteItemRequest deleteRequest = new DeleteItemRequest(); - deleteRequest.setTableName(table); - deleteRequest.setKey(serializer.getDynamoHashKey(lease)); - - try { - dynamoDBClient.deleteItem(deleteRequest); - } catch (AmazonClientException e) { - throw convertAndRethrowExceptions("delete", lease.getLeaseKey(), e); - } - } + public void deleteAll() throws DependencyException, InvalidStateException, ProvisionedThroughputException; /** - * {@inheritDoc} + * Update application-specific fields of the given lease in DynamoDB. Does not update fields managed by the leasing + * library such as leaseCounter, leaseOwner, or leaseKey. Conditional on the leaseCounter in DynamoDB matching the + * leaseCounter of the input. Increments the lease counter in DynamoDB so that updates can be contingent on other + * updates. Mutates the lease counter of the passed-in lease object. + * + * @return true if update succeeded, false otherwise + * + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity + * @throws DependencyException if DynamoDB update fails in an unexpected way */ - @Override public boolean updateLease(T lease) - throws DependencyException, InvalidStateException, ProvisionedThroughputException { - verifyNotNull(lease, "lease cannot be null"); + throws DependencyException, InvalidStateException, ProvisionedThroughputException; - if (log.isDebugEnabled()) { - log.debug("Updating lease {}", lease); - } - - UpdateItemRequest request = new UpdateItemRequest(); - request.setTableName(table); - request.setKey(serializer.getDynamoHashKey(lease)); - request.setExpected(serializer.getDynamoLeaseCounterExpectation(lease)); - - Map updates = serializer.getDynamoLeaseCounterUpdate(lease); - updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease)); - request.setAttributeUpdates(updates); - - try { - dynamoDBClient.updateItem(request); - } catch (ConditionalCheckFailedException e) { - if (log.isDebugEnabled()) { - log.debug("Lease update failed for lease with key {} because the lease counter was not {}", - lease.getLeaseKey(), lease.getLeaseCounter()); - } - - return false; - } catch (AmazonClientException e) { - throw convertAndRethrowExceptions("update", lease.getLeaseKey(), e); - } - - lease.setLeaseCounter(lease.getLeaseCounter() + 1); - return true; - } - - /* - * This method contains boilerplate exception handling - it throws or returns something to be thrown. The - * inconsistency there exists to satisfy the compiler when this method is used at the end of non-void methods. + /** + * Check (synchronously) if there are any leases in the lease table. + * + * @return true if there are no leases in the lease table + * + * @throws DependencyException if DynamoDB scan fails in an unexpected way + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity */ - protected DependencyException convertAndRethrowExceptions(String operation, String leaseKey, AmazonClientException e) - throws ProvisionedThroughputException, InvalidStateException { - if (e instanceof ProvisionedThroughputExceededException) { - log.warn("Provisioned Throughput on the lease table has been exceeded. It's recommended that you increase the IOPs on the table. Failure to increase the IOPs may cause the application to not make progress."); - throw new ProvisionedThroughputException(e); - } else if (e instanceof ResourceNotFoundException) { - // @formatter:on - throw new InvalidStateException(String.format("Cannot %s lease with key %s because table %s does not exist.", - operation, - leaseKey, - table), - e); - //@formatter:off - } else { - return new DependencyException(e); - } - } - - private void verifyNotNull(Object object, String message) { - if (object == null) { - throw new IllegalArgumentException(message); - } - } + public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRenewer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRenewer.java index 8f681531..aaf5b6d0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRenewer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRenewer.java @@ -28,7 +28,7 @@ import software.amazon.kinesis.leases.Lease; * LeaseCoordinator instance corresponds to one worker, and uses exactly one ILeaseRenewer to manage lease renewal for * that worker. */ -public interface ILeaseRenewer { +public interface LeaseRenewer { /** * Bootstrap initial set of leases from the LeaseManager (e.g. upon process restart, pick up leases we own) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java index 029aae1e..15e75952 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java @@ -14,181 +14,103 @@ */ package software.amazon.kinesis.leases; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.List; import java.util.Map; -import com.amazonaws.services.dynamodbv2.model.AttributeAction; import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate; import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue; import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; -import com.amazonaws.services.dynamodbv2.model.KeyType; -import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType; +import software.amazon.kinesis.leases.Lease; /** - * An implementation of ILeaseSerializer for basic Lease objects. Can also instantiate subclasses of Lease so that - * LeaseSerializer can be decorated by other classes if you need to add fields to leases. + * Utility class that manages the mapping of Lease objects/operations to records in DynamoDB. + * + * @param Lease subclass, possibly Lease itself */ -public class LeaseSerializer implements ILeaseSerializer { +public interface LeaseSerializer { - public final String LEASE_KEY_KEY = "leaseKey"; - public final String LEASE_OWNER_KEY = "leaseOwner"; - public final String LEASE_COUNTER_KEY = "leaseCounter"; - public final Class clazz; + /** + * Construct a DynamoDB record out of a Lease object + * + * @param lease lease object to serialize + * @return an attribute value map representing the lease object + */ + public Map toDynamoRecord(T lease); - public LeaseSerializer() { - this.clazz = Lease.class; - } + /** + * Construct a Lease object out of a DynamoDB record. + * + * @param dynamoRecord attribute value map from DynamoDB + * @return a deserialized lease object representing the attribute value map + */ + public T fromDynamoRecord(Map dynamoRecord); - public LeaseSerializer(Class clazz) { - this.clazz = clazz; - } + /** + * @param lease + * @return the attribute value map representing a Lease's hash key given a Lease object. + */ + public Map getDynamoHashKey(T lease); - @Override - public Map toDynamoRecord(Lease lease) { - Map result = new HashMap(); + /** + * Special getDynamoHashKey implementation used by ILeaseManager.getLease(). + * + * @param leaseKey + * @return the attribute value map representing a Lease's hash key given a string. + */ + public Map getDynamoHashKey(String leaseKey); - result.put(LEASE_KEY_KEY, DynamoUtils.createAttributeValue(lease.getLeaseKey())); - result.put(LEASE_COUNTER_KEY, DynamoUtils.createAttributeValue(lease.getLeaseCounter())); + /** + * @param lease + * @return the attribute value map asserting that a lease counter is what we expect. + */ + public Map getDynamoLeaseCounterExpectation(T lease); - if (lease.getLeaseOwner() != null) { - result.put(LEASE_OWNER_KEY, DynamoUtils.createAttributeValue(lease.getLeaseOwner())); - } + /** + * @param lease + * @return the attribute value map asserting that the lease owner is what we expect. + */ + public Map getDynamoLeaseOwnerExpectation(T lease); - return result; - } + /** + * @return the attribute value map asserting that a lease does not exist. + */ + public Map getDynamoNonexistantExpectation(); - @Override - public Lease fromDynamoRecord(Map dynamoRecord) { - Lease result; - try { - result = clazz.newInstance(); - } catch (InstantiationException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } + /** + * @param lease + * @return the attribute value map that increments a lease counter + */ + public Map getDynamoLeaseCounterUpdate(T lease); - result.setLeaseKey(DynamoUtils.safeGetString(dynamoRecord, LEASE_KEY_KEY)); - result.setLeaseOwner(DynamoUtils.safeGetString(dynamoRecord, LEASE_OWNER_KEY)); - result.setLeaseCounter(DynamoUtils.safeGetLong(dynamoRecord, LEASE_COUNTER_KEY)); + /** + * @param lease + * @param newOwner + * @return the attribute value map that takes a lease for a new owner + */ + public Map getDynamoTakeLeaseUpdate(T lease, String newOwner); - return result; - } + /** + * @param lease + * @return the attribute value map that voids a lease + */ + public Map getDynamoEvictLeaseUpdate(T lease); - @Override - public Map getDynamoHashKey(String leaseKey) { - Map result = new HashMap(); + /** + * @param lease + * @return the attribute value map that updates application-specific data for a lease and increments the lease + * counter + */ + public Map getDynamoUpdateLeaseUpdate(T lease); - result.put(LEASE_KEY_KEY, DynamoUtils.createAttributeValue(leaseKey)); + /** + * @return the key schema for creating a DynamoDB table to store leases + */ + public Collection getKeySchema(); - return result; - } - - @Override - public Map getDynamoHashKey(Lease lease) { - return getDynamoHashKey(lease.getLeaseKey()); - } - - @Override - public Map getDynamoLeaseCounterExpectation(Lease lease) { - return getDynamoLeaseCounterExpectation(lease.getLeaseCounter()); - } - - public Map getDynamoLeaseCounterExpectation(Long leaseCounter) { - Map result = new HashMap(); - - ExpectedAttributeValue eav = new ExpectedAttributeValue(DynamoUtils.createAttributeValue(leaseCounter)); - result.put(LEASE_COUNTER_KEY, eav); - - return result; - } - - @Override - public Map getDynamoLeaseOwnerExpectation(Lease lease) { - Map result = new HashMap(); - - ExpectedAttributeValue eav = null; - - if (lease.getLeaseOwner() == null) { - eav = new ExpectedAttributeValue(false); - } else { - eav = new ExpectedAttributeValue(DynamoUtils.createAttributeValue(lease.getLeaseOwner())); - } - - result.put(LEASE_OWNER_KEY, eav); - - return result; - } - - @Override - public Map getDynamoNonexistantExpectation() { - Map result = new HashMap(); - - ExpectedAttributeValue expectedAV = new ExpectedAttributeValue(false); - result.put(LEASE_KEY_KEY, expectedAV); - - return result; - } - - @Override - public Map getDynamoLeaseCounterUpdate(Lease lease) { - return getDynamoLeaseCounterUpdate(lease.getLeaseCounter()); - } - - public Map getDynamoLeaseCounterUpdate(Long leaseCounter) { - Map result = new HashMap(); - - AttributeValueUpdate avu = - new AttributeValueUpdate(DynamoUtils.createAttributeValue(leaseCounter + 1), AttributeAction.PUT); - result.put(LEASE_COUNTER_KEY, avu); - - return result; - } - - @Override - public Map getDynamoTakeLeaseUpdate(Lease lease, String owner) { - Map result = new HashMap(); - - result.put(LEASE_OWNER_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(owner), - AttributeAction.PUT)); - - return result; - } - - @Override - public Map getDynamoEvictLeaseUpdate(Lease lease) { - Map result = new HashMap(); - - result.put(LEASE_OWNER_KEY, new AttributeValueUpdate(null, AttributeAction.DELETE)); - - return result; - } - - @Override - public Map getDynamoUpdateLeaseUpdate(Lease lease) { - // There is no application-specific data in Lease - just return a map that increments the counter. - return new HashMap(); - } - - @Override - public Collection getKeySchema() { - List keySchema = new ArrayList(); - keySchema.add(new KeySchemaElement().withAttributeName(LEASE_KEY_KEY).withKeyType(KeyType.HASH)); - - return keySchema; - } - - @Override - public Collection getAttributeDefinitions() { - List definitions = new ArrayList(); - definitions.add(new AttributeDefinition().withAttributeName(LEASE_KEY_KEY) - .withAttributeType(ScalarAttributeType.S)); - - return definitions; - } + /** + * @return attribute definitions for creating a DynamoDB table to store leases + */ + public Collection getAttributeDefinitions(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java index c526d5c5..7ed43518 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java @@ -36,7 +36,7 @@ public class ShardSyncTask implements ITask { @NonNull private final LeaseManagerProxy leaseManagerProxy; @NonNull - private final ILeaseManager leaseManager; + private final LeaseManager leaseManager; @NonNull private final InitialPositionInStreamExtended initialPosition; private final boolean cleanupLeasesUponShardCompletion; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java index b9bf70a0..591dda20 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java @@ -41,7 +41,7 @@ public class ShardSyncTaskManager { @NonNull private final LeaseManagerProxy leaseManagerProxy; @NonNull - private final ILeaseManager leaseManager; + private final LeaseManager leaseManager; @NonNull private final InitialPositionInStreamExtended initialPositionInStream; private final boolean cleanupLeasesUponShardCompletion; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncer.java index 3423086f..233f7d34 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncer.java @@ -58,7 +58,7 @@ public class ShardSyncer { } static synchronized void bootstrapShardLeases(@NonNull final LeaseManagerProxy leaseManagerProxy, - @NonNull final ILeaseManager leaseManager, + @NonNull final LeaseManager leaseManager, @NonNull final InitialPositionInStreamExtended initialPositionInStream, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards) @@ -80,7 +80,7 @@ public class ShardSyncer { * @throws KinesisClientLibIOException */ public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final LeaseManagerProxy leaseManagerProxy, - @NonNull final ILeaseManager leaseManager, + @NonNull final LeaseManager leaseManager, @NonNull final InitialPositionInStreamExtended initialPositionInStream, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards) @@ -90,7 +90,7 @@ public class ShardSyncer { } static synchronized void checkAndCreateLeasesForNewShards(@NonNull final LeaseManagerProxy leaseManagerProxy, - @NonNull final ILeaseManager leaseManager, + @NonNull final LeaseManager leaseManager, @NonNull final InitialPositionInStreamExtended initialPositionInStream, final boolean cleanupLeasesOfCompletedShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { @@ -112,7 +112,7 @@ public class ShardSyncer { */ // CHECKSTYLE:OFF CyclomaticComplexity private static synchronized void syncShardLeases(@NonNull final LeaseManagerProxy leaseManagerProxy, - final ILeaseManager leaseManager, + final LeaseManager leaseManager, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards) @@ -593,7 +593,7 @@ public class ShardSyncer { private static void cleanupGarbageLeases(@NonNull final LeaseManagerProxy leaseManagerProxy, final List shards, final List trackedLeases, - final ILeaseManager leaseManager) + final LeaseManager leaseManager) throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException { Set kinesisShards = new HashSet<>(); for (Shard shard : shards) { @@ -684,7 +684,7 @@ public class ShardSyncer { Map shardIdToShardMap, Map> shardIdToChildShardIdsMap, List trackedLeases, - ILeaseManager leaseManager) + LeaseManager leaseManager) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { Set shardIdsOfClosedShards = new HashSet<>(); List leasesOfClosedShards = new ArrayList<>(); @@ -731,7 +731,7 @@ public class ShardSyncer { static synchronized void cleanupLeaseForClosedShard(String closedShardId, Set childShardIds, Map trackedLeases, - ILeaseManager leaseManager) + LeaseManager leaseManager) throws DependencyException, InvalidStateException, ProvisionedThroughputException { KinesisClientLease leaseForClosedShard = trackedLeases.get(closedShardId); List childShardLeases = new ArrayList<>(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java index 2b3b3035..81ef5588 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java @@ -20,7 +20,7 @@ import lombok.AccessLevel; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import software.amazon.kinesis.leases.ILeaseManager; +import software.amazon.kinesis.leases.LeaseManager; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @@ -39,7 +39,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; public class BlockOnParentShardTask implements ITask { @NonNull private final ShardInfo shardInfo; - private final ILeaseManager leaseManager; + private final LeaseManager leaseManager; // Sleep for this duration if the parent shards have not completed processing, or we encounter an exception. private final long parentShardPollIntervalMillis; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index 3dbcf5a3..1c2af227 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -35,7 +35,7 @@ import lombok.Synchronized; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.checkpoint.RecordProcessorCheckpointer; -import software.amazon.kinesis.leases.ILeaseManager; +import software.amazon.kinesis.leases.LeaseManager; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.LeaseManagerProxy; import software.amazon.kinesis.leases.ShardInfo; @@ -60,7 +60,7 @@ public class ShardConsumer { @NonNull private final String streamName; @NonNull - private final ILeaseManager leaseManager; + private final LeaseManager leaseManager; @NonNull private final ExecutorService executorService; @NonNull diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index dc19ddf1..2d6b9430 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -21,7 +21,7 @@ import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.checkpoint.RecordProcessorCheckpointer; -import software.amazon.kinesis.leases.ILeaseManager; +import software.amazon.kinesis.leases.LeaseManager; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.LeaseManagerProxy; import software.amazon.kinesis.leases.ShardInfo; @@ -55,7 +55,7 @@ public class ShutdownTask implements ITask { private final boolean cleanupLeasesOfCompletedShards; private final boolean ignoreUnexpectedChildShards; @NonNull - private final ILeaseManager leaseManager; + private final LeaseManager leaseManager; private final long backoffTimeMillis; @NonNull private final GetRecordsCache getRecordsCache; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/SynchronousPrefetchingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/SynchronousPrefetchingRetrievalFactory.java index 4ed90d17..a6e18810 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/SynchronousPrefetchingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/SynchronousPrefetchingRetrievalFactory.java @@ -52,7 +52,7 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory } @Override - public GetRecordsCache createGetRecordsCache(final ShardInfo shardInfo) { + public GetRecordsCache createGetRecordsCache(final ShardInfo shardInfo, final IMetricsFactory metricsFactory) { return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecordsPerCall, createGetRecordsRetrievalStrategy(shardInfo), executorService, idleMillisBetweenCalls, metricsFactory, "Prefetching", shardInfo.shardId()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 83b1b48b..8bfd3ede 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -50,13 +50,13 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibN import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointFactory; -import software.amazon.kinesis.leases.ILeaseManager; +import software.amazon.kinesis.leases.LeaseManager; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseManagementFactory; -import software.amazon.kinesis.leases.LeaseManager; +import software.amazon.kinesis.leases.DynamoDBLeaseManager; import software.amazon.kinesis.leases.LeaseManagerProxy; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardSyncTaskManager; @@ -111,7 +111,7 @@ public class SchedulerTest { @Mock private ShardSyncTaskManager shardSyncTaskManager; @Mock - private LeaseManager leaseManager; + private DynamoDBLeaseManager dynamoDBLeaseManager; @Mock private LeaseManagerProxy leaseManagerProxy; @Mock @@ -131,7 +131,7 @@ public class SchedulerTest { processorConfig = new ProcessorConfig(processorFactory); retrievalConfig = new RetrievalConfig(streamName, amazonKinesis).retrievalFactory(retrievalFactory); - when(leaseCoordinator.leaseManager()).thenReturn(leaseManager); + when(leaseCoordinator.leaseManager()).thenReturn(dynamoDBLeaseManager); when(shardSyncTaskManager.leaseManagerProxy()).thenReturn(leaseManagerProxy); when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(IMetricsFactory.class))).thenReturn(getRecordsCache); @@ -442,8 +442,8 @@ public class SchedulerTest { } @Override - public LeaseManager createLeaseManager() { - return leaseManager; + public DynamoDBLeaseManager createLeaseManager() { + return dynamoDBLeaseManager; } @Override @@ -460,7 +460,7 @@ public class SchedulerTest { private class TestKinesisCheckpointFactory implements CheckpointFactory { @Override public Checkpointer createCheckpointer(final LeaseCoordinator leaseCoordinator, - final ILeaseManager leaseManager) { + final LeaseManager leaseManager) { return checkpoint; } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseManagerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/DynamoDBLeaseManagerIntegrationTest.java similarity index 96% rename from amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseManagerIntegrationTest.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/DynamoDBLeaseManagerIntegrationTest.java index bf312de1..86473392 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseManagerIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/DynamoDBLeaseManagerIntegrationTest.java @@ -24,7 +24,7 @@ import org.junit.Test; import software.amazon.kinesis.leases.exceptions.LeasingException; -public class LeaseManagerIntegrationTest extends LeaseIntegrationTest { +public class DynamoDBLeaseManagerIntegrationTest extends LeaseIntegrationTest { /** * Test listLeases when no records are present. @@ -233,7 +233,7 @@ public class LeaseManagerIntegrationTest extends LeaseIntegrationTest { @Test public void testWaitUntilLeaseTableExists() throws LeasingException { - KinesisClientLeaseManager manager = new KinesisClientLeaseManager("nagl_ShardProgress", ddbClient, true) { + KinesisClientDynamoDBLeaseManager manager = new KinesisClientDynamoDBLeaseManager("nagl_ShardProgress", ddbClient, true) { @Override long sleep(long timeToSleepMillis) { @@ -252,7 +252,7 @@ public class LeaseManagerIntegrationTest extends LeaseIntegrationTest { * Just using AtomicInteger for the indirection it provides. */ final AtomicInteger sleepCounter = new AtomicInteger(0); - KinesisClientLeaseManager manager = new KinesisClientLeaseManager("nonexistentTable", ddbClient, true) { + KinesisClientDynamoDBLeaseManager manager = new KinesisClientDynamoDBLeaseManager("nonexistentTable", ddbClient, true) { @Override long sleep(long timeToSleepMillis) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/DynamoDBLeaseRenewerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/DynamoDBLeaseRenewerIntegrationTest.java index e37b95de..d5c76e0d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/DynamoDBLeaseRenewerIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/DynamoDBLeaseRenewerIntegrationTest.java @@ -24,16 +24,16 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.Executors; -public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { +public class DynamoDBLeaseRenewerIntegrationTest extends LeaseIntegrationTest { // This test case's leases last 2 seconds private static final long LEASE_DURATION_MILLIS = 2000L; - private ILeaseRenewer renewer; + private LeaseRenewer renewer; @Before public void setUp() { - renewer = new LeaseRenewer( + renewer = new DynamoDBLeaseRenewer( leaseManager, "foo", LEASE_DURATION_MILLIS, Executors.newCachedThreadPool()); } @@ -242,7 +242,7 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { TestHarnessBuilder builder = new TestHarnessBuilder(leaseManager); builder.withLease(shardId, owner); Map leases = builder.build(); - LeaseRenewer renewer =new LeaseRenewer( + DynamoDBLeaseRenewer renewer =new DynamoDBLeaseRenewer( leaseManager, owner, 30000L, Executors.newCachedThreadPool()); renewer.initialize(); Map heldLeases = renewer.getCurrentlyHeldLeases(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/DynamoDBLeaseRenewerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/DynamoDBLeaseRenewerTest.java index 0c619e24..cb75ab4b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/DynamoDBLeaseRenewerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/DynamoDBLeaseRenewerTest.java @@ -30,17 +30,14 @@ import org.mockito.Mockito; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; -import software.amazon.kinesis.leases.ILeaseManager; -import software.amazon.kinesis.leases.Lease; -import software.amazon.kinesis.leases.LeaseRenewer; -public class LeaseRenewerTest { +public class DynamoDBLeaseRenewerTest { - ILeaseManager leaseManager; + LeaseManager leaseManager; String workerIdentifier; long leaseDurationMillis; ExecutorService leaseRenewalExecService; - LeaseRenewer renewer; + DynamoDBLeaseRenewer renewer; List leasesToRenew; private static Lease newLease(String leaseKey, @@ -64,12 +61,12 @@ public class LeaseRenewerTest { @SuppressWarnings("unchecked") @Before public void before() { - leaseManager = Mockito.mock(ILeaseManager.class); + leaseManager = Mockito.mock(LeaseManager.class); workerIdentifier = "workerId"; leaseDurationMillis = 10000; leaseRenewalExecService = Executors.newSingleThreadExecutor(); leasesToRenew = null; - renewer = new LeaseRenewer<>(leaseManager, + renewer = new DynamoDBLeaseRenewer<>(leaseManager, workerIdentifier, leaseDurationMillis, Executors.newCachedThreadPool()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseManager.java index 891bebd7..b7c90bfa 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseManager.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseManager.java @@ -20,8 +20,6 @@ import java.util.List; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; -import software.amazon.kinesis.leases.KinesisClientLease; -import software.amazon.kinesis.leases.ILeaseManager; import lombok.extern.slf4j.Slf4j; @@ -30,7 +28,7 @@ import lombok.extern.slf4j.Slf4j; * */ @Slf4j -public class ExceptionThrowingLeaseManager implements ILeaseManager { +public class ExceptionThrowingLeaseManager implements LeaseManager { private static final Throwable EXCEPTION_MSG = new Throwable("Test Exception"); // Use array below to control in what situations we want to throw exceptions. @@ -70,14 +68,14 @@ public class ExceptionThrowingLeaseManager implements ILeaseManager leaseManager; + private final LeaseManager leaseManager; /** * Constructor accepts lease manager as only argument. * * @param leaseManager which will do the real implementations */ - ExceptionThrowingLeaseManager(ILeaseManager leaseManager) { + ExceptionThrowingLeaseManager(LeaseManager leaseManager) { this.leaseManager = leaseManager; this.leaseManagerMethodCallingCount = new int[ExceptionThrowingLeaseManagerMethods.values().length]; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinatorIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinatorIntegrationTest.java index b54d6d30..6ddd1d93 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinatorIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinatorIntegrationTest.java @@ -53,7 +53,7 @@ public class KinesisClientLibLeaseCoordinatorIntegrationTest { private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; private static final int MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1; private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20; - private static KinesisClientLeaseManager leaseManager; + private static KinesisClientDynamoDBLeaseManager leaseManager; private static DynamoDBCheckpointer dynamoDBCheckpointer; private KinesisClientLibLeaseCoordinator coordinator; @@ -66,7 +66,7 @@ public class KinesisClientLibLeaseCoordinatorIntegrationTest { if (leaseManager == null) { AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain()); leaseManager = - new KinesisClientLeaseManager(TABLE_NAME, ddb, useConsistentReads); + new KinesisClientDynamoDBLeaseManager(TABLE_NAME, ddb, useConsistentReads); } leaseManager.createLeaseTableIfNotExists(10L, 10L); leaseManager.deleteAll(); @@ -220,7 +220,7 @@ public class KinesisClientLibLeaseCoordinatorIntegrationTest { assertEquals(original, actual); // Assert the contents of the lease } - public void addLeasesToRenew(ILeaseRenewer renewer, String... shardIds) + public void addLeasesToRenew(LeaseRenewer renewer, String... shardIds) throws DependencyException, InvalidStateException { List leasesToRenew = new ArrayList(); @@ -233,7 +233,7 @@ public class KinesisClientLibLeaseCoordinatorIntegrationTest { renewer.addLeasesToRenew(leasesToRenew); } - public Map renewMutateAssert(ILeaseRenewer renewer, + public Map renewMutateAssert(LeaseRenewer renewer, String... renewedShardIds) throws DependencyException, InvalidStateException { renewer.renewLeases(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinatorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinatorTest.java index f7d81f5f..733c13e7 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinatorTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinatorTest.java @@ -44,7 +44,7 @@ public class KinesisClientLibLeaseCoordinatorTest { private static final UUID TEST_UUID = UUID.randomUUID(); @Mock - private ILeaseManager leaseManager; + private LeaseManager leaseManager; @Mock private LeaseCoordinator leaseCoordinator; @Mock diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java index 5843d2ab..0a32a5d2 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java @@ -55,7 +55,7 @@ public class LeaseCoordinatorExerciser { new DefaultAWSCredentialsProviderChain(); AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(creds); - ILeaseManager leaseManager = new KinesisClientLeaseManager("nagl_ShardProgress", ddb); + LeaseManager leaseManager = new KinesisClientDynamoDBLeaseManager("nagl_ShardProgress", ddb); if (leaseManager.createLeaseTableIfNotExists(10L, 50L)) { log.info("Waiting for newly created lease table"); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java index 933a8326..d71061df 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java @@ -30,7 +30,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class LeaseIntegrationTest { - protected static KinesisClientLeaseManager leaseManager; + protected static KinesisClientDynamoDBLeaseManager leaseManager; protected static AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain()); @@ -42,7 +42,7 @@ public class LeaseIntegrationTest { if (leaseManager == null) { // Do some static setup once per class. - leaseManager = new KinesisClientLeaseManager("nagl_ShardProgress", ddbClient, true); + leaseManager = new KinesisClientDynamoDBLeaseManager("nagl_ShardProgress", ddbClient, true); MetricsHelper.startScope(new NullMetricsFactory()); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java index 17990d57..973255ec 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java @@ -48,7 +48,7 @@ public class ShardSyncTaskIntegrationTest { private static final String STREAM_NAME = "IntegrationTestStream02"; private static AmazonKinesis amazonKinesis; - private IKinesisClientLeaseManager leaseManager; + private KinesisClientLeaseManager leaseManager; private LeaseManagerProxy leaseManagerProxy; /** @@ -86,7 +86,7 @@ public class ShardSyncTaskIntegrationTest { public void setUp() throws Exception { boolean useConsistentReads = true; leaseManager = - new KinesisClientLeaseManager("ShardSyncTaskIntegrationTest", + new KinesisClientDynamoDBLeaseManager("ShardSyncTaskIntegrationTest", AmazonDynamoDBClientBuilder.standard().withRegion(Regions.US_EAST_1).build(), useConsistentReads); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncerTest.java index aa00cbfe..106a4634 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncerTest.java @@ -72,7 +72,7 @@ public class ShardSyncerTest { InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000L)); private final boolean cleanupLeasesOfCompletedShards = true; private AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB(); - private LeaseManager leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient); + private DynamoDBLeaseManager dynamoDBLeaseManager = new KinesisClientDynamoDBLeaseManager("tempTestTable", ddbClient); private static final int EXPONENT = 128; /** * Old/Obsolete max value of a sequence number (2^128 -1). @@ -84,16 +84,16 @@ public class ShardSyncerTest { @Before public void setUp() throws Exception { - boolean created = leaseManager.createLeaseTableIfNotExists(1L, 1L); + boolean created = dynamoDBLeaseManager.createLeaseTableIfNotExists(1L, 1L); if (created) { log.info("New table created."); } - leaseManager.deleteAll(); + dynamoDBLeaseManager.deleteAll(); } @After public void tearDown() throws Exception { - leaseManager.deleteAll(); + dynamoDBLeaseManager.deleteAll(); } /** @@ -213,9 +213,9 @@ public class ShardSyncerTest { when(leaseManagerProxy.listShards()).thenReturn(shards); - ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, leaseManager, INITIAL_POSITION_LATEST, + ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, dynamoDBLeaseManager, INITIAL_POSITION_LATEST, cleanupLeasesOfCompletedShards); - List newLeases = leaseManager.listLeases(); + List newLeases = dynamoDBLeaseManager.listLeases(); Set expectedLeaseShardIds = new HashSet<>(); expectedLeaseShardIds.add("shardId-4"); expectedLeaseShardIds.add("shardId-8"); @@ -245,9 +245,9 @@ public class ShardSyncerTest { when(leaseManagerProxy.listShards()).thenReturn(shards); - ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON, + ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, dynamoDBLeaseManager, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards); - List newLeases = leaseManager.listLeases(); + List newLeases = dynamoDBLeaseManager.listLeases(); Set expectedLeaseShardIds = new HashSet<>(); for (int i = 0; i < 11; i++) { expectedLeaseShardIds.add("shardId-" + i); @@ -274,9 +274,9 @@ public class ShardSyncerTest { when(leaseManagerProxy.listShards()).thenReturn(shards); - ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, leaseManager, INITIAL_POSITION_AT_TIMESTAMP, + ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, dynamoDBLeaseManager, INITIAL_POSITION_AT_TIMESTAMP, cleanupLeasesOfCompletedShards); - List newLeases = leaseManager.listLeases(); + List newLeases = dynamoDBLeaseManager.listLeases(); Set expectedLeaseShardIds = new HashSet<>(); for (int i = 0; i < 11; i++) { expectedLeaseShardIds.add("shardId-" + i); @@ -306,7 +306,7 @@ public class ShardSyncerTest { when(leaseManagerProxy.listShards()).thenReturn(shards); - ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON, + ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, dynamoDBLeaseManager, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards); } @@ -330,9 +330,9 @@ public class ShardSyncerTest { when(leaseManagerProxy.listShards()).thenReturn(shards); - ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, leaseManager, INITIAL_POSITION_LATEST, + ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, dynamoDBLeaseManager, INITIAL_POSITION_LATEST, cleanupLeasesOfCompletedShards, true); - List newLeases = leaseManager.listLeases(); + List newLeases = dynamoDBLeaseManager.listLeases(); Set expectedLeaseShardIds = new HashSet<>(); expectedLeaseShardIds.add("shardId-4"); expectedLeaseShardIds.add("shardId-5"); @@ -379,7 +379,7 @@ public class ShardSyncerTest { testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( ExceptionThrowingLeaseManagerMethods.DELETELEASE, c, INITIAL_POSITION_TRIM_HORIZON); // Need to clean up lease manager every time after calling ShardSyncer - leaseManager.deleteAll(); + dynamoDBLeaseManager.deleteAll(); } } @@ -403,7 +403,7 @@ public class ShardSyncerTest { testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( ExceptionThrowingLeaseManagerMethods.LISTLEASES, c, INITIAL_POSITION_TRIM_HORIZON); // Need to clean up lease manager every time after calling ShardSyncer - leaseManager.deleteAll(); + dynamoDBLeaseManager.deleteAll(); } } @@ -427,7 +427,7 @@ public class ShardSyncerTest { testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS, c,INITIAL_POSITION_TRIM_HORIZON); // Need to clean up lease manager every time after calling ShardSyncer - leaseManager.deleteAll(); + dynamoDBLeaseManager.deleteAll(); } } @@ -440,7 +440,7 @@ public class ShardSyncerTest { throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException { if (exceptionMethod != null) { ExceptionThrowingLeaseManager exceptionThrowingLeaseManager = - new ExceptionThrowingLeaseManager(leaseManager); + new ExceptionThrowingLeaseManager(dynamoDBLeaseManager); // Set exception and throwing time for exceptionThrowingManager. exceptionThrowingLeaseManager.setLeaseLeaseManagerThrowingExceptionScenario(exceptionMethod, exceptionTime); // Only need to try two times. @@ -459,7 +459,7 @@ public class ShardSyncerTest { } } else { ShardSyncer.checkAndCreateLeasesForNewShards(leaseManagerProxy, - leaseManager, + dynamoDBLeaseManager, position, cleanupLeasesOfCompletedShards); } @@ -501,7 +501,7 @@ public class ShardSyncerTest { ExceptionThrowingLeaseManagerMethods.DELETELEASE, c, INITIAL_POSITION_AT_TIMESTAMP); // Need to clean up lease manager every time after calling ShardSyncer - leaseManager.deleteAll(); + dynamoDBLeaseManager.deleteAll(); } } @@ -524,7 +524,7 @@ public class ShardSyncerTest { ExceptionThrowingLeaseManagerMethods.LISTLEASES, c, INITIAL_POSITION_AT_TIMESTAMP); // Need to clean up lease manager every time after calling ShardSyncer - leaseManager.deleteAll(); + dynamoDBLeaseManager.deleteAll(); } } @@ -547,7 +547,7 @@ public class ShardSyncerTest { ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS, c, INITIAL_POSITION_AT_TIMESTAMP); // Need to clean up lease manager every time after calling ShardSyncer - leaseManager.deleteAll(); + dynamoDBLeaseManager.deleteAll(); } } @@ -566,7 +566,7 @@ public class ShardSyncerTest { retryCheckAndCreateLeaseForNewShards(exceptionMethod, exceptionTime, position); - List newLeases = leaseManager.listLeases(); + List newLeases = dynamoDBLeaseManager.listLeases(); Map expectedShardIdToCheckpointMap = new HashMap<>(); for (int i = 0; i < 11; i++) { expectedShardIdToCheckpointMap.put("shardId-" + i, extendedSequenceNumber); @@ -578,18 +578,18 @@ public class ShardSyncerTest { assertEquals(expectedCheckpoint, lease1.getCheckpoint()); } - KinesisClientLease closedShardLease = leaseManager.getLease("shardId-0"); + KinesisClientLease closedShardLease = dynamoDBLeaseManager.getLease("shardId-0"); closedShardLease.setCheckpoint(ExtendedSequenceNumber.SHARD_END); - leaseManager.updateLease(closedShardLease); + dynamoDBLeaseManager.updateLease(closedShardLease); expectedShardIdToCheckpointMap.remove(closedShardLease.getLeaseKey()); - KinesisClientLease childShardLease = leaseManager.getLease("shardId-6"); + KinesisClientLease childShardLease = dynamoDBLeaseManager.getLease("shardId-6"); childShardLease.setCheckpoint(new ExtendedSequenceNumber("34290")); - leaseManager.updateLease(childShardLease); + dynamoDBLeaseManager.updateLease(childShardLease); expectedShardIdToCheckpointMap.put(childShardLease.getLeaseKey(), new ExtendedSequenceNumber("34290")); retryCheckAndCreateLeaseForNewShards(exceptionMethod, exceptionTime, position); - newLeases = leaseManager.listLeases(); + newLeases = dynamoDBLeaseManager.listLeases(); assertEquals(expectedShardIdToCheckpointMap.size(), newLeases.size()); for (KinesisClientLease lease1 : newLeases) { ExtendedSequenceNumber expectedCheckpoint = expectedShardIdToCheckpointMap.get(lease1.getLeaseKey()); @@ -617,10 +617,10 @@ public class ShardSyncerTest { null, ShardObjectHelper.newSequenceNumberRange("101", null))); garbageLease.setCheckpoint(new ExtendedSequenceNumber("999")); - leaseManager.createLeaseIfNotExists(garbageLease); - assertEquals(garbageShardId, leaseManager.getLease(garbageShardId).getLeaseKey()); + dynamoDBLeaseManager.createLeaseIfNotExists(garbageLease); + assertEquals(garbageShardId, dynamoDBLeaseManager.getLease(garbageShardId).getLeaseKey()); testBootstrapShardLeasesAtStartingPosition(INITIAL_POSITION_LATEST); - assertNull(leaseManager.getLease(garbageShardId)); + assertNull(dynamoDBLeaseManager.getLease(garbageShardId)); } private void testBootstrapShardLeasesAtStartingPosition(InitialPositionInStreamExtended initialPosition) @@ -636,9 +636,9 @@ public class ShardSyncerTest { when(leaseManagerProxy.listShards()).thenReturn(shards); - ShardSyncer.bootstrapShardLeases(leaseManagerProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards, + ShardSyncer.bootstrapShardLeases(leaseManagerProxy, dynamoDBLeaseManager, initialPosition, cleanupLeasesOfCompletedShards, false); - List newLeases = leaseManager.listLeases(); + List newLeases = dynamoDBLeaseManager.listLeases(); assertEquals(2, newLeases.size()); Set expectedLeaseShardIds = new HashSet(); expectedLeaseShardIds.add(shardId0); @@ -1446,7 +1446,7 @@ public class ShardSyncerTest { String closedShardId = "shardId-2"; KinesisClientLease leaseForClosedShard = newLease(closedShardId); leaseForClosedShard.setCheckpoint(new ExtendedSequenceNumber("1234")); - leaseManager.createLeaseIfNotExists(leaseForClosedShard); + dynamoDBLeaseManager.createLeaseIfNotExists(leaseForClosedShard); Set childShardIds = new HashSet<>(); List trackedLeases = new ArrayList<>(); @@ -1463,49 +1463,49 @@ public class ShardSyncerTest { Map trackedLeaseMap = ShardSyncer.constructShardIdToKCLLeaseMap(trackedLeases); // empty list of leases - ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); - assertNotNull(leaseManager.getLease(closedShardId)); + ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, dynamoDBLeaseManager); + assertNotNull(dynamoDBLeaseManager.getLease(closedShardId)); // closed shard has not been fully processed yet (checkpoint != SHARD_END) trackedLeases.add(leaseForClosedShard); trackedLeaseMap = ShardSyncer.constructShardIdToKCLLeaseMap(trackedLeases); - ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); - assertNotNull(leaseManager.getLease(closedShardId)); + ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, dynamoDBLeaseManager); + assertNotNull(dynamoDBLeaseManager.getLease(closedShardId)); // closed shard has been fully processed yet (checkpoint == SHARD_END) leaseForClosedShard.setCheckpoint(ExtendedSequenceNumber.SHARD_END); - leaseManager.updateLease(leaseForClosedShard); - ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); - assertNull(leaseManager.getLease(closedShardId)); + dynamoDBLeaseManager.updateLease(leaseForClosedShard); + ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, dynamoDBLeaseManager); + assertNull(dynamoDBLeaseManager.getLease(closedShardId)); // lease for only one child exists childShardIds.add(childShardId1); childShardIds.add(childShardId2); - leaseManager.createLeaseIfNotExists(leaseForClosedShard); - leaseManager.createLeaseIfNotExists(childLease1); + dynamoDBLeaseManager.createLeaseIfNotExists(leaseForClosedShard); + dynamoDBLeaseManager.createLeaseIfNotExists(childLease1); trackedLeases.add(childLease1); trackedLeaseMap = ShardSyncer.constructShardIdToKCLLeaseMap(trackedLeases); - ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); - assertNotNull(leaseManager.getLease(closedShardId)); + ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, dynamoDBLeaseManager); + assertNotNull(dynamoDBLeaseManager.getLease(closedShardId)); // leases for both children exists, but they are both at TRIM_HORIZON - leaseManager.createLeaseIfNotExists(childLease2); + dynamoDBLeaseManager.createLeaseIfNotExists(childLease2); trackedLeases.add(childLease2); trackedLeaseMap = ShardSyncer.constructShardIdToKCLLeaseMap(trackedLeases); - ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); - assertNotNull(leaseManager.getLease(closedShardId)); + ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, dynamoDBLeaseManager); + assertNotNull(dynamoDBLeaseManager.getLease(closedShardId)); // leases for both children exists, one is at TRIM_HORIZON childLease1.setCheckpoint(new ExtendedSequenceNumber("34890")); - leaseManager.updateLease(childLease1); - ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); - assertNotNull(leaseManager.getLease(closedShardId)); + dynamoDBLeaseManager.updateLease(childLease1); + ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, dynamoDBLeaseManager); + assertNotNull(dynamoDBLeaseManager.getLease(closedShardId)); // leases for both children exists, NONE of them are at TRIM_HORIZON childLease2.setCheckpoint(new ExtendedSequenceNumber("43789")); - leaseManager.updateLease(childLease2); - ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); - assertNull(leaseManager.getLease(closedShardId)); + dynamoDBLeaseManager.updateLease(childLease2); + ShardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, dynamoDBLeaseManager); + assertNull(dynamoDBLeaseManager.getLease(closedShardId)); } /** diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/TestHarnessBuilder.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/TestHarnessBuilder.java index fe56f509..8e4a45cd 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/TestHarnessBuilder.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/TestHarnessBuilder.java @@ -33,7 +33,7 @@ public class TestHarnessBuilder { private long currentTimeNanos; private Map leases = new HashMap(); - private KinesisClientLeaseManager leaseManager; + private KinesisClientDynamoDBLeaseManager leaseManager; private Map originalLeases = new HashMap<>(); private Callable timeProvider = new Callable() { @@ -45,7 +45,7 @@ public class TestHarnessBuilder { }; - public TestHarnessBuilder(KinesisClientLeaseManager leaseManager) { + public TestHarnessBuilder(KinesisClientDynamoDBLeaseManager leaseManager) { this.leaseManager = leaseManager; } @@ -134,7 +134,7 @@ public class TestHarnessBuilder { Assert.assertEquals(original, actual); // Assert the contents of the lease } - public void addLeasesToRenew(ILeaseRenewer renewer, String... shardIds) + public void addLeasesToRenew(LeaseRenewer renewer, String... shardIds) throws DependencyException, InvalidStateException { List leasesToRenew = new ArrayList(); @@ -147,7 +147,7 @@ public class TestHarnessBuilder { renewer.addLeasesToRenew(leasesToRenew); } - public Map renewMutateAssert(ILeaseRenewer renewer, String... renewedShardIds) + public Map renewMutateAssert(LeaseRenewer renewer, String... renewedShardIds) throws DependencyException, InvalidStateException { renewer.renewLeases(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java index 0a984bdb..c64d9940 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java @@ -26,7 +26,7 @@ import java.util.List; import org.junit.Before; import org.junit.Test; -import software.amazon.kinesis.leases.ILeaseManager; +import software.amazon.kinesis.leases.LeaseManager; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.exceptions.DependencyException; @@ -58,7 +58,7 @@ public class BlockOnParentShardTaskTest { @Test public final void testCallNoParents() throws DependencyException, InvalidStateException, ProvisionedThroughputException { - ILeaseManager leaseManager = mock(ILeaseManager.class); + LeaseManager leaseManager = mock(LeaseManager.class); when(leaseManager.getLease(shardId)).thenReturn(null); BlockOnParentShardTask task = new BlockOnParentShardTask(shardInfo, leaseManager, backoffTimeInMillis); @@ -88,7 +88,7 @@ public class BlockOnParentShardTaskTest { KinesisClientLease parent2Lease = new KinesisClientLease(); parent2Lease.setCheckpoint(ExtendedSequenceNumber.SHARD_END); - ILeaseManager leaseManager = mock(ILeaseManager.class); + LeaseManager leaseManager = mock(LeaseManager.class); when(leaseManager.getLease(parent1ShardId)).thenReturn(parent1Lease); when(leaseManager.getLease(parent2ShardId)).thenReturn(parent2Lease); @@ -130,7 +130,7 @@ public class BlockOnParentShardTaskTest { // mock a sequence number checkpoint parent2Lease.setCheckpoint(new ExtendedSequenceNumber("98182584034")); - ILeaseManager leaseManager = mock(ILeaseManager.class); + LeaseManager leaseManager = mock(LeaseManager.class); when(leaseManager.getLease(parent1ShardId)).thenReturn(parent1Lease); when(leaseManager.getLease(parent2ShardId)).thenReturn(parent2Lease); @@ -166,7 +166,7 @@ public class BlockOnParentShardTaskTest { ShardInfo shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); TaskResult result = null; KinesisClientLease parentLease = new KinesisClientLease(); - ILeaseManager leaseManager = mock(ILeaseManager.class); + LeaseManager leaseManager = mock(LeaseManager.class); when(leaseManager.getLease(parentShardId)).thenReturn(parentLease); // test when parent shard has not yet been fully processed diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index 286daae6..d2509299 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -45,7 +45,7 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import software.amazon.kinesis.checkpoint.RecordProcessorCheckpointer; -import software.amazon.kinesis.leases.ILeaseManager; +import software.amazon.kinesis.leases.LeaseManager; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.LeaseManagerProxy; import software.amazon.kinesis.leases.ShardInfo; @@ -72,7 +72,7 @@ public class ConsumerStatesTest { @Mock private ShardInfo shardInfo; @Mock - private ILeaseManager leaseManager; + private LeaseManager leaseManager; @Mock private Checkpointer checkpoint; @Mock @@ -113,7 +113,7 @@ public class ConsumerStatesTest { when(shardInfo.shardId()).thenReturn("shardId-000000000000"); } - private static final Class> LEASE_MANAGER_CLASS = (Class>) (Class) ILeaseManager.class; + private static final Class> LEASE_MANAGER_CLASS = (Class>) (Class) LeaseManager.class; @Test public void blockOnParentStateTest() { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index 561657f6..7f340a1f 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -75,7 +75,7 @@ import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; import software.amazon.kinesis.checkpoint.RecordProcessorCheckpointer; -import software.amazon.kinesis.leases.ILeaseManager; +import software.amazon.kinesis.leases.LeaseManager; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.LeaseManagerProxy; import software.amazon.kinesis.leases.ShardInfo; @@ -139,7 +139,7 @@ public class ShardConsumerTest { @Mock private KinesisClientLibConfiguration config; @Mock - private ILeaseManager leaseManager; + private LeaseManager leaseManager; @Mock private Checkpointer checkpoint; @Mock diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index d552564c..62e38eb5 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -34,7 +34,7 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import software.amazon.kinesis.checkpoint.RecordProcessorCheckpointer; -import software.amazon.kinesis.leases.ILeaseManager; +import software.amazon.kinesis.leases.LeaseManager; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.LeaseManagerProxy; import software.amazon.kinesis.leases.ShardInfo; @@ -66,7 +66,7 @@ public class ShutdownTaskTest { @Mock private RecordProcessorCheckpointer checkpointer; @Mock - private ILeaseManager leaseManager; + private LeaseManager leaseManager; @Mock private LeaseManagerProxy leaseManagerProxy; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCacheIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCacheIntegrationTest.java index 35313fcc..4e63d104 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCacheIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCacheIntegrationTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -33,6 +34,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; +import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; +import com.amazonaws.services.kinesis.model.GetShardIteratorResult; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -87,6 +90,11 @@ public class PrefetchGetRecordsCacheIntegrationTest { dataFetcher = spy(new KinesisDataFetcherForTest(amazonKinesis, streamName, shardId, MAX_RECORDS_PER_CALL)); getRecordsRetrievalStrategy = spy(new SynchronousGetRecordsRetrievalStrategy(dataFetcher)); executorService = spy(Executors.newFixedThreadPool(1)); + + when(extendedSequenceNumber.getSequenceNumber()).thenReturn("LATEST"); + when(amazonKinesis.getShardIterator(any(GetShardIteratorRequest.class))) + .thenReturn(new GetShardIteratorResult().withShardIterator("TestIterator")); + getRecordsCache = new PrefetchGetRecordsCache(MAX_SIZE, MAX_BYTE_SIZE, MAX_RECORDS_COUNT,