From 9e0a97916dd7ddca1a77d6c23f24cabab8f1e135 Mon Sep 17 00:00:00 2001 From: Prometheus Date: Thu, 31 May 2018 17:41:14 -0700 Subject: [PATCH] Use AWS resource iface, overwrite default dynamodb, more explicit in example about overwrite default AWS resrouce client (#49) * use custom kinesis client * use aws sdk interface, add missing api for ddb * add overwrite default dynamodbclien usage --- checkpoint/ddb/ddb.go | 10 +++++++++- client.go | 5 +++-- examples/consumer/main.go | 15 ++++++++++++++- 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/checkpoint/ddb/ddb.go b/checkpoint/ddb/ddb.go index 74a5300..b48ec45 100644 --- a/checkpoint/ddb/ddb.go +++ b/checkpoint/ddb/ddb.go @@ -11,6 +11,7 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" + "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" ) // Option is used to override defaults when creating a new Checkpoint @@ -23,6 +24,13 @@ func WithMaxInterval(maxInterval time.Duration) Option { } } +// WithDynamoClient sets the dynamoDb client +func WithDynamoClient(svc dynamodbiface.DynamoDBAPI) Option { + return func(c *Checkpoint) { + c.client = svc + } +} + // New returns a checkpoint that uses DynamoDB for underlying storage func New(appName, tableName string, opts ...Option) (*Checkpoint, error) { client := dynamodb.New(session.New(aws.NewConfig())) @@ -58,7 +66,7 @@ func New(appName, tableName string, opts ...Option) (*Checkpoint, error) { type Checkpoint struct { tableName string appName string - client *dynamodb.DynamoDB + client dynamodbiface.DynamoDBAPI maxInterval time.Duration mu *sync.Mutex // protects the checkpoints checkpoints map[key]string diff --git a/client.go b/client.go index 7d752c9..f3eab7a 100644 --- a/client.go +++ b/client.go @@ -7,13 +7,14 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" ) // ClientOption is used to override defaults when creating a KinesisClient type ClientOption func(*KinesisClient) // WithKinesis overrides the default Kinesis client -func WithKinesis(svc *kinesis.Kinesis) ClientOption { +func WithKinesis(svc kinesisiface.KinesisAPI) ClientOption { return func(kc *KinesisClient) { kc.svc = svc } @@ -36,7 +37,7 @@ func NewKinesisClient(opts ...ClientOption) *KinesisClient { // KinesisClient acts as wrapper around Kinesis client type KinesisClient struct { - svc *kinesis.Kinesis + svc kinesisiface.KinesisAPI } // GetShardIDs returns shard ids in a given stream diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 3dc313c..4c7c670 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -11,6 +11,11 @@ import ( "os" "os/signal" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/aws/aws-sdk-go/service/kinesis" + consumer "github.com/harlow/kinesis-consumer" checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb" ) @@ -35,8 +40,11 @@ func main() { ) flag.Parse() + // Following will overwrite the default dynamodb client + myDynamoDbClient := dynamodb.New(session.New(aws.NewConfig())) + // ddb checkpoint - ck, err := checkpoint.New(*app, *table) + ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient)) if err != nil { log.Fatalf("checkpoint error: %v", err) } @@ -46,12 +54,17 @@ func main() { logger = log.New(os.Stdout, "", log.LstdFlags) ) + // The following 2 lines will overwrite the default kinesis client + myKinesisClient := kinesis.New(session.New(aws.NewConfig())) + newKclient := consumer.NewKinesisClient(consumer.WithKinesis(myKinesisClient)) + // consumer c, err := consumer.New( *stream, consumer.WithCheckpoint(ck), consumer.WithLogger(logger), consumer.WithCounter(counter), + consumer.WithClient(newKclient), ) if err != nil { log.Fatalf("consumer error: %v", err)