diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index a56a6d5e..d19c583e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -31,6 +31,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory; +import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory; @@ -218,7 +219,17 @@ public class LeaseManagementConfig { super(0, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue<>(), threadFactory); } - }; + } + + /** + * Callback used with DynamoDB lease management. Callback is invoked once the table is newly created and is in the + * active status. + * + *

+ * Default value: {@link TableCreatorCallback#NOOP_TABLE_CREATOR_CALLBACK} + *

+ */ + private TableCreatorCallback tableCreatorCallback = TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK; private LeaseManagementFactory leaseManagementFactory; @@ -246,7 +257,8 @@ public class LeaseManagementConfig { listShardsCacheAllowedAgeInSeconds(), cacheMissWarningModulus(), initialLeaseTableReadCapacity(), - initialLeaseTableWriteCapacity()); + initialLeaseTableWriteCapacity(), + tableCreatorCallback()); } return leaseManagementFactory; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index f12c5afb..bdf69260 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -202,5 +202,4 @@ public interface LeaseRefresher { */ ExtendedSequenceNumber getCheckpoint(String shardId) throws ProvisionedThroughputException, InvalidStateException, DependencyException; - } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 3c20c591..3fe692b2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -66,6 +66,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final int cacheMissWarningModulus; private final long initialLeaseTableReadCapacity; private final long initialLeaseTableWriteCapacity; + private final TableCreatorCallback tableCreatorCallback; /** * Constructor. @@ -116,6 +117,10 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { /** * Constructor. * + *

+ * NOTE: This constructor is deprecated and will be removed in a future release. + *

+ * * @param kinesisClient * @param streamName * @param dynamoDBClient @@ -140,6 +145,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param initialLeaseTableReadCapacity * @param initialLeaseTableWriteCapacity */ + @Deprecated public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, @@ -150,6 +156,54 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity) { + this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, + initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, + maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, + maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, + cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, + TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK); + } + + /** + * Constructor. + * + * @param kinesisClient + * @param streamName + * @param dynamoDBClient + * @param tableName + * @param workerIdentifier + * @param executorService + * @param initialPositionInStream + * @param failoverTimeMillis + * @param epsilonMillis + * @param maxLeasesForWorker + * @param maxLeasesToStealAtOneTime + * @param maxLeaseRenewalThreads + * @param cleanupLeasesUponShardCompletion + * @param ignoreUnexpectedChildShards + * @param shardSyncIntervalMillis + * @param consistentReads + * @param listShardsBackoffTimeMillis + * @param maxListShardsRetryAttempts + * @param maxCacheMissesBeforeReload + * @param listShardsCacheAllowedAgeInSeconds + * @param cacheMissWarningModulus + * @param initialLeaseTableReadCapacity + * @param initialLeaseTableWriteCapacity + * @param tableCreatorCallback + */ + public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, + final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, + final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, + final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker, + final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, + final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, + final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, + final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, + final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, + final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, + final TableCreatorCallback tableCreatorCallback) { this.kinesisClient = kinesisClient; this.streamName = streamName; this.dynamoDBClient = dynamoDBClient; @@ -173,6 +227,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { this.cacheMissWarningModulus = cacheMissWarningModulus; this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity; this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; + this.tableCreatorCallback = tableCreatorCallback; } @Override @@ -203,7 +258,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { @Override public DynamoDBLeaseRefresher createLeaseRefresher() { - return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, new DynamoDBLeaseSerializer(), consistentReads); + return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, new DynamoDBLeaseSerializer(), consistentReads, + tableCreatorCallback); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index ae687d77..79a12fc3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -20,7 +20,6 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import lombok.AllArgsConstructor; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; @@ -58,7 +57,6 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** * An implementation of {@link LeaseRefresher} that uses DynamoDB. */ -@AllArgsConstructor @Slf4j @KinesisClientInternalApi public class DynamoDBLeaseRefresher implements LeaseRefresher { @@ -66,6 +64,46 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { protected final DynamoDbAsyncClient dynamoDBClient; protected final LeaseSerializer serializer; protected final boolean consistentReads; + private final TableCreatorCallback tableCreatorCallback; + + private boolean newTableCreated = false; + + /** + * Constructor. + * + *

+ * NOTE: This constructor is deprecated and will be removed in a future release. + *

+ * + * @param table + * @param dynamoDBClient + * @param serializer + * @param consistentReads + */ + @Deprecated + public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient, + final LeaseSerializer serializer, final boolean consistentReads) { + this(table, dynamoDBClient, serializer, consistentReads, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK); + } + + /** + * Constructor. + * + * @param table + * @param dynamoDBClient + * @param serializer + * @param consistentReads + * @param tableCreatorCallback + */ + public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient, + final LeaseSerializer serializer, final boolean consistentReads, + @NonNull final TableCreatorCallback tableCreatorCallback) { + this.table = table; + this.dynamoDBClient = dynamoDBClient; + this.serializer = serializer; + this.consistentReads = consistentReads; + this.tableCreatorCallback = tableCreatorCallback; + } /** * {@inheritDoc} @@ -75,7 +113,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { throws ProvisionedThroughputException, DependencyException { try { if (tableStatus() != null) { - return false; + return newTableCreated; } } catch (DependencyException de) { // @@ -95,6 +133,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { try { try { dynamoDBClient.createTable(request).get(); + newTableCreated = true; } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { @@ -102,13 +141,13 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } } catch (ResourceInUseException e) { log.info("Table {} already exists.", table); - return false; + return newTableCreated; } catch (LimitExceededException e) { throw new ProvisionedThroughputException("Capacity exceeded when creating table " + table, e); } catch (DynamoDbException e) { throw new DependencyException(e); } - return true; + return newTableCreated; } /** @@ -162,6 +201,11 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { sleepTimeRemaining -= sleep(timeToSleepMillis); } + if (newTableCreated) { + log.debug("Lease table was recently created, will perform post table creation actions"); + performPostTableCreationAction(); + } + return true; } @@ -591,7 +635,12 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { private AWSExceptionManager createExceptionManager() { final AWSExceptionManager exceptionManager = new AWSExceptionManager(); - exceptionManager.add(DynamoDbException.class, t -> (DynamoDbException) t); + exceptionManager.add(DynamoDbException.class, t -> t); return exceptionManager; } + + void performPostTableCreationAction() { + tableCreatorCallback.performAction( + TableCreatorCallbackInput.builder().dynamoDbClient(dynamoDBClient).tableName(table).build()); + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/TableCreatorCallback.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/TableCreatorCallback.java new file mode 100644 index 00000000..088ba924 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/TableCreatorCallback.java @@ -0,0 +1,39 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.kinesis.leases.dynamodb; + +/** + * Callback interface for interacting with the DynamoDB lease table post creation. + */ +@FunctionalInterface +public interface TableCreatorCallback { + /** + * NoOp implemetation for TableCreatorCallback + */ + TableCreatorCallback NOOP_TABLE_CREATOR_CALLBACK = (TableCreatorCallbackInput tableCreatorCallbackInput) -> { + // Do nothing + }; + + /** + * Actions needed to be performed on the DynamoDB lease table once the table has been created and is in the ACTIVE + * status. Will not be called if the table previously exists. + * + * @param tableCreatorCallbackInput + * Input object for table creator + */ + void performAction(TableCreatorCallbackInput tableCreatorCallbackInput); + +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/TableCreatorCallbackInput.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/TableCreatorCallbackInput.java new file mode 100644 index 00000000..4c4d6f12 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/TableCreatorCallbackInput.java @@ -0,0 +1,39 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.kinesis.leases.dynamodb; + +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NonNull; +import lombok.ToString; +import lombok.experimental.Accessors; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; + +/** + * + */ +@Builder(toBuilder = true) +@Data +@Accessors(fluent = true) +@ToString +@EqualsAndHashCode +public class TableCreatorCallbackInput { + @NonNull + private final DynamoDbAsyncClient dynamoDbClient; + @NonNull + private final String tableName; +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java index c3703473..f2829936 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java @@ -34,6 +34,7 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; +import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.LeasingException; @@ -65,7 +66,7 @@ public class LeaseCoordinatorExerciser { .credentialsProvider(DefaultCredentialsProvider.create()).build(); LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher("nagl_ShardProgress", dynamoDBClient, - new DynamoDBLeaseSerializer(), true); + new DynamoDBLeaseSerializer(), true, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK); if (leaseRefresher.createLeaseTableIfNotExists(INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY)) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java index 90cb90e8..9cc770db 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java @@ -17,12 +17,14 @@ package software.amazon.kinesis.leases; import org.junit.Rule; import org.junit.rules.TestWatcher; import org.junit.runner.Description; +import org.mockito.Mock; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; +import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback; @Slf4j public class LeaseIntegrationTest { @@ -32,6 +34,11 @@ public class LeaseIntegrationTest { protected static DynamoDbAsyncClient ddbClient = DynamoDbAsyncClient.builder() .credentialsProvider(DefaultCredentialsProvider.create()).build(); + protected String tableName = "nagl_ShardProgress"; + + @Mock + protected TableCreatorCallback tableCreatorCallback; + @Rule public TestWatcher watcher = new TestWatcher() { @@ -40,7 +47,8 @@ public class LeaseIntegrationTest { if (leaseRefresher == null) { // Do some static setup once per class. - leaseRefresher = new DynamoDBLeaseRefresher("nagl_ShardProgress", ddbClient, leaseSerializer, true); + leaseRefresher = new DynamoDBLeaseRefresher(tableName, ddbClient, leaseSerializer, true, + tableCreatorCallback); } try { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java index 19d8f0df..cffb7154 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java @@ -15,6 +15,7 @@ package software.amazon.kinesis.leases; //import java.net.URI; + import java.util.HashSet; import java.util.List; import java.util.Set; @@ -39,6 +40,7 @@ import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; +import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; @@ -91,7 +93,7 @@ public class ShardSyncTaskIntegrationTest { DynamoDbAsyncClient client = DynamoDbAsyncClient.builder().region(Region.US_EAST_1).build(); leaseRefresher = new DynamoDBLeaseRefresher("ShardSyncTaskIntegrationTest", client, new DynamoDBLeaseSerializer(), - USE_CONSISTENT_READS); + USE_CONSISTENT_READS, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK); shardDetector = new KinesisShardDetector(kinesisClient, STREAM_NAME, 500L, 50, LIST_SHARDS_CACHE_ALLOWED_AGE_IN_SECONDS, MAX_CACHE_MISSES_BEFORE_RELOAD, CACHE_MISS_WARNING_MODULUS); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java index 40543dce..051e8000 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java @@ -73,7 +73,7 @@ public class DynamoDBLeaseCoordinatorIntegrationTest { DynamoDbAsyncClient dynamoDBClient = DynamoDbAsyncClient.builder() .credentialsProvider(DefaultCredentialsProvider.create()).build(); leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDBClient, new DynamoDBLeaseSerializer(), - useConsistentReads); + useConsistentReads, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK); } leaseRefresher.createLeaseTableIfNotExists(10L, 10L); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java index 23158d97..b94c8305 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java @@ -20,18 +20,32 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.verify; import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseIntegrationTest; import software.amazon.kinesis.leases.exceptions.LeasingException; +@RunWith(MockitoJUnitRunner.class) public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest { + + @Before + public void setup() { + doNothing().when(tableCreatorCallback).performAction( + eq(TableCreatorCallbackInput.builder().dynamoDbClient(ddbClient).tableName(tableName).build())); + } + /** * Test listLeases when no records are present. */ @@ -239,7 +253,8 @@ public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest @Test public void testWaitUntilLeaseTableExists() throws LeasingException { - DynamoDBLeaseRefresher refresher = new DynamoDBLeaseRefresher("nagl_ShardProgress", ddbClient, new DynamoDBLeaseSerializer(), true) { + DynamoDBLeaseRefresher refresher = new DynamoDBLeaseRefresher("nagl_ShardProgress", ddbClient, + new DynamoDBLeaseSerializer(), true, tableCreatorCallback) { @Override long sleep(long timeToSleepMillis) { fail("Should not sleep"); @@ -257,7 +272,8 @@ public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest * Just using AtomicInteger for the indirection it provides. */ final AtomicInteger sleepCounter = new AtomicInteger(0); - DynamoDBLeaseRefresher refresher = new DynamoDBLeaseRefresher("nonexistentTable", ddbClient, new DynamoDBLeaseSerializer(), true) { + DynamoDBLeaseRefresher refresher = new DynamoDBLeaseRefresher("nonexistentTable", ddbClient, + new DynamoDBLeaseSerializer(), true, tableCreatorCallback) { @Override long sleep(long timeToSleepMillis) { assertEquals(1000L, timeToSleepMillis); @@ -270,4 +286,15 @@ public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest assertFalse(refresher.waitUntilLeaseTableExists(2, 1)); assertEquals(1, sleepCounter.get()); } + + @Test + public void testTableCreatorCallback() throws Exception { + DynamoDBLeaseRefresher refresher = new DynamoDBLeaseRefresher(tableName, ddbClient, + new DynamoDBLeaseSerializer(), true, tableCreatorCallback); + + refresher.performPostTableCreationAction(); + + verify(tableCreatorCallback).performAction( + eq(TableCreatorCallbackInput.builder().dynamoDbClient(ddbClient).tableName(tableName).build())); + } }