DDB uses default AWS config settings to ping table; won't work with WithDyanmoClient. Misc update on example and README (#50)

This commit is contained in:
Prometheus 2018-06-01 16:14:42 -07:00 committed by Harlow Ward
parent 9e0a97916d
commit 992cc42419
6 changed files with 92 additions and 20 deletions

55
Gopkg.lock generated
View file

@ -3,15 +3,51 @@
[[projects]] [[projects]]
name = "github.com/apex/log" name = "github.com/apex/log"
packages = [".","handlers/text"] packages = [
".",
"handlers/text"
]
revision = "0296d6eb16bb28f8a0c55668affcf4876dc269be" revision = "0296d6eb16bb28f8a0c55668affcf4876dc269be"
version = "v1.0.0" version = "v1.0.0"
[[projects]] [[projects]]
name = "github.com/aws/aws-sdk-go" name = "github.com/aws/aws-sdk-go"
packages = ["aws","aws/awserr","aws/awsutil","aws/client","aws/client/metadata","aws/corehandlers","aws/credentials","aws/credentials/ec2rolecreds","aws/credentials/endpointcreds","aws/credentials/stscreds","aws/defaults","aws/ec2metadata","aws/endpoints","aws/request","aws/session","aws/signer/v4","internal/shareddefaults","private/protocol","private/protocol/json/jsonutil","private/protocol/jsonrpc","private/protocol/query","private/protocol/query/queryutil","private/protocol/rest","private/protocol/xml/xmlutil","service/dynamodb","service/dynamodb/dynamodbattribute","service/kinesis","service/sts"] packages = [
revision = "e4f7e38b704e3ed0acc4a7f8196b777696f6f1f3" "aws",
version = "v1.12.30" "aws/awserr",
"aws/awsutil",
"aws/client",
"aws/client/metadata",
"aws/corehandlers",
"aws/credentials",
"aws/credentials/ec2rolecreds",
"aws/credentials/endpointcreds",
"aws/credentials/stscreds",
"aws/defaults",
"aws/ec2metadata",
"aws/endpoints",
"aws/request",
"aws/session",
"aws/signer/v4",
"internal/sdkio",
"internal/sdkrand",
"internal/shareddefaults",
"private/protocol",
"private/protocol/json/jsonutil",
"private/protocol/jsonrpc",
"private/protocol/query",
"private/protocol/query/queryutil",
"private/protocol/rest",
"private/protocol/xml/xmlutil",
"service/dynamodb",
"service/dynamodb/dynamodbattribute",
"service/dynamodb/dynamodbiface",
"service/kinesis",
"service/kinesis/kinesisiface",
"service/sts"
]
revision = "827e7eac8c2680d5bdea7bc3ef29c596eabe1eae"
version = "v1.13.59"
[[projects]] [[projects]]
name = "github.com/go-ini/ini" name = "github.com/go-ini/ini"
@ -32,13 +68,20 @@
[[projects]] [[projects]]
name = "gopkg.in/redis.v5" name = "gopkg.in/redis.v5"
packages = [".","internal","internal/consistenthash","internal/hashtag","internal/pool","internal/proto"] packages = [
".",
"internal",
"internal/consistenthash",
"internal/hashtag",
"internal/pool",
"internal/proto"
]
revision = "a16aeec10ff407b1e7be6dd35797ccf5426ef0f0" revision = "a16aeec10ff407b1e7be6dd35797ccf5426ef0f0"
version = "v5.2.9" version = "v5.2.9"
[solve-meta] [solve-meta]
analyzer-name = "dep" analyzer-name = "dep"
analyzer-version = 1 analyzer-version = 1
inputs-digest = "acfe7035926da063cb81920dbb47c5e7dbcf5dbfbcf2e7dc5dd4ae0e4186490c" inputs-digest = "1b40486b645b81bc2215d0153631e9e002e534ba86713ba55500ce62c07cbad8"
solver-name = "gps-cdcl" solver-name = "gps-cdcl"
solver-version = 1 solver-version = 1

View file

@ -91,6 +91,20 @@ ck, err := checkpoint.New(appName, tableName)
if err != nil { if err != nil {
log.Fatalf("new checkpoint error: %v", err) log.Fatalf("new checkpoint error: %v", err)
} }
// Override the Kinesis if any needs on session (e.g. assume role)
myDynamoDbClient := dynamodb.New(session.New(aws.NewConfig()))
// For versions of AWS sdk that fixed config being picked up properly, the example of
// setting region should work.
// myDynamoDbClient := dynamodb.New(session.New(aws.NewConfig()), &aws.Config{
// Region: aws.String("us-west-2"),
// })
ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient))
if err != nil {
log.Fatalf("new checkpoint error: %v", err)
}
``` ```
To leverage the DDB checkpoint we'll also need to create a table: To leverage the DDB checkpoint we'll also need to create a table:

View file

@ -35,14 +35,6 @@ func WithDynamoClient(svc dynamodbiface.DynamoDBAPI) Option {
func New(appName, tableName string, opts ...Option) (*Checkpoint, error) { func New(appName, tableName string, opts ...Option) (*Checkpoint, error) {
client := dynamodb.New(session.New(aws.NewConfig())) client := dynamodb.New(session.New(aws.NewConfig()))
// ping table to verify it exists
_, err := client.DescribeTable(&dynamodb.DescribeTableInput{
TableName: aws.String(tableName),
})
if err != nil {
return nil, err
}
ck := &Checkpoint{ ck := &Checkpoint{
tableName: tableName, tableName: tableName,
appName: appName, appName: appName,
@ -135,6 +127,18 @@ func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error {
return nil return nil
} }
// ValidaCheckpoint validate the checkpoint table exits, shut down
func (c *Checkpoint) ValidateCheckpoint() error {
// ping table to verify it exists
_, err := c.client.DescribeTable(&dynamodb.DescribeTableInput{
TableName: aws.String(c.tableName),
})
if err != nil {
c.done <- struct{}{}
}
return err
}
// Shutdown the checkpoint. Save any in-flight data. // Shutdown the checkpoint. Save any in-flight data.
func (c *Checkpoint) Shutdown() error { func (c *Checkpoint) Shutdown() error {
c.done <- struct{}{} c.done <- struct{}{}
@ -188,7 +192,7 @@ func (c *Checkpoint) save() error {
func retriableError(err error) bool { func retriableError(err error) bool {
if awsErr, ok := err.(awserr.Error); ok { if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == "ProvisionedThroughputExceededException" { if awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException {
return true return true
} }
} }

View file

@ -14,4 +14,4 @@ export AWS_SECRET_KEY=
### Run the consumer ### Run the consumer
$ go run main.go --app appName --stream streamName $ go run main.go --app appName --stream streamName --table tableName

View file

@ -41,21 +41,30 @@ func main() {
flag.Parse() flag.Parse()
// Following will overwrite the default dynamodb client // Following will overwrite the default dynamodb client
myDynamoDbClient := dynamodb.New(session.New(aws.NewConfig())) // Older versions of aws sdk does not picking up aws config properly.
// You probably need to update aws sdk verison. Tested the following with 1.13.59
myDynamoDbClient := dynamodb.New(session.New(aws.NewConfig()), &aws.Config{
Region: aws.String("us-west-2"),
})
// ddb checkpoint // ddb checkpoint
ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient)) ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient))
if err != nil { if err != nil {
log.Fatalf("checkpoint error: %v", err) log.Fatalf("checkpoint error: %v", err)
} }
err = ck.ValidateCheckpoint()
if err != nil {
log.Fatalf("checkpoint validation error: %v", err)
}
var ( var (
counter = expvar.NewMap("counters") counter = expvar.NewMap("counters")
logger = log.New(os.Stdout, "", log.LstdFlags) logger = log.New(os.Stdout, "", log.LstdFlags)
) )
// The following 2 lines will overwrite the default kinesis client // The following 2 lines will overwrite the default kinesis client
myKinesisClient := kinesis.New(session.New(aws.NewConfig())) myKinesisClient := kinesis.New(session.New(aws.NewConfig()), &aws.Config{
Region: aws.String("us-west-2"),
})
newKclient := consumer.NewKinesisClient(consumer.WithKinesis(myKinesisClient)) newKclient := consumer.NewKinesisClient(consumer.WithKinesis(myKinesisClient))
// consumer // consumer

View file

@ -14,7 +14,9 @@ import (
"github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis"
) )
var svc = kinesis.New(session.New()) var svc = kinesis.New(session.New(), &aws.Config{
Region: aws.String("us-west-2"),
})
func main() { func main() {
log.SetHandler(text.New(os.Stderr)) log.SetHandler(text.New(os.Stderr))