diff --git a/examples/consumer-dynamodb/main.go b/examples/consumer-dynamodb/main.go index 3ee8be1..b3edb66 100644 --- a/examples/consumer-dynamodb/main.go +++ b/examples/consumer-dynamodb/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "errors" "expvar" "flag" "fmt" @@ -15,7 +16,6 @@ import ( alog "github.com/apex/log" "github.com/apex/log/handlers/text" "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/dynamodb" ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" @@ -34,7 +34,7 @@ func init() { } go func() { fmt.Println("Metrics available at http://localhost:8080/debug/vars") - http.Serve(sock, nil) + _ = http.Serve(sock, nil) }() } @@ -50,7 +50,7 @@ func (l *myLogger) Log(args ...interface{}) { func main() { // Wrap myLogger around apex logger - mylog := &myLogger{ + myLog := &myLogger{ logger: alog.Logger{ Handler: text.New(os.Stdout), Level: alog.DebugLevel, @@ -67,25 +67,27 @@ func main() { ) flag.Parse() - // set up clients - kcfg, err := newConfig(*kinesisEndpoint, *awsRegion) - if err != nil { - log.Fatalf("new kinesis config error: %v", err) - } - var myKsis = kinesis.NewFromConfig(kcfg) + // kinesis + var client = kinesis.New( + kinesis.Options{ + BaseEndpoint: kinesisEndpoint, + Region: *awsRegion, + Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"), + }) - dcfg, err := newConfig(*ddbEndpoint, *awsRegion) - if err != nil { - log.Fatalf("new ddb config error: %v", err) - } - var myDdbClient = dynamodb.NewFromConfig(dcfg) + // dynamoDB + var myDdbClient = dynamodb.New(dynamodb.Options{ + BaseEndpoint: ddbEndpoint, + Region: *awsRegion, + Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"), + }) // ddb checkpoint table if err := createTable(myDdbClient, *tableName); err != nil { log.Fatalf("create ddb table error: %v", err) } - // ddb persitance + // ddb persistence ddb, err := storage.New(*app, *tableName, storage.WithDynamoClient(myDdbClient), storage.WithRetryer(&MyRetryer{})) if err != nil { log.Fatalf("checkpoint error: %v", err) @@ -98,9 +100,9 @@ func main() { c, err := consumer.New( *stream, consumer.WithStore(ddb), - consumer.WithLogger(mylog), + consumer.WithLogger(myLog), consumer.WithCounter(counter), - consumer.WithClient(myKsis), + consumer.WithClient(client), ) if err != nil { log.Fatalf("consumer error: %v", err) @@ -183,26 +185,11 @@ type MyRetryer struct { // ShouldRetry implements custom logic for when errors should retry func (r *MyRetryer) ShouldRetry(err error) bool { - switch err.(type) { - case *types.ProvisionedThroughputExceededException, *types.LimitExceededException: + var provisionedThroughputExceededException *types.ProvisionedThroughputExceededException + var limitExceededException *types.LimitExceededException + switch { + case errors.As(err, &provisionedThroughputExceededException), errors.As(err, &limitExceededException): return true } return false } - -func newConfig(url, region string) (aws.Config, error) { - resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) { - return aws.Endpoint{ - PartitionID: "aws", - URL: url, - SigningRegion: region, - }, nil - }) - - return config.LoadDefaultConfig( - context.TODO(), - config.WithRegion(region), - config.WithEndpointResolver(resolver), - config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("user", "pass", "token")), - ) -} diff --git a/examples/consumer-mysql/main.go b/examples/consumer-mysql/main.go index c0c9c97..8568457 100644 --- a/examples/consumer-mysql/main.go +++ b/examples/consumer-mysql/main.go @@ -9,8 +9,6 @@ import ( "os" "os/signal" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/kinesis" @@ -30,38 +28,26 @@ func main() { flag.Parse() // mysql checkpoint - store, err := store.New(*app, *table, *connStr) + mysqlStore, err := store.New(*app, *table, *connStr) if err != nil { log.Fatalf("checkpoint error: %v", err) } var counter = expvar.NewMap("counters") - resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) { - return aws.Endpoint{ - PartitionID: "aws", - URL: *kinesisEndpoint, - SigningRegion: *awsRegion, - }, nil - }) - // client - cfg, err := config.LoadDefaultConfig( - context.TODO(), - config.WithRegion(*awsRegion), - config.WithEndpointResolver(resolver), - config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("user", "pass", "token")), - ) - if err != nil { - log.Fatalf("unable to load SDK config, %v", err) - } - var client = kinesis.NewFromConfig(cfg) + var client = kinesis.New( + kinesis.Options{ + BaseEndpoint: kinesisEndpoint, + Region: *awsRegion, + Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"), + }) // consumer c, err := consumer.New( *stream, consumer.WithClient(client), - consumer.WithStore(store), + consumer.WithStore(mysqlStore), consumer.WithCounter(counter), ) if err != nil { @@ -90,7 +76,7 @@ func main() { log.Fatalf("scan error: %v", err) } - if err := store.Shutdown(); err != nil { - log.Fatalf("store shutdown error: %v", err) + if err := mysqlStore.Shutdown(); err != nil { + log.Fatalf("mysqlStore shutdown error: %v", err) } } diff --git a/examples/consumer-postgres/main.go b/examples/consumer-postgres/main.go index 2fee9c1..0afb9b2 100644 --- a/examples/consumer-postgres/main.go +++ b/examples/consumer-postgres/main.go @@ -9,8 +9,6 @@ import ( "os" "os/signal" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/kinesis" @@ -30,38 +28,26 @@ func main() { flag.Parse() // postgres checkpoint - store, err := store.New(*app, *table, *connStr) + checkpointStore, err := store.New(*app, *table, *connStr) if err != nil { log.Fatalf("checkpoint error: %v", err) } var counter = expvar.NewMap("counters") - resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) { - return aws.Endpoint{ - PartitionID: "aws", - URL: *kinesisEndpoint, - SigningRegion: *awsRegion, - }, nil - }) - // client - cfg, err := config.LoadDefaultConfig( - context.TODO(), - config.WithRegion(*awsRegion), - config.WithEndpointResolver(resolver), - config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("user", "pass", "token")), - ) - if err != nil { - log.Fatalf("unable to load SDK config, %v", err) - } - var client = kinesis.NewFromConfig(cfg) + var client = kinesis.New( + kinesis.Options{ + BaseEndpoint: kinesisEndpoint, + Region: *awsRegion, + Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"), + }) // consumer c, err := consumer.New( *stream, consumer.WithClient(client), - consumer.WithStore(store), + consumer.WithStore(checkpointStore), consumer.WithCounter(counter), ) if err != nil { @@ -90,7 +76,7 @@ func main() { log.Fatalf("scan error: %v", err) } - if err := store.Shutdown(); err != nil { + if err := checkpointStore.Shutdown(); err != nil { log.Fatalf("store shutdown error: %v", err) } } diff --git a/examples/consumer-redis/main.go b/examples/consumer-redis/main.go index b5b8457..e74724e 100644 --- a/examples/consumer-redis/main.go +++ b/examples/consumer-redis/main.go @@ -8,8 +8,6 @@ import ( "os" "os/signal" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/kinesis" @@ -36,10 +34,10 @@ func main() { ) flag.Parse() - // redis checkpoint store - store, err := store.New(*app) + // redis checkpoint checkpointStore + checkpointStore, err := store.New(*app) if err != nil { - log.Fatalf("store error: %v", err) + log.Fatalf("checkpointStore error: %v", err) } // logger @@ -47,31 +45,19 @@ func main() { logger: log.New(os.Stdout, "consumer-example: ", log.LstdFlags), } - resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) { - return aws.Endpoint{ - PartitionID: "aws", - URL: *kinesisEndpoint, - SigningRegion: *awsRegion, - }, nil - }) - // client - cfg, err := config.LoadDefaultConfig( - context.TODO(), - config.WithRegion(*awsRegion), - config.WithEndpointResolver(resolver), - config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("user", "pass", "token")), - ) - if err != nil { - log.Fatalf("unable to load SDK config, %v", err) - } - var client = kinesis.NewFromConfig(cfg) + var client = kinesis.New( + kinesis.Options{ + BaseEndpoint: kinesisEndpoint, + Region: *awsRegion, + Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"), + }) // consumer c, err := consumer.New( *stream, consumer.WithClient(client), - consumer.WithStore(store), + consumer.WithStore(checkpointStore), consumer.WithLogger(logger), ) if err != nil { diff --git a/examples/consumer/main.go b/examples/consumer/main.go index e5ea1e5..9d886bb 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -9,8 +9,6 @@ import ( "os/signal" "syscall" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/kinesis" @@ -35,25 +33,13 @@ func main() { ) flag.Parse() - resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) { - return aws.Endpoint{ - PartitionID: "aws", - URL: *kinesisEndpoint, - SigningRegion: *awsRegion, - }, nil - }) - // client - cfg, err := config.LoadDefaultConfig( - context.TODO(), - config.WithRegion(*awsRegion), - config.WithEndpointResolver(resolver), - config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("user", "pass", "token")), - ) - if err != nil { - log.Fatalf("unable to load SDK config, %v", err) - } - var client = kinesis.NewFromConfig(cfg) + var client = kinesis.New( + kinesis.Options{ + BaseEndpoint: kinesisEndpoint, + Region: *awsRegion, + Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"), + }) // consumer c, err := consumer.New( diff --git a/examples/producer/main.go b/examples/producer/main.go index e714973..173e0a7 100644 --- a/examples/producer/main.go +++ b/examples/producer/main.go @@ -10,7 +10,6 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" @@ -26,25 +25,12 @@ func main() { var records []types.PutRecordsRequestEntry - resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) { - return aws.Endpoint{ - PartitionID: "aws", - URL: *kinesisEndpoint, - SigningRegion: *awsRegion, - }, nil + var client = kinesis.New(kinesis.Options{ + BaseEndpoint: kinesisEndpoint, + Region: *awsRegion, + Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"), }) - cfg, err := config.LoadDefaultConfig( - context.TODO(), - config.WithRegion(*awsRegion), - config.WithEndpointResolver(resolver), - config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("user", "pass", "token")), - ) - if err != nil { - log.Fatalf("unable to load SDK config, %v", err) - } - var client = kinesis.NewFromConfig(cfg) - // create stream if doesn't exist if err := createStream(client, *streamName); err != nil { log.Fatalf("create stream error: %v", err)