Add support for configuring DynamoDB endpoint

Adding a new field named `dynamoDBEndpoint` to the .properties file
that gets passed into the KCL multi-lang daemon. We need this ability
to point the KCL worker at a local instance of DynamoDB rather than in
AWS.
This commit is contained in:
Michael Choi 2015-10-26 15:05:48 -07:00 committed by Kerrick Staley
parent 74c259ca11
commit 0f5ed87522
3 changed files with 27 additions and 1 deletions

1
.gitignore vendored
View file

@ -1,2 +1,3 @@
target/ target/
AwsCredentials.properties AwsCredentials.properties
*.swp

View file

@ -157,6 +157,7 @@ public class KinesisClientLibConfiguration {
private String applicationName; private String applicationName;
private String streamName; private String streamName;
private String kinesisEndpoint; private String kinesisEndpoint;
private String dynamoDBEndpoint;
private InitialPositionInStream initialPositionInStream; private InitialPositionInStream initialPositionInStream;
private AWSCredentialsProvider kinesisCredentialsProvider; private AWSCredentialsProvider kinesisCredentialsProvider;
private AWSCredentialsProvider dynamoDBCredentialsProvider; private AWSCredentialsProvider dynamoDBCredentialsProvider;
@ -220,7 +221,7 @@ public class KinesisClientLibConfiguration {
AWSCredentialsProvider dynamoDBCredentialsProvider, AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider, AWSCredentialsProvider cloudWatchCredentialsProvider,
String workerId) { String workerId) {
this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider, this(applicationName, streamName, null, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId, dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId,
DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
@ -236,6 +237,7 @@ public class KinesisClientLibConfiguration {
* can assist with troubleshooting (e.g. distinguish requests made by separate applications). * can assist with troubleshooting (e.g. distinguish requests made by separate applications).
* @param streamName Name of the Kinesis stream * @param streamName Name of the Kinesis stream
* @param kinesisEndpoint Kinesis endpoint * @param kinesisEndpoint Kinesis endpoint
* @param dynamoDBEndpoint DynamoDB endpoint
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching
* records from that location in the stream when an application starts up for the first time and there * records from that location in the stream when an application starts up for the first time and there
* are no checkpoints. If there are checkpoints, then we start from the checkpoint position. * are no checkpoints. If there are checkpoints, then we start from the checkpoint position.
@ -269,6 +271,7 @@ public class KinesisClientLibConfiguration {
public KinesisClientLibConfiguration(String applicationName, public KinesisClientLibConfiguration(String applicationName,
String streamName, String streamName,
String kinesisEndpoint, String kinesisEndpoint,
String dynamoDBEndpoint,
InitialPositionInStream initialPositionInStream, InitialPositionInStream initialPositionInStream,
AWSCredentialsProvider kinesisCredentialsProvider, AWSCredentialsProvider kinesisCredentialsProvider,
AWSCredentialsProvider dynamoDBCredentialsProvider, AWSCredentialsProvider dynamoDBCredentialsProvider,
@ -302,6 +305,7 @@ public class KinesisClientLibConfiguration {
this.applicationName = applicationName; this.applicationName = applicationName;
this.streamName = streamName; this.streamName = streamName;
this.kinesisEndpoint = kinesisEndpoint; this.kinesisEndpoint = kinesisEndpoint;
this.dynamoDBEndpoint = dynamoDBEndpoint;
this.initialPositionInStream = initialPositionInStream; this.initialPositionInStream = initialPositionInStream;
this.kinesisCredentialsProvider = kinesisCredentialsProvider; this.kinesisCredentialsProvider = kinesisCredentialsProvider;
this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider; this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider;
@ -450,6 +454,13 @@ public class KinesisClientLibConfiguration {
return kinesisEndpoint; return kinesisEndpoint;
} }
/**
* @return DynamoDB endpoint
*/
public String getDynamoDBEndpoint() {
return dynamoDBEndpoint;
}
/** /**
* @return the initialPositionInStream * @return the initialPositionInStream
*/ */
@ -581,6 +592,15 @@ public class KinesisClientLibConfiguration {
return this; return this;
} }
/**
* @param dynamoDBEndpoint DynamoDB endpoint
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withDynamoDBEndpoint(String dynamoDBEndpoint) {
this.dynamoDBEndpoint = dynamoDBEndpoint;
return this;
}
/** /**
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library will start * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library will start
* fetching records from this position when the application starts up if there are no checkpoints. If there * fetching records from this position when the application starts up if there are no checkpoints. If there

View file

@ -240,6 +240,11 @@ public class Worker implements Runnable {
dynamoDBClient.setRegion(region); dynamoDBClient.setRegion(region);
LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName()); LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName());
} }
// If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint.
if (config.getDynamoDBEndpoint() != null) {
dynamoDBClient.setEndpoint(config.getDynamoDBEndpoint());
LOG.debug("The endpoint of Amazon DynamoDB client has been set to " + config.getDynamoDBEndpoint());
}
// If a kinesis endpoint was explicitly specified, use it to set the region of kinesis. // If a kinesis endpoint was explicitly specified, use it to set the region of kinesis.
if (config.getKinesisEndpoint() != null) { if (config.getKinesisEndpoint() != null) {
kinesisClient.setEndpoint(config.getKinesisEndpoint()); kinesisClient.setEndpoint(config.getKinesisEndpoint());