Introducing callback for DynamoDB lease table (#413)

* This feature enables customers to perform actions on DynamoDB lease tables once created and in the active state
* Introducing TableCreatorCallback for DynamoDB lease management
* Introducing DoesNothingTableCreatorCallback
* Intoducing TableCreatorCallback config in LeaseManagementConfig, with DoesNothingTableCreatorCallback as the default
* Introducing TableCreatorCallbackInput object.
* Updating the javadoc
This commit is contained in:
Sahil Palvia 2018-09-25 10:06:24 -07:00 committed by GitHub
parent 0d6335d434
commit a88d4ba602
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 248 additions and 16 deletions

View file

@ -31,6 +31,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory;
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
@ -218,7 +219,17 @@ public class LeaseManagementConfig {
super(0, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue<>(),
threadFactory);
}
};
}
/**
* Callback used with DynamoDB lease management. Callback is invoked once the table is newly created and is in the
* active status.
*
* <p>
* Default value: {@link TableCreatorCallback#NOOP_TABLE_CREATOR_CALLBACK}
* </p>
*/
private TableCreatorCallback tableCreatorCallback = TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK;
private LeaseManagementFactory leaseManagementFactory;
@ -246,7 +257,8 @@ public class LeaseManagementConfig {
listShardsCacheAllowedAgeInSeconds(),
cacheMissWarningModulus(),
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity());
initialLeaseTableWriteCapacity(),
tableCreatorCallback());
}
return leaseManagementFactory;
}

View file

@ -202,5 +202,4 @@ public interface LeaseRefresher {
*/
ExtendedSequenceNumber getCheckpoint(String shardId)
throws ProvisionedThroughputException, InvalidStateException, DependencyException;
}

View file

@ -66,6 +66,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final int cacheMissWarningModulus;
private final long initialLeaseTableReadCapacity;
private final long initialLeaseTableWriteCapacity;
private final TableCreatorCallback tableCreatorCallback;
/**
* Constructor.
@ -116,6 +117,10 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
/**
* Constructor.
*
* <p>
* NOTE: This constructor is deprecated and will be removed in a future release.
* </p>
*
* @param kinesisClient
* @param streamName
* @param dynamoDBClient
@ -140,6 +145,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
*/
@Deprecated
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
@ -150,6 +156,54 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK);
}
/**
* Constructor.
*
* @param kinesisClient
* @param streamName
* @param dynamoDBClient
* @param tableName
* @param workerIdentifier
* @param executorService
* @param initialPositionInStream
* @param failoverTimeMillis
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
* @param maxLeaseRenewalThreads
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIntervalMillis
* @param consistentReads
* @param listShardsBackoffTimeMillis
* @param maxListShardsRetryAttempts
* @param maxCacheMissesBeforeReload
* @param listShardsCacheAllowedAgeInSeconds
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param tableCreatorCallback
*/
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final TableCreatorCallback tableCreatorCallback) {
this.kinesisClient = kinesisClient;
this.streamName = streamName;
this.dynamoDBClient = dynamoDBClient;
@ -173,6 +227,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.cacheMissWarningModulus = cacheMissWarningModulus;
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
this.tableCreatorCallback = tableCreatorCallback;
}
@Override
@ -203,7 +258,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@Override
public DynamoDBLeaseRefresher createLeaseRefresher() {
return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, new DynamoDBLeaseSerializer(), consistentReads);
return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, new DynamoDBLeaseSerializer(), consistentReads,
tableCreatorCallback);
}
@Override

View file

@ -20,7 +20,6 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
@ -58,7 +57,6 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/**
* An implementation of {@link LeaseRefresher} that uses DynamoDB.
*/
@AllArgsConstructor
@Slf4j
@KinesisClientInternalApi
public class DynamoDBLeaseRefresher implements LeaseRefresher {
@ -66,6 +64,46 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
protected final DynamoDbAsyncClient dynamoDBClient;
protected final LeaseSerializer serializer;
protected final boolean consistentReads;
private final TableCreatorCallback tableCreatorCallback;
private boolean newTableCreated = false;
/**
* Constructor.
*
* <p>
* NOTE: This constructor is deprecated and will be removed in a future release.
* </p>
*
* @param table
* @param dynamoDBClient
* @param serializer
* @param consistentReads
*/
@Deprecated
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
final LeaseSerializer serializer, final boolean consistentReads) {
this(table, dynamoDBClient, serializer, consistentReads, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK);
}
/**
* Constructor.
*
* @param table
* @param dynamoDBClient
* @param serializer
* @param consistentReads
* @param tableCreatorCallback
*/
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
final LeaseSerializer serializer, final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback) {
this.table = table;
this.dynamoDBClient = dynamoDBClient;
this.serializer = serializer;
this.consistentReads = consistentReads;
this.tableCreatorCallback = tableCreatorCallback;
}
/**
* {@inheritDoc}
@ -75,7 +113,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
throws ProvisionedThroughputException, DependencyException {
try {
if (tableStatus() != null) {
return false;
return newTableCreated;
}
} catch (DependencyException de) {
//
@ -95,6 +133,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
try {
try {
dynamoDBClient.createTable(request).get();
newTableCreated = true;
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
} catch (InterruptedException e) {
@ -102,13 +141,13 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
}
} catch (ResourceInUseException e) {
log.info("Table {} already exists.", table);
return false;
return newTableCreated;
} catch (LimitExceededException e) {
throw new ProvisionedThroughputException("Capacity exceeded when creating table " + table, e);
} catch (DynamoDbException e) {
throw new DependencyException(e);
}
return true;
return newTableCreated;
}
/**
@ -162,6 +201,11 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
sleepTimeRemaining -= sleep(timeToSleepMillis);
}
if (newTableCreated) {
log.debug("Lease table was recently created, will perform post table creation actions");
performPostTableCreationAction();
}
return true;
}
@ -591,7 +635,12 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private AWSExceptionManager createExceptionManager() {
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(DynamoDbException.class, t -> (DynamoDbException) t);
exceptionManager.add(DynamoDbException.class, t -> t);
return exceptionManager;
}
void performPostTableCreationAction() {
tableCreatorCallback.performAction(
TableCreatorCallbackInput.builder().dynamoDbClient(dynamoDBClient).tableName(table).build());
}
}

View file

@ -0,0 +1,39 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.leases.dynamodb;
/**
* Callback interface for interacting with the DynamoDB lease table post creation.
*/
@FunctionalInterface
public interface TableCreatorCallback {
/**
* NoOp implemetation for TableCreatorCallback
*/
TableCreatorCallback NOOP_TABLE_CREATOR_CALLBACK = (TableCreatorCallbackInput tableCreatorCallbackInput) -> {
// Do nothing
};
/**
* Actions needed to be performed on the DynamoDB lease table once the table has been created and is in the ACTIVE
* status. Will not be called if the table previously exists.
*
* @param tableCreatorCallbackInput
* Input object for table creator
*/
void performAction(TableCreatorCallbackInput tableCreatorCallbackInput);
}

View file

@ -0,0 +1,39 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.leases.dynamodb;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.ToString;
import lombok.experimental.Accessors;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
/**
*
*/
@Builder(toBuilder = true)
@Data
@Accessors(fluent = true)
@ToString
@EqualsAndHashCode
public class TableCreatorCallbackInput {
@NonNull
private final DynamoDbAsyncClient dynamoDbClient;
@NonNull
private final String tableName;
}

View file

@ -34,6 +34,7 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasingException;
@ -65,7 +66,7 @@ public class LeaseCoordinatorExerciser {
.credentialsProvider(DefaultCredentialsProvider.create()).build();
LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher("nagl_ShardProgress", dynamoDBClient,
new DynamoDBLeaseSerializer(), true);
new DynamoDBLeaseSerializer(), true, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK);
if (leaseRefresher.createLeaseTableIfNotExists(INITIAL_LEASE_TABLE_READ_CAPACITY,
INITIAL_LEASE_TABLE_WRITE_CAPACITY)) {

View file

@ -17,12 +17,14 @@ package software.amazon.kinesis.leases;
import org.junit.Rule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.mockito.Mock;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
@Slf4j
public class LeaseIntegrationTest {
@ -32,6 +34,11 @@ public class LeaseIntegrationTest {
protected static DynamoDbAsyncClient ddbClient = DynamoDbAsyncClient.builder()
.credentialsProvider(DefaultCredentialsProvider.create()).build();
protected String tableName = "nagl_ShardProgress";
@Mock
protected TableCreatorCallback tableCreatorCallback;
@Rule
public TestWatcher watcher = new TestWatcher() {
@ -40,7 +47,8 @@ public class LeaseIntegrationTest {
if (leaseRefresher == null) {
// Do some static setup once per class.
leaseRefresher = new DynamoDBLeaseRefresher("nagl_ShardProgress", ddbClient, leaseSerializer, true);
leaseRefresher = new DynamoDBLeaseRefresher(tableName, ddbClient, leaseSerializer, true,
tableCreatorCallback);
}
try {

View file

@ -15,6 +15,7 @@
package software.amazon.kinesis.leases;
//import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -39,6 +40,7 @@ import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
@ -91,7 +93,7 @@ public class ShardSyncTaskIntegrationTest {
DynamoDbAsyncClient client = DynamoDbAsyncClient.builder().region(Region.US_EAST_1).build();
leaseRefresher =
new DynamoDBLeaseRefresher("ShardSyncTaskIntegrationTest", client, new DynamoDBLeaseSerializer(),
USE_CONSISTENT_READS);
USE_CONSISTENT_READS, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK);
shardDetector = new KinesisShardDetector(kinesisClient, STREAM_NAME, 500L, 50,
LIST_SHARDS_CACHE_ALLOWED_AGE_IN_SECONDS, MAX_CACHE_MISSES_BEFORE_RELOAD, CACHE_MISS_WARNING_MODULUS);

View file

@ -73,7 +73,7 @@ public class DynamoDBLeaseCoordinatorIntegrationTest {
DynamoDbAsyncClient dynamoDBClient = DynamoDbAsyncClient.builder()
.credentialsProvider(DefaultCredentialsProvider.create()).build();
leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDBClient, new DynamoDBLeaseSerializer(),
useConsistentReads);
useConsistentReads, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK);
}
leaseRefresher.createLeaseTableIfNotExists(10L, 10L);

View file

@ -20,18 +20,32 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.verify;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseIntegrationTest;
import software.amazon.kinesis.leases.exceptions.LeasingException;
@RunWith(MockitoJUnitRunner.class)
public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest {
@Before
public void setup() {
doNothing().when(tableCreatorCallback).performAction(
eq(TableCreatorCallbackInput.builder().dynamoDbClient(ddbClient).tableName(tableName).build()));
}
/**
* Test listLeases when no records are present.
*/
@ -239,7 +253,8 @@ public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest
@Test
public void testWaitUntilLeaseTableExists() throws LeasingException {
DynamoDBLeaseRefresher refresher = new DynamoDBLeaseRefresher("nagl_ShardProgress", ddbClient, new DynamoDBLeaseSerializer(), true) {
DynamoDBLeaseRefresher refresher = new DynamoDBLeaseRefresher("nagl_ShardProgress", ddbClient,
new DynamoDBLeaseSerializer(), true, tableCreatorCallback) {
@Override
long sleep(long timeToSleepMillis) {
fail("Should not sleep");
@ -257,7 +272,8 @@ public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest
* Just using AtomicInteger for the indirection it provides.
*/
final AtomicInteger sleepCounter = new AtomicInteger(0);
DynamoDBLeaseRefresher refresher = new DynamoDBLeaseRefresher("nonexistentTable", ddbClient, new DynamoDBLeaseSerializer(), true) {
DynamoDBLeaseRefresher refresher = new DynamoDBLeaseRefresher("nonexistentTable", ddbClient,
new DynamoDBLeaseSerializer(), true, tableCreatorCallback) {
@Override
long sleep(long timeToSleepMillis) {
assertEquals(1000L, timeToSleepMillis);
@ -270,4 +286,15 @@ public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest
assertFalse(refresher.waitUntilLeaseTableExists(2, 1));
assertEquals(1, sleepCounter.get());
}
@Test
public void testTableCreatorCallback() throws Exception {
DynamoDBLeaseRefresher refresher = new DynamoDBLeaseRefresher(tableName, ddbClient,
new DynamoDBLeaseSerializer(), true, tableCreatorCallback);
refresher.performPostTableCreationAction();
verify(tableCreatorCallback).performAction(
eq(TableCreatorCallbackInput.builder().dynamoDbClient(ddbClient).tableName(tableName).build()));
}
}