From f369f2114a1d74453677a7ac1c5f899148bcfff0 Mon Sep 17 00:00:00 2001 From: Cory-Bradshaw <44377622+Cory-Bradshaw@users.noreply.github.com> Date: Fri, 29 Nov 2019 14:27:17 -0800 Subject: [PATCH] Adding DDB BillingMode Support (#656) * Adding DDB BillingMode Support --- .../worker/KinesisClientLibConfiguration.java | 29 +++++-- .../clientlibrary/lib/worker/Worker.java | 4 +- .../impl/KinesisClientLeaseManager.java | 9 +- .../kinesis/leases/impl/LeaseManager.java | 27 +++--- .../KinesisClientLibConfigurationTest.java | 7 +- ...entLibLeaseCoordinatorIntegrationTest.java | 3 +- .../worker/ShardSyncTaskIntegrationTest.java | 84 ++++++++++++++++--- .../lib/worker/ShardSyncerTest.java | 2 +- .../clientlibrary/lib/worker/WorkerTest.java | 3 +- .../impl/LeaseCoordinatorExerciser.java | 4 +- .../leases/impl/LeaseIntegrationTest.java | 4 +- .../impl/LeaseManagerIntegrationTest.java | 24 +++++- 12 files changed, 156 insertions(+), 44 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index dad34563..7ee2d449 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -18,6 +18,7 @@ import java.util.Date; import java.util.Optional; import java.util.Set; +import com.amazonaws.services.dynamodbv2.model.BillingMode; import org.apache.commons.lang3.Validate; import com.amazonaws.ClientConfiguration; @@ -41,7 +42,10 @@ public class KinesisClientLibConfiguration { * when the application starts for the first time and there is no checkpoint for the shard. */ public static final InitialPositionInStream DEFAULT_INITIAL_POSITION_IN_STREAM = InitialPositionInStream.LATEST; - + /** + * Default Billing mode for DDB when we need to create a new lease table. Default value is Provisioned which requires the customer to manage the IOPS on the lease table. + */ + public static final BillingMode DEFAULT_DDB_BILLING_MODE = BillingMode.PROVISIONED; /** * Fail over time in milliseconds. A worker which does not renew it's lease within this time interval * will be regarded as having problems and it's shards will be assigned to other workers. @@ -196,6 +200,8 @@ public class KinesisClientLibConfiguration { */ public static final int DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS = 50; + @Getter + private BillingMode billingMode; private String applicationName; private String tableName; private String streamName; @@ -319,7 +325,7 @@ public class KinesisClientLibConfiguration { DEFAULT_METRICS_MAX_QUEUE_SIZE, DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null, - DEFAULT_SHUTDOWN_GRACE_MILLIS); + DEFAULT_SHUTDOWN_GRACE_MILLIS, DEFAULT_DDB_BILLING_MODE); } /** @@ -355,6 +361,7 @@ public class KinesisClientLibConfiguration { * {@link RecordProcessorCheckpointer#checkpoint(String)} * @param regionName The region name for the service * @param shutdownGraceMillis The number of milliseconds before graceful shutdown terminates forcefully + * @param billingMode The DDB Billing mode to set for lease table creation. */ // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES @@ -381,7 +388,7 @@ public class KinesisClientLibConfiguration { int metricsMaxQueueSize, boolean validateSequenceNumberBeforeCheckpointing, String regionName, - long shutdownGraceMillis) { + long shutdownGraceMillis, BillingMode billingMode) { this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider, dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, maxRecords, idleTimeBetweenReadsInMillis, @@ -389,7 +396,7 @@ public class KinesisClientLibConfiguration { shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize, - validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis); + validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis, billingMode); } /** @@ -425,6 +432,7 @@ public class KinesisClientLibConfiguration { * with a call to Amazon Kinesis before checkpointing for calls to * {@link RecordProcessorCheckpointer#checkpoint(String)} * @param regionName The region name for the service + * @param billingMode The DDB Billing mode to set for lease table creation. */ // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES @@ -452,7 +460,8 @@ public class KinesisClientLibConfiguration { int metricsMaxQueueSize, boolean validateSequenceNumberBeforeCheckpointing, String regionName, - long shutdownGraceMillis) { + long shutdownGraceMillis, + BillingMode billingMode) { // Check following values are greater than zero checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); @@ -500,6 +509,7 @@ public class KinesisClientLibConfiguration { this.shardSyncStrategyType = DEFAULT_SHARD_SYNC_STRATEGY_TYPE; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(); + this.billingMode = billingMode; } /** @@ -1154,6 +1164,15 @@ public class KinesisClientLibConfiguration { return this; } + /** + * The DDB Billing mode to set for lease table creation. + * @param billingMode - Either PAY_PER_REQUEST, or PROVISIONED; Defaults to PROVISIONED + * @return + */ + public KinesisClientLibConfiguration withBillingMode(BillingMode billingMode){ + this.billingMode = billingMode == null ? DEFAULT_DDB_BILLING_MODE : billingMode; + return this; + } /** * Sets metrics level that should be enabled. Possible values are: * NONE diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 185113fc..9eabcffe 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -576,7 +576,7 @@ public class Worker implements Runnable { private static KinesisClientLibLeaseCoordinator getLeaseCoordinator(KinesisClientLibConfiguration config, AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory) { return new KinesisClientLibLeaseCoordinator( - new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient), DEFAULT_LEASE_SELECTOR, + new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode()), DEFAULT_LEASE_SELECTOR, config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(), config.getMaxLeasesForWorker(), config.getMaxLeasesToStealAtOneTime(), config.getMaxLeaseRenewalThreads(), metricsFactory); @@ -1345,7 +1345,7 @@ public class Worker implements Runnable { } if (leaseManager == null) { - leaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient); + leaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode()); } if (shardPrioritization == null) { diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseManager.java index 78d4c0d0..b19f8471 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseManager.java @@ -14,6 +14,7 @@ */ package com.amazonaws.services.kinesis.leases.impl; +import com.amazonaws.services.dynamodbv2.model.BillingMode; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,8 +39,8 @@ public class KinesisClientLeaseManager extends LeaseManager * @param table Leases table * @param dynamoDBClient DynamoDB client to use */ - public KinesisClientLeaseManager(String table, AmazonDynamoDB dynamoDBClient) { - this(table, dynamoDBClient, false); + public KinesisClientLeaseManager(String table, AmazonDynamoDB dynamoDBClient, BillingMode billingMode) { + this(table, dynamoDBClient, false, billingMode); } /** @@ -50,8 +51,8 @@ public class KinesisClientLeaseManager extends LeaseManager * @param dynamoDBClient DynamoDB client to use * @param consistentReads true if we want consistent reads for testing purposes. */ - public KinesisClientLeaseManager(String table, AmazonDynamoDB dynamoDBClient, boolean consistentReads) { - super(table, dynamoDBClient, new KinesisClientLeaseSerializer(), consistentReads); + public KinesisClientLeaseManager(String table, AmazonDynamoDB dynamoDBClient, boolean consistentReads, BillingMode billingMode) { + super(table, dynamoDBClient, new KinesisClientLeaseSerializer(), consistentReads, billingMode); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java index c27ca0ab..1c70e307 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import com.amazonaws.services.dynamodbv2.model.BillingMode; import com.amazonaws.services.kinesis.leases.util.DynamoUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -61,16 +62,18 @@ public class LeaseManager implements ILeaseManager { protected AmazonDynamoDB dynamoDBClient; protected ILeaseSerializer serializer; protected boolean consistentReads; + private BillingMode billingMode; /** * Constructor. - * + * * @param table leases table * @param dynamoDBClient DynamoDB client to use * @param serializer LeaseSerializer to use to convert to/from DynamoDB objects. + * @param billingMode The DDB Billing mode to set for lease table creation. */ - public LeaseManager(String table, AmazonDynamoDB dynamoDBClient, ILeaseSerializer serializer) { - this(table, dynamoDBClient, serializer, false); + public LeaseManager(String table, AmazonDynamoDB dynamoDBClient, ILeaseSerializer serializer, BillingMode billingMode) { + this(table, dynamoDBClient, serializer, false, billingMode); } /** @@ -78,13 +81,14 @@ public class LeaseManager implements ILeaseManager { * - our code is meant to be resilient to inconsistent reads. Using consistent reads during testing speeds up * execution of simple tests (you don't have to wait out the consistency window). Test cases that want to experience * eventual consistency should not set consistentReads=true. - * + * * @param table leases table * @param dynamoDBClient DynamoDB client to use * @param serializer lease serializer to use * @param consistentReads true if we want consistent reads for testing purposes. + * @param billingMode The DDB Billing mode to set for lease table creation. */ - public LeaseManager(String table, AmazonDynamoDB dynamoDBClient, ILeaseSerializer serializer, boolean consistentReads) { + public LeaseManager(String table, AmazonDynamoDB dynamoDBClient, ILeaseSerializer serializer, boolean consistentReads, BillingMode billingMode) { verifyNotNull(table, "Table name cannot be null"); verifyNotNull(dynamoDBClient, "dynamoDBClient cannot be null"); verifyNotNull(serializer, "ILeaseSerializer cannot be null"); @@ -93,6 +97,7 @@ public class LeaseManager implements ILeaseManager { this.dynamoDBClient = dynamoDBClient; this.consistentReads = consistentReads; this.serializer = serializer; + this.billingMode=billingMode; } /** @@ -118,11 +123,13 @@ public class LeaseManager implements ILeaseManager { request.setTableName(table); request.setKeySchema(serializer.getKeySchema()); request.setAttributeDefinitions(serializer.getAttributeDefinitions()); - - ProvisionedThroughput throughput = new ProvisionedThroughput(); - throughput.setReadCapacityUnits(readCapacity); - throughput.setWriteCapacityUnits(writeCapacity); - request.setProvisionedThroughput(throughput); + request.setBillingMode(billingMode.name()); + if(BillingMode.PROVISIONED.equals(billingMode)){ + ProvisionedThroughput throughput = new ProvisionedThroughput(); + throughput.setReadCapacityUnits(readCapacity); + throughput.setWriteCapacityUnits(writeCapacity); + request.setProvisionedThroughput(throughput); + } try { dynamoDBClient.createTable(request); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java index ce046c96..f2b5a460 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.fail; import java.util.Date; +import com.amazonaws.services.dynamodbv2.model.BillingMode; import org.junit.Test; import org.mockito.Mockito; @@ -86,7 +87,7 @@ public class KinesisClientLibConfigurationTest { TEST_VALUE_INT, skipCheckpointValidationValue, null, - TEST_VALUE_LONG); + TEST_VALUE_LONG, BillingMode.PROVISIONED); } @Test @@ -126,7 +127,7 @@ public class KinesisClientLibConfigurationTest { TEST_VALUE_INT, skipCheckpointValidationValue, null, - longValues[6]); + longValues[6], BillingMode.PROVISIONED); } catch (IllegalArgumentException e) { System.out.println(e.getMessage()); } @@ -161,7 +162,7 @@ public class KinesisClientLibConfigurationTest { intValues[1], skipCheckpointValidationValue, null, - TEST_VALUE_LONG); + TEST_VALUE_LONG, BillingMode.PAY_PER_REQUEST); } catch (IllegalArgumentException e) { System.out.println(e.getMessage()); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorIntegrationTest.java index d7463f82..aedc627d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorIntegrationTest.java @@ -61,7 +61,8 @@ public class KinesisClientLibLeaseCoordinatorIntegrationTest { if (leaseManager == null) { AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain()); leaseManager = - new KinesisClientLeaseManager(TABLE_NAME, ddb, useConsistentReads); + new KinesisClientLeaseManager(TABLE_NAME, ddb, useConsistentReads, + KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE); } leaseManager.createLeaseTableIfNotExists(10L, 10L); leaseManager.deleteAll(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java index 04741f45..a4afa053 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java @@ -18,10 +18,12 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import org.junit.After; +import com.amazonaws.services.dynamodbv2.model.BillingMode; +import com.amazonaws.services.dynamodbv2.model.DescribeTableResult; +import com.amazonaws.services.dynamodbv2.model.ListTablesResult; +import org.joda.time.DateTime; import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -41,6 +43,8 @@ import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; import com.amazonaws.services.kinesis.leases.interfaces.IKinesisClientLeaseManager; import com.amazonaws.services.kinesis.model.StreamStatus; +import static junit.framework.TestCase.fail; + /** * WARN: to run this integration test you'll have to provide a AwsCredentials.properties file on the classpath. */ @@ -84,13 +88,13 @@ public class ShardSyncTaskIntegrationTest { /** * @throws java.lang.Exception */ - @Before - public void setUp() throws Exception { + public void setUp(BillingMode billingMode, String tableName) throws Exception { boolean useConsistentReads = true; leaseManager = - new KinesisClientLeaseManager("ShardSyncTaskIntegrationTest", + new KinesisClientLeaseManager(tableName, new AmazonDynamoDBClient(credentialsProvider), - useConsistentReads); + useConsistentReads, + billingMode); kinesisProxy = new KinesisProxy(STREAM_NAME, @@ -99,25 +103,79 @@ public class ShardSyncTaskIntegrationTest { } /** - * @throws java.lang.Exception + * Test method for call(). + * + * @throws Exception */ - @After - public void tearDown() throws Exception { + @Test + public final void testCall_ProvisionedDDB() throws Exception { + BillingMode billingMode = BillingMode.PROVISIONED; + String tableName = "ShardSyncTaskIntegrationTest" + billingMode.name(); + try { + setUp(billingMode, tableName); + runTest(); + checkBillingMode(billingMode, tableName); + } + finally { + cleanUpTable(tableName); + } } /** * Test method for call(). * - * @throws DependencyException - * @throws InvalidStateException - * @throws ProvisionedThroughputException + * @throws Exception */ @Test - public final void testCall() throws DependencyException, InvalidStateException, ProvisionedThroughputException { + public final void testCall_PayPerRequestDDB() throws Exception { + BillingMode billingMode = BillingMode.PAY_PER_REQUEST; + String tableName = "ShardSyncTaskIntegrationTest" + billingMode.name(); + try { + setUp(billingMode, tableName); + runTest(); + checkBillingMode(billingMode, tableName); + } finally { + cleanUpTable(tableName); + } + } + + private void cleanUpTable(String tableName) throws DependencyException { + AmazonDynamoDBClient client = new AmazonDynamoDBClient(DefaultAWSCredentialsProviderChain.getInstance()); + ListTablesResult tables = client.listTables(); + if(tables.getTableNames().contains(tableName)){ + leaseManager.waitUntilLeaseTableExists(2,20); + client.deleteTable(tableName); + DateTime endTime = DateTime.now().plusSeconds(30); + while(client.listTables().getTableNames().contains(tableName)){ + if( endTime.isBeforeNow()){ + fail("Could not clean up DDB tables in time. Please retry. If these failures continue increase the endTime."); + } + try { + Thread.sleep(333L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + private void checkBillingMode(BillingMode billingMode, String tableName) { + AmazonDynamoDBClient client = new AmazonDynamoDBClient(DefaultAWSCredentialsProviderChain.getInstance()); + DescribeTableResult tableDetails = client.describeTable(tableName); + if(BillingMode.PAY_PER_REQUEST.equals(billingMode)) { + Assert.assertEquals(tableDetails.getTable().getBillingModeSummary().getBillingMode(), billingMode.name()); + }else{ + Assert.assertTrue(tableDetails.getTable().getProvisionedThroughput().getWriteCapacityUnits() == 10); + Assert.assertTrue(tableDetails.getTable().getProvisionedThroughput().getReadCapacityUnits() == 10); + } + } + + public void runTest() throws DependencyException, ProvisionedThroughputException, InvalidStateException { if (!leaseManager.leaseTableExists()) { final Long readCapacity = 10L; final Long writeCapacity = 10L; leaseManager.createLeaseTableIfNotExists(readCapacity, writeCapacity); + leaseManager.waitUntilLeaseTableExists(2,20); } leaseManager.deleteAll(); Set shardIds = kinesisProxy.getAllShardIds(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java index fd34be76..2fce50a7 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java @@ -70,7 +70,7 @@ public class ShardSyncerTest { InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000L)); private final boolean cleanupLeasesOfCompletedShards = true; AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB(); - LeaseManager leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient); + LeaseManager leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient, KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE); private static final int EXPONENT = 128; protected static final KinesisLeaseCleanupValidator leaseCleanupValidator = new KinesisLeaseCleanupValidator(); private static final KinesisShardSyncer shardSyncer = new KinesisShardSyncer(leaseCleanupValidator); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 438215f1..a2faf607 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -2133,7 +2133,8 @@ public class WorkerTest { final long idleTimeInMilliseconds = 2L; AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB(); - LeaseManager leaseManager = new KinesisClientLeaseManager("foo", ddbClient); + LeaseManager leaseManager = new KinesisClientLeaseManager("foo", ddbClient, + KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE); leaseManager.createLeaseTableIfNotExists(1L, 1L); for (KinesisClientLease initialLease : initialLeases) { leaseManager.createLeaseIfNotExists(initialLease); diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinatorExerciser.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinatorExerciser.java index e49e6d9b..e7a3c5f5 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinatorExerciser.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinatorExerciser.java @@ -28,6 +28,7 @@ import java.util.Map; import javax.swing.*; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -59,7 +60,8 @@ public class LeaseCoordinatorExerciser { new DefaultAWSCredentialsProviderChain(); AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(creds); - ILeaseManager leaseManager = new KinesisClientLeaseManager("nagl_ShardProgress", ddb); + ILeaseManager leaseManager = new KinesisClientLeaseManager("nagl_ShardProgress", ddb, + KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE); if (leaseManager.createLeaseTableIfNotExists(10L, 50L)) { LOG.info("Waiting for newly created lease table"); diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java index 5e9c15e6..091de651 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java @@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.leases.impl; import java.util.logging.Logger; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,7 +47,8 @@ public class LeaseIntegrationTest { if (leaseManager == null) { // Do some static setup once per class. - leaseManager = new KinesisClientLeaseManager("nagl_ShardProgress", ddbClient, true); + leaseManager = new KinesisClientLeaseManager("nagl_ShardProgress", ddbClient, true, + KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE); MetricsHelper.startScope(new NullMetricsFactory()); } diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java index cfc97ef6..cede5be1 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java @@ -18,6 +18,8 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import com.amazonaws.services.dynamodbv2.model.BillingMode; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import junit.framework.Assert; import org.junit.Test; @@ -233,7 +235,24 @@ public class LeaseManagerIntegrationTest extends LeaseIntegrationTest { @Test public void testWaitUntilLeaseTableExists() throws LeasingException { - KinesisClientLeaseManager manager = new KinesisClientLeaseManager("nagl_ShardProgress", ddbClient, true) { + KinesisClientLeaseManager manager = new KinesisClientLeaseManager("nagl_ShardProgress", ddbClient, true, + KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE) { + + @Override + long sleep(long timeToSleepMillis) { + Assert.fail("Should not sleep"); + return 0L; + } + + }; + + Assert.assertTrue(manager.waitUntilLeaseTableExists(1, 1)); + } + + @Test + public void testWaitUntilLeaseTableExistsPayPerRequest() throws LeasingException { + KinesisClientLeaseManager manager = new KinesisClientLeaseManager("nagl_ShardProgress_PayPerRequest", ddbClient, true, + BillingMode.PAY_PER_REQUEST) { @Override long sleep(long timeToSleepMillis) { @@ -252,7 +271,8 @@ public class LeaseManagerIntegrationTest extends LeaseIntegrationTest { * Just using AtomicInteger for the indirection it provides. */ final AtomicInteger sleepCounter = new AtomicInteger(0); - KinesisClientLeaseManager manager = new KinesisClientLeaseManager("nonexistentTable", ddbClient, true) { + KinesisClientLeaseManager manager = new KinesisClientLeaseManager("nonexistentTable", ddbClient, true, + KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE) { @Override long sleep(long timeToSleepMillis) {