Add configuration options for AWS service endpoints (#5)

* Add configuration options for AWS service endpoints

Signed-off-by: Timothy Studd <tim@goguardian.com>

* Fix KCL naming consistency issue

Signed-off-by: Timothy Studd <tim@goguardian.com>
This commit is contained in:
Tim Studd 2019-02-09 08:23:54 -08:00 committed by Tao Jiang
parent 03685b2b19
commit cd343cca09
4 changed files with 38 additions and 10 deletions

View file

@ -158,6 +158,14 @@ type (
// ApplicationName is name of application. Kinesis allows multiple applications to consume the same stream.
ApplicationName string
// DynamoDBEndpoint is an optional endpoint URL that overrides the default generated endpoint for a DynamoDB client.
// If this is empty, the default generated endpoint will be used.
DynamoDBEndpoint string
// KinesisEndpoint is an optional endpoint URL that overrides the default generated endpoint for a Kinesis client.
// If this is empty, the default generated endpoint will be used.
KinesisEndpoint string
// TableName is name of the dynamo db table for managing kinesis stream default to ApplicationName
TableName string

View file

@ -34,8 +34,9 @@
package config
import (
"github.com/vmware/vmware-go-kcl/clientlibrary/utils"
"time"
"github.com/vmware/vmware-go-kcl/clientlibrary/utils"
)
// NewKinesisClientLibConfig to create a default KinesisClientLibConfiguration based on the required fields.
@ -77,6 +78,18 @@ func NewKinesisClientLibConfig(applicationName, streamName, regionName, workerID
}
}
// WithKinesisEndpoint is used to provide an alternative Kinesis endpoint
func (c *KinesisClientLibConfiguration) WithKinesisEndpoint(kinesisEndpoint string) *KinesisClientLibConfiguration {
c.KinesisEndpoint = kinesisEndpoint
return c
}
// WithDynamoDBEndpoint is used to provide an alternative DynamoDB endpoint
func (c *KinesisClientLibConfiguration) WithDynamoDBEndpoint(dynamoDBEndpoint string) *KinesisClientLibConfiguration {
c.DynamoDBEndpoint = dynamoDBEndpoint
return c
}
// WithTableName to provide alternative lease table in DynamoDB
func (c *KinesisClientLibConfiguration) WithTableName(tableName string) *KinesisClientLibConfiguration {
c.TableName = tableName

View file

@ -286,9 +286,9 @@ func (checkpointer *DynamoCheckpoint) saveItem(item map[string]*dynamodb.Attribu
func (checkpointer *DynamoCheckpoint) conditionalUpdate(conditionExpression string, expressionAttributeValues map[string]*dynamodb.AttributeValue, item map[string]*dynamodb.AttributeValue) error {
return checkpointer.putItem(&dynamodb.PutItemInput{
ConditionExpression: aws.String(conditionExpression),
TableName: aws.String(checkpointer.TableName),
Item: item,
ConditionExpression: aws.String(conditionExpression),
TableName: aws.String(checkpointer.TableName),
Item: item,
ExpressionAttributeValues: expressionAttributeValues,
})
}

View file

@ -29,13 +29,14 @@ package worker
import (
"errors"
log "github.com/sirupsen/logrus"
"os"
"os/signal"
"sync"
"syscall"
"time"
log "github.com/sirupsen/logrus"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
@ -112,11 +113,17 @@ func NewWorker(factory kcl.IRecordProcessorFactory, kclConfig *config.KinesisCli
// create session for Kinesis
log.Info("Creating Kinesis session")
s := session.New(&aws.Config{Region: aws.String(w.regionName)})
s := session.New(&aws.Config{
Region: aws.String(w.regionName),
Endpoint: &kclConfig.KinesisEndpoint,
})
w.kc = kinesis.New(s)
log.Info("Creating DynamoDB session")
s = session.New(&aws.Config{Region: aws.String(w.regionName)})
s = session.New(&aws.Config{
Region: aws.String(w.regionName),
Endpoint: &kclConfig.DynamoDBEndpoint,
})
w.dynamo = dynamodb.New(s)
w.checkpointer = NewDynamoCheckpoint(w.dynamo, kclConfig)
@ -329,9 +336,9 @@ func (w *Worker) getShardIDs(startShardID string, shardInfo map[string]bool) err
if _, ok := w.shardStatus[*s.ShardId]; !ok {
log.Infof("Found new shard with id %s", *s.ShardId)
w.shardStatus[*s.ShardId] = &shardStatus{
ID: *s.ShardId,
ParentShardId: aws.StringValue(s.ParentShardId),
mux: &sync.Mutex{},
ID: *s.ShardId,
ParentShardId: aws.StringValue(s.ParentShardId),
mux: &sync.Mutex{},
StartingSequenceNumber: aws.StringValue(s.SequenceNumberRange.StartingSequenceNumber),
EndingSequenceNumber: aws.StringValue(s.SequenceNumberRange.EndingSequenceNumber),
}