Adding DDB BillingMode Support (#656)

* Adding DDB BillingMode Support
This commit is contained in:
Cory-Bradshaw 2019-11-29 14:27:17 -08:00 committed by GitHub
parent 635a101ab4
commit f369f2114a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 156 additions and 44 deletions

View file

@ -18,6 +18,7 @@ import java.util.Date;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import com.amazonaws.ClientConfiguration; import com.amazonaws.ClientConfiguration;
@ -41,7 +42,10 @@ public class KinesisClientLibConfiguration {
* when the application starts for the first time and there is no checkpoint for the shard. * when the application starts for the first time and there is no checkpoint for the shard.
*/ */
public static final InitialPositionInStream DEFAULT_INITIAL_POSITION_IN_STREAM = InitialPositionInStream.LATEST; public static final InitialPositionInStream DEFAULT_INITIAL_POSITION_IN_STREAM = InitialPositionInStream.LATEST;
/**
* Default Billing mode for DDB when we need to create a new lease table. Default value is Provisioned which requires the customer to manage the IOPS on the lease table.
*/
public static final BillingMode DEFAULT_DDB_BILLING_MODE = BillingMode.PROVISIONED;
/** /**
* Fail over time in milliseconds. A worker which does not renew it's lease within this time interval * Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
* will be regarded as having problems and it's shards will be assigned to other workers. * will be regarded as having problems and it's shards will be assigned to other workers.
@ -196,6 +200,8 @@ public class KinesisClientLibConfiguration {
*/ */
public static final int DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS = 50; public static final int DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS = 50;
@Getter
private BillingMode billingMode;
private String applicationName; private String applicationName;
private String tableName; private String tableName;
private String streamName; private String streamName;
@ -319,7 +325,7 @@ public class KinesisClientLibConfiguration {
DEFAULT_METRICS_MAX_QUEUE_SIZE, DEFAULT_METRICS_MAX_QUEUE_SIZE,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
null, null,
DEFAULT_SHUTDOWN_GRACE_MILLIS); DEFAULT_SHUTDOWN_GRACE_MILLIS, DEFAULT_DDB_BILLING_MODE);
} }
/** /**
@ -355,6 +361,7 @@ public class KinesisClientLibConfiguration {
* {@link RecordProcessorCheckpointer#checkpoint(String)} * {@link RecordProcessorCheckpointer#checkpoint(String)}
* @param regionName The region name for the service * @param regionName The region name for the service
* @param shutdownGraceMillis The number of milliseconds before graceful shutdown terminates forcefully * @param shutdownGraceMillis The number of milliseconds before graceful shutdown terminates forcefully
* @param billingMode The DDB Billing mode to set for lease table creation.
*/ */
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
@ -381,7 +388,7 @@ public class KinesisClientLibConfiguration {
int metricsMaxQueueSize, int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing, boolean validateSequenceNumberBeforeCheckpointing,
String regionName, String regionName,
long shutdownGraceMillis) { long shutdownGraceMillis, BillingMode billingMode) {
this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider, this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId,
maxRecords, idleTimeBetweenReadsInMillis, maxRecords, idleTimeBetweenReadsInMillis,
@ -389,7 +396,7 @@ public class KinesisClientLibConfiguration {
shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry,
kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig,
taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize, taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize,
validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis); validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis, billingMode);
} }
/** /**
@ -425,6 +432,7 @@ public class KinesisClientLibConfiguration {
* with a call to Amazon Kinesis before checkpointing for calls to * with a call to Amazon Kinesis before checkpointing for calls to
* {@link RecordProcessorCheckpointer#checkpoint(String)} * {@link RecordProcessorCheckpointer#checkpoint(String)}
* @param regionName The region name for the service * @param regionName The region name for the service
* @param billingMode The DDB Billing mode to set for lease table creation.
*/ */
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
@ -452,7 +460,8 @@ public class KinesisClientLibConfiguration {
int metricsMaxQueueSize, int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing, boolean validateSequenceNumberBeforeCheckpointing,
String regionName, String regionName,
long shutdownGraceMillis) { long shutdownGraceMillis,
BillingMode billingMode) {
// Check following values are greater than zero // Check following values are greater than zero
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis);
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
@ -500,6 +509,7 @@ public class KinesisClientLibConfiguration {
this.shardSyncStrategyType = DEFAULT_SHARD_SYNC_STRATEGY_TYPE; this.shardSyncStrategyType = DEFAULT_SHARD_SYNC_STRATEGY_TYPE;
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(); this.recordsFetcherFactory = new SimpleRecordsFetcherFactory();
this.billingMode = billingMode;
} }
/** /**
@ -1154,6 +1164,15 @@ public class KinesisClientLibConfiguration {
return this; return this;
} }
/**
* The DDB Billing mode to set for lease table creation.
* @param billingMode - Either PAY_PER_REQUEST, or PROVISIONED; Defaults to PROVISIONED
* @return
*/
public KinesisClientLibConfiguration withBillingMode(BillingMode billingMode){
this.billingMode = billingMode == null ? DEFAULT_DDB_BILLING_MODE : billingMode;
return this;
}
/** /**
* Sets metrics level that should be enabled. Possible values are: * Sets metrics level that should be enabled. Possible values are:
* NONE * NONE

View file

@ -576,7 +576,7 @@ public class Worker implements Runnable {
private static KinesisClientLibLeaseCoordinator getLeaseCoordinator(KinesisClientLibConfiguration config, private static KinesisClientLibLeaseCoordinator getLeaseCoordinator(KinesisClientLibConfiguration config,
AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory) { AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory) {
return new KinesisClientLibLeaseCoordinator( return new KinesisClientLibLeaseCoordinator(
new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient), DEFAULT_LEASE_SELECTOR, new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode()), DEFAULT_LEASE_SELECTOR,
config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(), config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(),
config.getMaxLeasesForWorker(), config.getMaxLeasesToStealAtOneTime(), config.getMaxLeasesForWorker(), config.getMaxLeasesToStealAtOneTime(),
config.getMaxLeaseRenewalThreads(), metricsFactory); config.getMaxLeaseRenewalThreads(), metricsFactory);
@ -1345,7 +1345,7 @@ public class Worker implements Runnable {
} }
if (leaseManager == null) { if (leaseManager == null) {
leaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient); leaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode());
} }
if (shardPrioritization == null) { if (shardPrioritization == null) {

View file

@ -14,6 +14,7 @@
*/ */
package com.amazonaws.services.kinesis.leases.impl; package com.amazonaws.services.kinesis.leases.impl;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -38,8 +39,8 @@ public class KinesisClientLeaseManager extends LeaseManager<KinesisClientLease>
* @param table Leases table * @param table Leases table
* @param dynamoDBClient DynamoDB client to use * @param dynamoDBClient DynamoDB client to use
*/ */
public KinesisClientLeaseManager(String table, AmazonDynamoDB dynamoDBClient) { public KinesisClientLeaseManager(String table, AmazonDynamoDB dynamoDBClient, BillingMode billingMode) {
this(table, dynamoDBClient, false); this(table, dynamoDBClient, false, billingMode);
} }
/** /**
@ -50,8 +51,8 @@ public class KinesisClientLeaseManager extends LeaseManager<KinesisClientLease>
* @param dynamoDBClient DynamoDB client to use * @param dynamoDBClient DynamoDB client to use
* @param consistentReads true if we want consistent reads for testing purposes. * @param consistentReads true if we want consistent reads for testing purposes.
*/ */
public KinesisClientLeaseManager(String table, AmazonDynamoDB dynamoDBClient, boolean consistentReads) { public KinesisClientLeaseManager(String table, AmazonDynamoDB dynamoDBClient, boolean consistentReads, BillingMode billingMode) {
super(table, dynamoDBClient, new KinesisClientLeaseSerializer(), consistentReads); super(table, dynamoDBClient, new KinesisClientLeaseSerializer(), consistentReads, billingMode);
} }
/** /**

View file

@ -19,6 +19,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import com.amazonaws.services.kinesis.leases.util.DynamoUtils; import com.amazonaws.services.kinesis.leases.util.DynamoUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -61,16 +62,18 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
protected AmazonDynamoDB dynamoDBClient; protected AmazonDynamoDB dynamoDBClient;
protected ILeaseSerializer<T> serializer; protected ILeaseSerializer<T> serializer;
protected boolean consistentReads; protected boolean consistentReads;
private BillingMode billingMode;
/** /**
* Constructor. * Constructor.
* *
* @param table leases table * @param table leases table
* @param dynamoDBClient DynamoDB client to use * @param dynamoDBClient DynamoDB client to use
* @param serializer LeaseSerializer to use to convert to/from DynamoDB objects. * @param serializer LeaseSerializer to use to convert to/from DynamoDB objects.
* @param billingMode The DDB Billing mode to set for lease table creation.
*/ */
public LeaseManager(String table, AmazonDynamoDB dynamoDBClient, ILeaseSerializer<T> serializer) { public LeaseManager(String table, AmazonDynamoDB dynamoDBClient, ILeaseSerializer<T> serializer, BillingMode billingMode) {
this(table, dynamoDBClient, serializer, false); this(table, dynamoDBClient, serializer, false, billingMode);
} }
/** /**
@ -78,13 +81,14 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
* - our code is meant to be resilient to inconsistent reads. Using consistent reads during testing speeds up * - our code is meant to be resilient to inconsistent reads. Using consistent reads during testing speeds up
* execution of simple tests (you don't have to wait out the consistency window). Test cases that want to experience * execution of simple tests (you don't have to wait out the consistency window). Test cases that want to experience
* eventual consistency should not set consistentReads=true. * eventual consistency should not set consistentReads=true.
* *
* @param table leases table * @param table leases table
* @param dynamoDBClient DynamoDB client to use * @param dynamoDBClient DynamoDB client to use
* @param serializer lease serializer to use * @param serializer lease serializer to use
* @param consistentReads true if we want consistent reads for testing purposes. * @param consistentReads true if we want consistent reads for testing purposes.
* @param billingMode The DDB Billing mode to set for lease table creation.
*/ */
public LeaseManager(String table, AmazonDynamoDB dynamoDBClient, ILeaseSerializer<T> serializer, boolean consistentReads) { public LeaseManager(String table, AmazonDynamoDB dynamoDBClient, ILeaseSerializer<T> serializer, boolean consistentReads, BillingMode billingMode) {
verifyNotNull(table, "Table name cannot be null"); verifyNotNull(table, "Table name cannot be null");
verifyNotNull(dynamoDBClient, "dynamoDBClient cannot be null"); verifyNotNull(dynamoDBClient, "dynamoDBClient cannot be null");
verifyNotNull(serializer, "ILeaseSerializer cannot be null"); verifyNotNull(serializer, "ILeaseSerializer cannot be null");
@ -93,6 +97,7 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
this.dynamoDBClient = dynamoDBClient; this.dynamoDBClient = dynamoDBClient;
this.consistentReads = consistentReads; this.consistentReads = consistentReads;
this.serializer = serializer; this.serializer = serializer;
this.billingMode=billingMode;
} }
/** /**
@ -118,11 +123,13 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
request.setTableName(table); request.setTableName(table);
request.setKeySchema(serializer.getKeySchema()); request.setKeySchema(serializer.getKeySchema());
request.setAttributeDefinitions(serializer.getAttributeDefinitions()); request.setAttributeDefinitions(serializer.getAttributeDefinitions());
request.setBillingMode(billingMode.name());
ProvisionedThroughput throughput = new ProvisionedThroughput(); if(BillingMode.PROVISIONED.equals(billingMode)){
throughput.setReadCapacityUnits(readCapacity); ProvisionedThroughput throughput = new ProvisionedThroughput();
throughput.setWriteCapacityUnits(writeCapacity); throughput.setReadCapacityUnits(readCapacity);
request.setProvisionedThroughput(throughput); throughput.setWriteCapacityUnits(writeCapacity);
request.setProvisionedThroughput(throughput);
}
try { try {
dynamoDBClient.createTable(request); dynamoDBClient.createTable(request);

View file

@ -22,6 +22,7 @@ import static org.junit.Assert.fail;
import java.util.Date; import java.util.Date;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -86,7 +87,7 @@ public class KinesisClientLibConfigurationTest {
TEST_VALUE_INT, TEST_VALUE_INT,
skipCheckpointValidationValue, skipCheckpointValidationValue,
null, null,
TEST_VALUE_LONG); TEST_VALUE_LONG, BillingMode.PROVISIONED);
} }
@Test @Test
@ -126,7 +127,7 @@ public class KinesisClientLibConfigurationTest {
TEST_VALUE_INT, TEST_VALUE_INT,
skipCheckpointValidationValue, skipCheckpointValidationValue,
null, null,
longValues[6]); longValues[6], BillingMode.PROVISIONED);
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
System.out.println(e.getMessage()); System.out.println(e.getMessage());
} }
@ -161,7 +162,7 @@ public class KinesisClientLibConfigurationTest {
intValues[1], intValues[1],
skipCheckpointValidationValue, skipCheckpointValidationValue,
null, null,
TEST_VALUE_LONG); TEST_VALUE_LONG, BillingMode.PAY_PER_REQUEST);
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
System.out.println(e.getMessage()); System.out.println(e.getMessage());
} }

View file

@ -61,7 +61,8 @@ public class KinesisClientLibLeaseCoordinatorIntegrationTest {
if (leaseManager == null) { if (leaseManager == null) {
AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain()); AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain());
leaseManager = leaseManager =
new KinesisClientLeaseManager(TABLE_NAME, ddb, useConsistentReads); new KinesisClientLeaseManager(TABLE_NAME, ddb, useConsistentReads,
KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE);
} }
leaseManager.createLeaseTableIfNotExists(10L, 10L); leaseManager.createLeaseTableIfNotExists(10L, 10L);
leaseManager.deleteAll(); leaseManager.deleteAll();

View file

@ -18,10 +18,12 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import org.junit.After; import com.amazonaws.services.dynamodbv2.model.BillingMode;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
import com.amazonaws.services.dynamodbv2.model.ListTablesResult;
import org.joda.time.DateTime;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -41,6 +43,8 @@ import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.leases.interfaces.IKinesisClientLeaseManager; import com.amazonaws.services.kinesis.leases.interfaces.IKinesisClientLeaseManager;
import com.amazonaws.services.kinesis.model.StreamStatus; import com.amazonaws.services.kinesis.model.StreamStatus;
import static junit.framework.TestCase.fail;
/** /**
* WARN: to run this integration test you'll have to provide a AwsCredentials.properties file on the classpath. * WARN: to run this integration test you'll have to provide a AwsCredentials.properties file on the classpath.
*/ */
@ -84,13 +88,13 @@ public class ShardSyncTaskIntegrationTest {
/** /**
* @throws java.lang.Exception * @throws java.lang.Exception
*/ */
@Before public void setUp(BillingMode billingMode, String tableName) throws Exception {
public void setUp() throws Exception {
boolean useConsistentReads = true; boolean useConsistentReads = true;
leaseManager = leaseManager =
new KinesisClientLeaseManager("ShardSyncTaskIntegrationTest", new KinesisClientLeaseManager(tableName,
new AmazonDynamoDBClient(credentialsProvider), new AmazonDynamoDBClient(credentialsProvider),
useConsistentReads); useConsistentReads,
billingMode);
kinesisProxy = kinesisProxy =
new KinesisProxy(STREAM_NAME, new KinesisProxy(STREAM_NAME,
@ -99,25 +103,79 @@ public class ShardSyncTaskIntegrationTest {
} }
/** /**
* @throws java.lang.Exception * Test method for call().
*
* @throws Exception
*/ */
@After @Test
public void tearDown() throws Exception { public final void testCall_ProvisionedDDB() throws Exception {
BillingMode billingMode = BillingMode.PROVISIONED;
String tableName = "ShardSyncTaskIntegrationTest" + billingMode.name();
try {
setUp(billingMode, tableName);
runTest();
checkBillingMode(billingMode, tableName);
}
finally {
cleanUpTable(tableName);
}
} }
/** /**
* Test method for call(). * Test method for call().
* *
* @throws DependencyException * @throws Exception
* @throws InvalidStateException
* @throws ProvisionedThroughputException
*/ */
@Test @Test
public final void testCall() throws DependencyException, InvalidStateException, ProvisionedThroughputException { public final void testCall_PayPerRequestDDB() throws Exception {
BillingMode billingMode = BillingMode.PAY_PER_REQUEST;
String tableName = "ShardSyncTaskIntegrationTest" + billingMode.name();
try {
setUp(billingMode, tableName);
runTest();
checkBillingMode(billingMode, tableName);
} finally {
cleanUpTable(tableName);
}
}
private void cleanUpTable(String tableName) throws DependencyException {
AmazonDynamoDBClient client = new AmazonDynamoDBClient(DefaultAWSCredentialsProviderChain.getInstance());
ListTablesResult tables = client.listTables();
if(tables.getTableNames().contains(tableName)){
leaseManager.waitUntilLeaseTableExists(2,20);
client.deleteTable(tableName);
DateTime endTime = DateTime.now().plusSeconds(30);
while(client.listTables().getTableNames().contains(tableName)){
if( endTime.isBeforeNow()){
fail("Could not clean up DDB tables in time. Please retry. If these failures continue increase the endTime.");
}
try {
Thread.sleep(333L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private void checkBillingMode(BillingMode billingMode, String tableName) {
AmazonDynamoDBClient client = new AmazonDynamoDBClient(DefaultAWSCredentialsProviderChain.getInstance());
DescribeTableResult tableDetails = client.describeTable(tableName);
if(BillingMode.PAY_PER_REQUEST.equals(billingMode)) {
Assert.assertEquals(tableDetails.getTable().getBillingModeSummary().getBillingMode(), billingMode.name());
}else{
Assert.assertTrue(tableDetails.getTable().getProvisionedThroughput().getWriteCapacityUnits() == 10);
Assert.assertTrue(tableDetails.getTable().getProvisionedThroughput().getReadCapacityUnits() == 10);
}
}
public void runTest() throws DependencyException, ProvisionedThroughputException, InvalidStateException {
if (!leaseManager.leaseTableExists()) { if (!leaseManager.leaseTableExists()) {
final Long readCapacity = 10L; final Long readCapacity = 10L;
final Long writeCapacity = 10L; final Long writeCapacity = 10L;
leaseManager.createLeaseTableIfNotExists(readCapacity, writeCapacity); leaseManager.createLeaseTableIfNotExists(readCapacity, writeCapacity);
leaseManager.waitUntilLeaseTableExists(2,20);
} }
leaseManager.deleteAll(); leaseManager.deleteAll();
Set<String> shardIds = kinesisProxy.getAllShardIds(); Set<String> shardIds = kinesisProxy.getAllShardIds();

View file

@ -70,7 +70,7 @@ public class ShardSyncerTest {
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000L)); InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000L));
private final boolean cleanupLeasesOfCompletedShards = true; private final boolean cleanupLeasesOfCompletedShards = true;
AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB(); AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB();
LeaseManager<KinesisClientLease> leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient); LeaseManager<KinesisClientLease> leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient, KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE);
private static final int EXPONENT = 128; private static final int EXPONENT = 128;
protected static final KinesisLeaseCleanupValidator leaseCleanupValidator = new KinesisLeaseCleanupValidator(); protected static final KinesisLeaseCleanupValidator leaseCleanupValidator = new KinesisLeaseCleanupValidator();
private static final KinesisShardSyncer shardSyncer = new KinesisShardSyncer(leaseCleanupValidator); private static final KinesisShardSyncer shardSyncer = new KinesisShardSyncer(leaseCleanupValidator);

View file

@ -2133,7 +2133,8 @@ public class WorkerTest {
final long idleTimeInMilliseconds = 2L; final long idleTimeInMilliseconds = 2L;
AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB(); AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB();
LeaseManager<KinesisClientLease> leaseManager = new KinesisClientLeaseManager("foo", ddbClient); LeaseManager<KinesisClientLease> leaseManager = new KinesisClientLeaseManager("foo", ddbClient,
KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE);
leaseManager.createLeaseTableIfNotExists(1L, 1L); leaseManager.createLeaseTableIfNotExists(1L, 1L);
for (KinesisClientLease initialLease : initialLeases) { for (KinesisClientLease initialLease : initialLeases) {
leaseManager.createLeaseIfNotExists(initialLease); leaseManager.createLeaseIfNotExists(initialLease);

View file

@ -28,6 +28,7 @@ import java.util.Map;
import javax.swing.*; import javax.swing.*;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector; import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -59,7 +60,8 @@ public class LeaseCoordinatorExerciser {
new DefaultAWSCredentialsProviderChain(); new DefaultAWSCredentialsProviderChain();
AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(creds); AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(creds);
ILeaseManager<KinesisClientLease> leaseManager = new KinesisClientLeaseManager("nagl_ShardProgress", ddb); ILeaseManager<KinesisClientLease> leaseManager = new KinesisClientLeaseManager("nagl_ShardProgress", ddb,
KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE);
if (leaseManager.createLeaseTableIfNotExists(10L, 50L)) { if (leaseManager.createLeaseTableIfNotExists(10L, 50L)) {
LOG.info("Waiting for newly created lease table"); LOG.info("Waiting for newly created lease table");

View file

@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.leases.impl;
import java.util.logging.Logger; import java.util.logging.Logger;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -46,7 +47,8 @@ public class LeaseIntegrationTest {
if (leaseManager == null) { if (leaseManager == null) {
// Do some static setup once per class. // Do some static setup once per class.
leaseManager = new KinesisClientLeaseManager("nagl_ShardProgress", ddbClient, true); leaseManager = new KinesisClientLeaseManager("nagl_ShardProgress", ddbClient, true,
KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE);
MetricsHelper.startScope(new NullMetricsFactory()); MetricsHelper.startScope(new NullMetricsFactory());
} }

View file

@ -18,6 +18,8 @@ import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import junit.framework.Assert; import junit.framework.Assert;
import org.junit.Test; import org.junit.Test;
@ -233,7 +235,24 @@ public class LeaseManagerIntegrationTest extends LeaseIntegrationTest {
@Test @Test
public void testWaitUntilLeaseTableExists() throws LeasingException { public void testWaitUntilLeaseTableExists() throws LeasingException {
KinesisClientLeaseManager manager = new KinesisClientLeaseManager("nagl_ShardProgress", ddbClient, true) { KinesisClientLeaseManager manager = new KinesisClientLeaseManager("nagl_ShardProgress", ddbClient, true,
KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE) {
@Override
long sleep(long timeToSleepMillis) {
Assert.fail("Should not sleep");
return 0L;
}
};
Assert.assertTrue(manager.waitUntilLeaseTableExists(1, 1));
}
@Test
public void testWaitUntilLeaseTableExistsPayPerRequest() throws LeasingException {
KinesisClientLeaseManager manager = new KinesisClientLeaseManager("nagl_ShardProgress_PayPerRequest", ddbClient, true,
BillingMode.PAY_PER_REQUEST) {
@Override @Override
long sleep(long timeToSleepMillis) { long sleep(long timeToSleepMillis) {
@ -252,7 +271,8 @@ public class LeaseManagerIntegrationTest extends LeaseIntegrationTest {
* Just using AtomicInteger for the indirection it provides. * Just using AtomicInteger for the indirection it provides.
*/ */
final AtomicInteger sleepCounter = new AtomicInteger(0); final AtomicInteger sleepCounter = new AtomicInteger(0);
KinesisClientLeaseManager manager = new KinesisClientLeaseManager("nonexistentTable", ddbClient, true) { KinesisClientLeaseManager manager = new KinesisClientLeaseManager("nonexistentTable", ddbClient, true,
KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE) {
@Override @Override
long sleep(long timeToSleepMillis) { long sleep(long timeToSleepMillis) {