This commit is contained in:
Sam 2016-07-21 13:40:11 +00:00 committed by GitHub
commit 467011f568
2 changed files with 20 additions and 2 deletions

View file

@ -155,6 +155,7 @@ public class KinesisClientLibConfiguration {
private String applicationName; private String applicationName;
private String tableName;
private String streamName; private String streamName;
private String kinesisEndpoint; private String kinesisEndpoint;
private InitialPositionInStream initialPositionInStream; private InitialPositionInStream initialPositionInStream;
@ -300,6 +301,7 @@ public class KinesisClientLibConfiguration {
checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize);
checkIsRegionNameValid(regionName); checkIsRegionNameValid(regionName);
this.applicationName = applicationName; this.applicationName = applicationName;
this.tableName = applicationName;
this.streamName = streamName; this.streamName = streamName;
this.kinesisEndpoint = kinesisEndpoint; this.kinesisEndpoint = kinesisEndpoint;
this.initialPositionInStream = initialPositionInStream; this.initialPositionInStream = initialPositionInStream;
@ -366,6 +368,13 @@ public class KinesisClientLibConfiguration {
return applicationName; return applicationName;
} }
/**
* @return Name of the table to use in DynamoDB
*/
public String getTableName() {
return tableName;
}
/** /**
* @return Time within which a worker should renew a lease (else it is assumed dead) * @return Time within which a worker should renew a lease (else it is assumed dead)
*/ */
@ -572,6 +581,15 @@ public class KinesisClientLibConfiguration {
} }
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES
/**
* @param tableName name of the lease table in DynamoDB
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withTableName(String tableName) {
this.tableName = tableName;
return this;
}
/** /**
* @param kinesisEndpoint Kinesis endpoint * @param kinesisEndpoint Kinesis endpoint
* @return KinesisClientLibConfiguration * @return KinesisClientLibConfiguration

View file

@ -219,7 +219,7 @@ public class Worker implements Runnable {
config.shouldCleanupLeasesUponShardCompletion(), config.shouldCleanupLeasesUponShardCompletion(),
null, null,
new KinesisClientLibLeaseCoordinator( new KinesisClientLibLeaseCoordinator(
new KinesisClientLeaseManager(config.getApplicationName(), dynamoDBClient), new KinesisClientLeaseManager(config.getTableName(), config.getApplicationName(), dynamoDBClient),
config.getWorkerIdentifier(), config.getWorkerIdentifier(),
config.getFailoverTimeMillis(), config.getFailoverTimeMillis(),
config.getEpsilonMillis(), config.getEpsilonMillis(),
@ -952,7 +952,7 @@ public class Worker implements Runnable {
config.getShardSyncIntervalMillis(), config.getShardSyncIntervalMillis(),
config.shouldCleanupLeasesUponShardCompletion(), config.shouldCleanupLeasesUponShardCompletion(),
null, null,
new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getApplicationName(), new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getTableName(),
dynamoDBClient), dynamoDBClient),
config.getWorkerIdentifier(), config.getWorkerIdentifier(),
config.getFailoverTimeMillis(), config.getFailoverTimeMillis(),