From cd343cca0962c84725214088a19964b51e59e7a6 Mon Sep 17 00:00:00 2001 From: Tim Studd Date: Sat, 9 Feb 2019 08:23:54 -0800 Subject: [PATCH] Add configuration options for AWS service endpoints (#5) * Add configuration options for AWS service endpoints Signed-off-by: Timothy Studd * Fix KCL naming consistency issue Signed-off-by: Timothy Studd --- clientlibrary/config/config.go | 8 ++++++++ clientlibrary/config/kcl-config.go | 15 ++++++++++++++- clientlibrary/worker/checkpointer.go | 6 +++--- clientlibrary/worker/worker.go | 19 +++++++++++++------ 4 files changed, 38 insertions(+), 10 deletions(-) diff --git a/clientlibrary/config/config.go b/clientlibrary/config/config.go index 0e3926a..f1f3090 100644 --- a/clientlibrary/config/config.go +++ b/clientlibrary/config/config.go @@ -158,6 +158,14 @@ type ( // ApplicationName is name of application. Kinesis allows multiple applications to consume the same stream. ApplicationName string + // DynamoDBEndpoint is an optional endpoint URL that overrides the default generated endpoint for a DynamoDB client. + // If this is empty, the default generated endpoint will be used. + DynamoDBEndpoint string + + // KinesisEndpoint is an optional endpoint URL that overrides the default generated endpoint for a Kinesis client. + // If this is empty, the default generated endpoint will be used. + KinesisEndpoint string + // TableName is name of the dynamo db table for managing kinesis stream default to ApplicationName TableName string diff --git a/clientlibrary/config/kcl-config.go b/clientlibrary/config/kcl-config.go index 6c78a2a..4d208cf 100644 --- a/clientlibrary/config/kcl-config.go +++ b/clientlibrary/config/kcl-config.go @@ -34,8 +34,9 @@ package config import ( - "github.com/vmware/vmware-go-kcl/clientlibrary/utils" "time" + + "github.com/vmware/vmware-go-kcl/clientlibrary/utils" ) // NewKinesisClientLibConfig to create a default KinesisClientLibConfiguration based on the required fields. @@ -77,6 +78,18 @@ func NewKinesisClientLibConfig(applicationName, streamName, regionName, workerID } } +// WithKinesisEndpoint is used to provide an alternative Kinesis endpoint +func (c *KinesisClientLibConfiguration) WithKinesisEndpoint(kinesisEndpoint string) *KinesisClientLibConfiguration { + c.KinesisEndpoint = kinesisEndpoint + return c +} + +// WithDynamoDBEndpoint is used to provide an alternative DynamoDB endpoint +func (c *KinesisClientLibConfiguration) WithDynamoDBEndpoint(dynamoDBEndpoint string) *KinesisClientLibConfiguration { + c.DynamoDBEndpoint = dynamoDBEndpoint + return c +} + // WithTableName to provide alternative lease table in DynamoDB func (c *KinesisClientLibConfiguration) WithTableName(tableName string) *KinesisClientLibConfiguration { c.TableName = tableName diff --git a/clientlibrary/worker/checkpointer.go b/clientlibrary/worker/checkpointer.go index 1361f78..9d15fa1 100644 --- a/clientlibrary/worker/checkpointer.go +++ b/clientlibrary/worker/checkpointer.go @@ -286,9 +286,9 @@ func (checkpointer *DynamoCheckpoint) saveItem(item map[string]*dynamodb.Attribu func (checkpointer *DynamoCheckpoint) conditionalUpdate(conditionExpression string, expressionAttributeValues map[string]*dynamodb.AttributeValue, item map[string]*dynamodb.AttributeValue) error { return checkpointer.putItem(&dynamodb.PutItemInput{ - ConditionExpression: aws.String(conditionExpression), - TableName: aws.String(checkpointer.TableName), - Item: item, + ConditionExpression: aws.String(conditionExpression), + TableName: aws.String(checkpointer.TableName), + Item: item, ExpressionAttributeValues: expressionAttributeValues, }) } diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index e8a8a12..c295f86 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -29,13 +29,14 @@ package worker import ( "errors" - log "github.com/sirupsen/logrus" "os" "os/signal" "sync" "syscall" "time" + log "github.com/sirupsen/logrus" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" @@ -112,11 +113,17 @@ func NewWorker(factory kcl.IRecordProcessorFactory, kclConfig *config.KinesisCli // create session for Kinesis log.Info("Creating Kinesis session") - s := session.New(&aws.Config{Region: aws.String(w.regionName)}) + s := session.New(&aws.Config{ + Region: aws.String(w.regionName), + Endpoint: &kclConfig.KinesisEndpoint, + }) w.kc = kinesis.New(s) log.Info("Creating DynamoDB session") - s = session.New(&aws.Config{Region: aws.String(w.regionName)}) + s = session.New(&aws.Config{ + Region: aws.String(w.regionName), + Endpoint: &kclConfig.DynamoDBEndpoint, + }) w.dynamo = dynamodb.New(s) w.checkpointer = NewDynamoCheckpoint(w.dynamo, kclConfig) @@ -329,9 +336,9 @@ func (w *Worker) getShardIDs(startShardID string, shardInfo map[string]bool) err if _, ok := w.shardStatus[*s.ShardId]; !ok { log.Infof("Found new shard with id %s", *s.ShardId) w.shardStatus[*s.ShardId] = &shardStatus{ - ID: *s.ShardId, - ParentShardId: aws.StringValue(s.ParentShardId), - mux: &sync.Mutex{}, + ID: *s.ShardId, + ParentShardId: aws.StringValue(s.ParentShardId), + mux: &sync.Mutex{}, StartingSequenceNumber: aws.StringValue(s.SequenceNumberRange.StartingSequenceNumber), EndingSequenceNumber: aws.StringValue(s.SequenceNumberRange.EndingSequenceNumber), }