Joshua Morris 2016-08-19 23:24:27 +00:00
commit 06b0acd3dc
2 changed files with 26 additions and 1 deletions

View file

@ -158,6 +158,7 @@ public class KinesisClientLibConfiguration {
private String tableName;
private String streamName;
private String kinesisEndpoint;
private String dynamoDBEndpoint;
private InitialPositionInStream initialPositionInStream;
private AWSCredentialsProvider kinesisCredentialsProvider;
private AWSCredentialsProvider dynamoDBCredentialsProvider;
@ -221,7 +222,7 @@ public class KinesisClientLibConfiguration {
AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider,
String workerId) {
this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
this(applicationName, streamName, null, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId,
DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
@ -237,6 +238,7 @@ public class KinesisClientLibConfiguration {
* can assist with troubleshooting (e.g. distinguish requests made by separate applications).
* @param streamName Name of the Kinesis stream
* @param kinesisEndpoint Kinesis endpoint
* @param dynamoDBEndpoint DynamoDB endpoint
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching
* records from that location in the stream when an application starts up for the first time and there
* are no checkpoints. If there are checkpoints, then we start from the checkpoint position.
@ -270,6 +272,7 @@ public class KinesisClientLibConfiguration {
public KinesisClientLibConfiguration(String applicationName,
String streamName,
String kinesisEndpoint,
String dynamoDBEndpoint,
InitialPositionInStream initialPositionInStream,
AWSCredentialsProvider kinesisCredentialsProvider,
AWSCredentialsProvider dynamoDBCredentialsProvider,
@ -304,6 +307,7 @@ public class KinesisClientLibConfiguration {
this.tableName = applicationName;
this.streamName = streamName;
this.kinesisEndpoint = kinesisEndpoint;
this.dynamoDBEndpoint = dynamoDBEndpoint;
this.initialPositionInStream = initialPositionInStream;
this.kinesisCredentialsProvider = kinesisCredentialsProvider;
this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider;
@ -459,6 +463,13 @@ public class KinesisClientLibConfiguration {
return kinesisEndpoint;
}
/**
* @return DynamoDB endpoint
*/
public String getDynamoDBEndpoint() {
return dynamoDBEndpoint;
}
/**
* @return the initialPositionInStream
*/
@ -599,6 +610,15 @@ public class KinesisClientLibConfiguration {
return this;
}
/**
* @param dynamoDBEndpoint DynamoDB endpoint
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withDynamoDBEndpoint(String dynamoDBEndpoint) {
this.dynamoDBEndpoint = dynamoDBEndpoint;
return this;
}
/**
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library will start
* fetching records from this position when the application starts up if there are no checkpoints. If there

View file

@ -240,6 +240,11 @@ public class Worker implements Runnable {
dynamoDBClient.setRegion(region);
LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName());
}
// If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint.
if (config.getDynamoDBEndpoint() != null) {
dynamoDBClient.setEndpoint(config.getDynamoDBEndpoint());
LOG.debug("The endpoint of Amazon DynamoDB client has been set to " + config.getDynamoDBEndpoint());
}
// If a kinesis endpoint was explicitly specified, use it to set the region of kinesis.
if (config.getKinesisEndpoint() != null) {
kinesisClient.setEndpoint(config.getKinesisEndpoint());