diff --git a/Gopkg.lock b/Gopkg.lock index ac39f22..523c43d 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -3,15 +3,51 @@ [[projects]] name = "github.com/apex/log" - packages = [".","handlers/text"] + packages = [ + ".", + "handlers/text" + ] revision = "0296d6eb16bb28f8a0c55668affcf4876dc269be" version = "v1.0.0" [[projects]] name = "github.com/aws/aws-sdk-go" - packages = ["aws","aws/awserr","aws/awsutil","aws/client","aws/client/metadata","aws/corehandlers","aws/credentials","aws/credentials/ec2rolecreds","aws/credentials/endpointcreds","aws/credentials/stscreds","aws/defaults","aws/ec2metadata","aws/endpoints","aws/request","aws/session","aws/signer/v4","internal/shareddefaults","private/protocol","private/protocol/json/jsonutil","private/protocol/jsonrpc","private/protocol/query","private/protocol/query/queryutil","private/protocol/rest","private/protocol/xml/xmlutil","service/dynamodb","service/dynamodb/dynamodbattribute","service/kinesis","service/sts"] - revision = "e4f7e38b704e3ed0acc4a7f8196b777696f6f1f3" - version = "v1.12.30" + packages = [ + "aws", + "aws/awserr", + "aws/awsutil", + "aws/client", + "aws/client/metadata", + "aws/corehandlers", + "aws/credentials", + "aws/credentials/ec2rolecreds", + "aws/credentials/endpointcreds", + "aws/credentials/stscreds", + "aws/defaults", + "aws/ec2metadata", + "aws/endpoints", + "aws/request", + "aws/session", + "aws/signer/v4", + "internal/sdkio", + "internal/sdkrand", + "internal/shareddefaults", + "private/protocol", + "private/protocol/json/jsonutil", + "private/protocol/jsonrpc", + "private/protocol/query", + "private/protocol/query/queryutil", + "private/protocol/rest", + "private/protocol/xml/xmlutil", + "service/dynamodb", + "service/dynamodb/dynamodbattribute", + "service/dynamodb/dynamodbiface", + "service/kinesis", + "service/kinesis/kinesisiface", + "service/sts" + ] + revision = "827e7eac8c2680d5bdea7bc3ef29c596eabe1eae" + version = "v1.13.59" [[projects]] name = "github.com/go-ini/ini" @@ -32,13 +68,20 @@ [[projects]] name = "gopkg.in/redis.v5" - packages = [".","internal","internal/consistenthash","internal/hashtag","internal/pool","internal/proto"] + packages = [ + ".", + "internal", + "internal/consistenthash", + "internal/hashtag", + "internal/pool", + "internal/proto" + ] revision = "a16aeec10ff407b1e7be6dd35797ccf5426ef0f0" version = "v5.2.9" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "acfe7035926da063cb81920dbb47c5e7dbcf5dbfbcf2e7dc5dd4ae0e4186490c" + inputs-digest = "1b40486b645b81bc2215d0153631e9e002e534ba86713ba55500ce62c07cbad8" solver-name = "gps-cdcl" solver-version = 1 diff --git a/README.md b/README.md index ce599b2..2621496 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,20 @@ ck, err := checkpoint.New(appName, tableName) if err != nil { log.Fatalf("new checkpoint error: %v", err) } + +// Override the Kinesis if any needs on session (e.g. assume role) +myDynamoDbClient := dynamodb.New(session.New(aws.NewConfig())) + +// For versions of AWS sdk that fixed config being picked up properly, the example of +// setting region should work. +// myDynamoDbClient := dynamodb.New(session.New(aws.NewConfig()), &aws.Config{ +// Region: aws.String("us-west-2"), +// }) + +ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient)) +if err != nil { + log.Fatalf("new checkpoint error: %v", err) +} ``` To leverage the DDB checkpoint we'll also need to create a table: diff --git a/checkpoint/ddb/ddb.go b/checkpoint/ddb/ddb.go index b48ec45..20fbc33 100644 --- a/checkpoint/ddb/ddb.go +++ b/checkpoint/ddb/ddb.go @@ -35,14 +35,6 @@ func WithDynamoClient(svc dynamodbiface.DynamoDBAPI) Option { func New(appName, tableName string, opts ...Option) (*Checkpoint, error) { client := dynamodb.New(session.New(aws.NewConfig())) - // ping table to verify it exists - _, err := client.DescribeTable(&dynamodb.DescribeTableInput{ - TableName: aws.String(tableName), - }) - if err != nil { - return nil, err - } - ck := &Checkpoint{ tableName: tableName, appName: appName, @@ -135,6 +127,18 @@ func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error { return nil } +// ValidaCheckpoint validate the checkpoint table exits, shut down +func (c *Checkpoint) ValidateCheckpoint() error { + // ping table to verify it exists + _, err := c.client.DescribeTable(&dynamodb.DescribeTableInput{ + TableName: aws.String(c.tableName), + }) + if err != nil { + c.done <- struct{}{} + } + return err +} + // Shutdown the checkpoint. Save any in-flight data. func (c *Checkpoint) Shutdown() error { c.done <- struct{}{} @@ -188,7 +192,7 @@ func (c *Checkpoint) save() error { func retriableError(err error) bool { if awsErr, ok := err.(awserr.Error); ok { - if awsErr.Code() == "ProvisionedThroughputExceededException" { + if awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException { return true } } diff --git a/examples/consumer/README.md b/examples/consumer/README.md index 09d3b61..0337371 100644 --- a/examples/consumer/README.md +++ b/examples/consumer/README.md @@ -14,4 +14,4 @@ export AWS_SECRET_KEY= ### Run the consumer - $ go run main.go --app appName --stream streamName + $ go run main.go --app appName --stream streamName --table tableName diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 4c7c670..4343378 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -41,21 +41,30 @@ func main() { flag.Parse() // Following will overwrite the default dynamodb client - myDynamoDbClient := dynamodb.New(session.New(aws.NewConfig())) + // Older versions of aws sdk does not picking up aws config properly. + // You probably need to update aws sdk verison. Tested the following with 1.13.59 + myDynamoDbClient := dynamodb.New(session.New(aws.NewConfig()), &aws.Config{ + Region: aws.String("us-west-2"), + }) // ddb checkpoint ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient)) if err != nil { log.Fatalf("checkpoint error: %v", err) } - + err = ck.ValidateCheckpoint() + if err != nil { + log.Fatalf("checkpoint validation error: %v", err) + } var ( counter = expvar.NewMap("counters") logger = log.New(os.Stdout, "", log.LstdFlags) ) // The following 2 lines will overwrite the default kinesis client - myKinesisClient := kinesis.New(session.New(aws.NewConfig())) + myKinesisClient := kinesis.New(session.New(aws.NewConfig()), &aws.Config{ + Region: aws.String("us-west-2"), + }) newKclient := consumer.NewKinesisClient(consumer.WithKinesis(myKinesisClient)) // consumer diff --git a/examples/producer/main.go b/examples/producer/main.go index 08b14ae..6a9982a 100644 --- a/examples/producer/main.go +++ b/examples/producer/main.go @@ -14,7 +14,9 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis" ) -var svc = kinesis.New(session.New()) +var svc = kinesis.New(session.New(), &aws.Config{ + Region: aws.String("us-west-2"), +}) func main() { log.SetHandler(text.New(os.Stderr))