Allow DynamoDB table name to be specified

This commit is contained in:
Sam Day 2016-03-18 22:19:37 +10:00 committed by Sam Day
parent 74c259ca11
commit 85962d70c8
2 changed files with 20 additions and 2 deletions

View file

@ -155,6 +155,7 @@ public class KinesisClientLibConfiguration {
private String applicationName;
private String tableName;
private String streamName;
private String kinesisEndpoint;
private InitialPositionInStream initialPositionInStream;
@ -300,6 +301,7 @@ public class KinesisClientLibConfiguration {
checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize);
checkIsRegionNameValid(regionName);
this.applicationName = applicationName;
this.tableName = applicationName;
this.streamName = streamName;
this.kinesisEndpoint = kinesisEndpoint;
this.initialPositionInStream = initialPositionInStream;
@ -366,6 +368,13 @@ public class KinesisClientLibConfiguration {
return applicationName;
}
/**
* @return Name of the table to use in DynamoDB
*/
public String getTableName() {
return tableName;
}
/**
* @return Time within which a worker should renew a lease (else it is assumed dead)
*/
@ -572,6 +581,15 @@ public class KinesisClientLibConfiguration {
}
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES
/**
* @param tableName name of the lease table in DynamoDB
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withTableName(String tableName) {
this.tableName = tableName;
return this;
}
/**
* @param kinesisEndpoint Kinesis endpoint
* @return KinesisClientLibConfiguration

View file

@ -219,7 +219,7 @@ public class Worker implements Runnable {
config.shouldCleanupLeasesUponShardCompletion(),
null,
new KinesisClientLibLeaseCoordinator(
new KinesisClientLeaseManager(config.getApplicationName(), dynamoDBClient),
new KinesisClientLeaseManager(config.getTableName(), config.getApplicationName(), dynamoDBClient),
config.getWorkerIdentifier(),
config.getFailoverTimeMillis(),
config.getEpsilonMillis(),
@ -952,7 +952,7 @@ public class Worker implements Runnable {
config.getShardSyncIntervalMillis(),
config.shouldCleanupLeasesUponShardCompletion(),
null,
new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getApplicationName(),
new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getTableName(),
dynamoDBClient),
config.getWorkerIdentifier(),
config.getFailoverTimeMillis(),