From 0f5ed87522eb7f4dade2c14ecf94379cd1efbd4d Mon Sep 17 00:00:00 2001 From: Michael Choi Date: Mon, 26 Oct 2015 15:05:48 -0700 Subject: [PATCH] Add support for configuring DynamoDB endpoint Adding a new field named `dynamoDBEndpoint` to the .properties file that gets passed into the KCL multi-lang daemon. We need this ability to point the KCL worker at a local instance of DynamoDB rather than in AWS. --- .gitignore | 1 + .../worker/KinesisClientLibConfiguration.java | 22 ++++++++++++++++++- .../clientlibrary/lib/worker/Worker.java | 5 +++++ 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index ffa4e664..f5d2741c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ target/ AwsCredentials.properties +*.swp diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 510565f5..5ea97dfb 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -157,6 +157,7 @@ public class KinesisClientLibConfiguration { private String applicationName; private String streamName; private String kinesisEndpoint; + private String dynamoDBEndpoint; private InitialPositionInStream initialPositionInStream; private AWSCredentialsProvider kinesisCredentialsProvider; private AWSCredentialsProvider dynamoDBCredentialsProvider; @@ -220,7 +221,7 @@ public class KinesisClientLibConfiguration { AWSCredentialsProvider dynamoDBCredentialsProvider, AWSCredentialsProvider cloudWatchCredentialsProvider, String workerId) { - this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider, + this(applicationName, streamName, null, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider, dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId, DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, @@ -236,6 +237,7 @@ public class KinesisClientLibConfiguration { * can assist with troubleshooting (e.g. distinguish requests made by separate applications). * @param streamName Name of the Kinesis stream * @param kinesisEndpoint Kinesis endpoint + * @param dynamoDBEndpoint DynamoDB endpoint * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching * records from that location in the stream when an application starts up for the first time and there * are no checkpoints. If there are checkpoints, then we start from the checkpoint position. @@ -269,6 +271,7 @@ public class KinesisClientLibConfiguration { public KinesisClientLibConfiguration(String applicationName, String streamName, String kinesisEndpoint, + String dynamoDBEndpoint, InitialPositionInStream initialPositionInStream, AWSCredentialsProvider kinesisCredentialsProvider, AWSCredentialsProvider dynamoDBCredentialsProvider, @@ -302,6 +305,7 @@ public class KinesisClientLibConfiguration { this.applicationName = applicationName; this.streamName = streamName; this.kinesisEndpoint = kinesisEndpoint; + this.dynamoDBEndpoint = dynamoDBEndpoint; this.initialPositionInStream = initialPositionInStream; this.kinesisCredentialsProvider = kinesisCredentialsProvider; this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider; @@ -450,6 +454,13 @@ public class KinesisClientLibConfiguration { return kinesisEndpoint; } + /** + * @return DynamoDB endpoint + */ + public String getDynamoDBEndpoint() { + return dynamoDBEndpoint; + } + /** * @return the initialPositionInStream */ @@ -581,6 +592,15 @@ public class KinesisClientLibConfiguration { return this; } + /** + * @param dynamoDBEndpoint DynamoDB endpoint + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withDynamoDBEndpoint(String dynamoDBEndpoint) { + this.dynamoDBEndpoint = dynamoDBEndpoint; + return this; + } + /** * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library will start * fetching records from this position when the application starts up if there are no checkpoints. If there diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index dbbf934d..3fd8f93c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -240,6 +240,11 @@ public class Worker implements Runnable { dynamoDBClient.setRegion(region); LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName()); } + // If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint. + if (config.getDynamoDBEndpoint() != null) { + dynamoDBClient.setEndpoint(config.getDynamoDBEndpoint()); + LOG.debug("The endpoint of Amazon DynamoDB client has been set to " + config.getDynamoDBEndpoint()); + } // If a kinesis endpoint was explicitly specified, use it to set the region of kinesis. if (config.getKinesisEndpoint() != null) { kinesisClient.setEndpoint(config.getKinesisEndpoint());