Merge pull request #17 from alexgridx/16-update-examples

#16 updates examples
This commit is contained in:
Alex 2024-04-12 10:02:14 +02:00 committed by GitHub
commit 36e7fc8b89
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 62 additions and 145 deletions

View file

@ -2,6 +2,7 @@ package main
import ( import (
"context" "context"
"errors"
"expvar" "expvar"
"flag" "flag"
"fmt" "fmt"
@ -15,7 +16,6 @@ import (
alog "github.com/apex/log" alog "github.com/apex/log"
"github.com/apex/log/handlers/text" "github.com/apex/log/handlers/text"
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb"
ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
@ -34,7 +34,7 @@ func init() {
} }
go func() { go func() {
fmt.Println("Metrics available at http://localhost:8080/debug/vars") fmt.Println("Metrics available at http://localhost:8080/debug/vars")
http.Serve(sock, nil) _ = http.Serve(sock, nil)
}() }()
} }
@ -50,7 +50,7 @@ func (l *myLogger) Log(args ...interface{}) {
func main() { func main() {
// Wrap myLogger around apex logger // Wrap myLogger around apex logger
mylog := &myLogger{ myLog := &myLogger{
logger: alog.Logger{ logger: alog.Logger{
Handler: text.New(os.Stdout), Handler: text.New(os.Stdout),
Level: alog.DebugLevel, Level: alog.DebugLevel,
@ -67,25 +67,27 @@ func main() {
) )
flag.Parse() flag.Parse()
// set up clients // kinesis
kcfg, err := newConfig(*kinesisEndpoint, *awsRegion) var client = kinesis.New(
if err != nil { kinesis.Options{
log.Fatalf("new kinesis config error: %v", err) BaseEndpoint: kinesisEndpoint,
} Region: *awsRegion,
var myKsis = kinesis.NewFromConfig(kcfg) Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"),
})
dcfg, err := newConfig(*ddbEndpoint, *awsRegion) // dynamoDB
if err != nil { var myDdbClient = dynamodb.New(dynamodb.Options{
log.Fatalf("new ddb config error: %v", err) BaseEndpoint: ddbEndpoint,
} Region: *awsRegion,
var myDdbClient = dynamodb.NewFromConfig(dcfg) Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"),
})
// ddb checkpoint table // ddb checkpoint table
if err := createTable(myDdbClient, *tableName); err != nil { if err := createTable(myDdbClient, *tableName); err != nil {
log.Fatalf("create ddb table error: %v", err) log.Fatalf("create ddb table error: %v", err)
} }
// ddb persitance // ddb persistence
ddb, err := storage.New(*app, *tableName, storage.WithDynamoClient(myDdbClient), storage.WithRetryer(&MyRetryer{})) ddb, err := storage.New(*app, *tableName, storage.WithDynamoClient(myDdbClient), storage.WithRetryer(&MyRetryer{}))
if err != nil { if err != nil {
log.Fatalf("checkpoint error: %v", err) log.Fatalf("checkpoint error: %v", err)
@ -98,9 +100,9 @@ func main() {
c, err := consumer.New( c, err := consumer.New(
*stream, *stream,
consumer.WithStore(ddb), consumer.WithStore(ddb),
consumer.WithLogger(mylog), consumer.WithLogger(myLog),
consumer.WithCounter(counter), consumer.WithCounter(counter),
consumer.WithClient(myKsis), consumer.WithClient(client),
) )
if err != nil { if err != nil {
log.Fatalf("consumer error: %v", err) log.Fatalf("consumer error: %v", err)
@ -183,26 +185,11 @@ type MyRetryer struct {
// ShouldRetry implements custom logic for when errors should retry // ShouldRetry implements custom logic for when errors should retry
func (r *MyRetryer) ShouldRetry(err error) bool { func (r *MyRetryer) ShouldRetry(err error) bool {
switch err.(type) { var provisionedThroughputExceededException *types.ProvisionedThroughputExceededException
case *types.ProvisionedThroughputExceededException, *types.LimitExceededException: var limitExceededException *types.LimitExceededException
switch {
case errors.As(err, &provisionedThroughputExceededException), errors.As(err, &limitExceededException):
return true return true
} }
return false return false
} }
func newConfig(url, region string) (aws.Config, error) {
resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
return aws.Endpoint{
PartitionID: "aws",
URL: url,
SigningRegion: region,
}, nil
})
return config.LoadDefaultConfig(
context.TODO(),
config.WithRegion(region),
config.WithEndpointResolver(resolver),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("user", "pass", "token")),
)
}

View file

@ -9,8 +9,6 @@ import (
"os" "os"
"os/signal" "os/signal"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis"
@ -30,38 +28,26 @@ func main() {
flag.Parse() flag.Parse()
// mysql checkpoint // mysql checkpoint
store, err := store.New(*app, *table, *connStr) mysqlStore, err := store.New(*app, *table, *connStr)
if err != nil { if err != nil {
log.Fatalf("checkpoint error: %v", err) log.Fatalf("checkpoint error: %v", err)
} }
var counter = expvar.NewMap("counters") var counter = expvar.NewMap("counters")
resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
return aws.Endpoint{
PartitionID: "aws",
URL: *kinesisEndpoint,
SigningRegion: *awsRegion,
}, nil
})
// client // client
cfg, err := config.LoadDefaultConfig( var client = kinesis.New(
context.TODO(), kinesis.Options{
config.WithRegion(*awsRegion), BaseEndpoint: kinesisEndpoint,
config.WithEndpointResolver(resolver), Region: *awsRegion,
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("user", "pass", "token")), Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"),
) })
if err != nil {
log.Fatalf("unable to load SDK config, %v", err)
}
var client = kinesis.NewFromConfig(cfg)
// consumer // consumer
c, err := consumer.New( c, err := consumer.New(
*stream, *stream,
consumer.WithClient(client), consumer.WithClient(client),
consumer.WithStore(store), consumer.WithStore(mysqlStore),
consumer.WithCounter(counter), consumer.WithCounter(counter),
) )
if err != nil { if err != nil {
@ -90,7 +76,7 @@ func main() {
log.Fatalf("scan error: %v", err) log.Fatalf("scan error: %v", err)
} }
if err := store.Shutdown(); err != nil { if err := mysqlStore.Shutdown(); err != nil {
log.Fatalf("store shutdown error: %v", err) log.Fatalf("mysqlStore shutdown error: %v", err)
} }
} }

View file

@ -9,8 +9,6 @@ import (
"os" "os"
"os/signal" "os/signal"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis"
@ -30,38 +28,26 @@ func main() {
flag.Parse() flag.Parse()
// postgres checkpoint // postgres checkpoint
store, err := store.New(*app, *table, *connStr) checkpointStore, err := store.New(*app, *table, *connStr)
if err != nil { if err != nil {
log.Fatalf("checkpoint error: %v", err) log.Fatalf("checkpoint error: %v", err)
} }
var counter = expvar.NewMap("counters") var counter = expvar.NewMap("counters")
resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
return aws.Endpoint{
PartitionID: "aws",
URL: *kinesisEndpoint,
SigningRegion: *awsRegion,
}, nil
})
// client // client
cfg, err := config.LoadDefaultConfig( var client = kinesis.New(
context.TODO(), kinesis.Options{
config.WithRegion(*awsRegion), BaseEndpoint: kinesisEndpoint,
config.WithEndpointResolver(resolver), Region: *awsRegion,
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("user", "pass", "token")), Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"),
) })
if err != nil {
log.Fatalf("unable to load SDK config, %v", err)
}
var client = kinesis.NewFromConfig(cfg)
// consumer // consumer
c, err := consumer.New( c, err := consumer.New(
*stream, *stream,
consumer.WithClient(client), consumer.WithClient(client),
consumer.WithStore(store), consumer.WithStore(checkpointStore),
consumer.WithCounter(counter), consumer.WithCounter(counter),
) )
if err != nil { if err != nil {
@ -90,7 +76,7 @@ func main() {
log.Fatalf("scan error: %v", err) log.Fatalf("scan error: %v", err)
} }
if err := store.Shutdown(); err != nil { if err := checkpointStore.Shutdown(); err != nil {
log.Fatalf("store shutdown error: %v", err) log.Fatalf("store shutdown error: %v", err)
} }
} }

View file

@ -8,8 +8,6 @@ import (
"os" "os"
"os/signal" "os/signal"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis"
@ -36,10 +34,10 @@ func main() {
) )
flag.Parse() flag.Parse()
// redis checkpoint store // redis checkpoint checkpointStore
store, err := store.New(*app) checkpointStore, err := store.New(*app)
if err != nil { if err != nil {
log.Fatalf("store error: %v", err) log.Fatalf("checkpointStore error: %v", err)
} }
// logger // logger
@ -47,31 +45,19 @@ func main() {
logger: log.New(os.Stdout, "consumer-example: ", log.LstdFlags), logger: log.New(os.Stdout, "consumer-example: ", log.LstdFlags),
} }
resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
return aws.Endpoint{
PartitionID: "aws",
URL: *kinesisEndpoint,
SigningRegion: *awsRegion,
}, nil
})
// client // client
cfg, err := config.LoadDefaultConfig( var client = kinesis.New(
context.TODO(), kinesis.Options{
config.WithRegion(*awsRegion), BaseEndpoint: kinesisEndpoint,
config.WithEndpointResolver(resolver), Region: *awsRegion,
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("user", "pass", "token")), Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"),
) })
if err != nil {
log.Fatalf("unable to load SDK config, %v", err)
}
var client = kinesis.NewFromConfig(cfg)
// consumer // consumer
c, err := consumer.New( c, err := consumer.New(
*stream, *stream,
consumer.WithClient(client), consumer.WithClient(client),
consumer.WithStore(store), consumer.WithStore(checkpointStore),
consumer.WithLogger(logger), consumer.WithLogger(logger),
) )
if err != nil { if err != nil {

View file

@ -9,8 +9,6 @@ import (
"os/signal" "os/signal"
"syscall" "syscall"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis"
@ -35,25 +33,13 @@ func main() {
) )
flag.Parse() flag.Parse()
resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
return aws.Endpoint{
PartitionID: "aws",
URL: *kinesisEndpoint,
SigningRegion: *awsRegion,
}, nil
})
// client // client
cfg, err := config.LoadDefaultConfig( var client = kinesis.New(
context.TODO(), kinesis.Options{
config.WithRegion(*awsRegion), BaseEndpoint: kinesisEndpoint,
config.WithEndpointResolver(resolver), Region: *awsRegion,
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("user", "pass", "token")), Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"),
) })
if err != nil {
log.Fatalf("unable to load SDK config, %v", err)
}
var client = kinesis.NewFromConfig(cfg)
// consumer // consumer
c, err := consumer.New( c, err := consumer.New(

View file

@ -10,7 +10,6 @@ import (
"time" "time"
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types" "github.com/aws/aws-sdk-go-v2/service/kinesis/types"
@ -26,25 +25,12 @@ func main() {
var records []types.PutRecordsRequestEntry var records []types.PutRecordsRequestEntry
resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) { var client = kinesis.New(kinesis.Options{
return aws.Endpoint{ BaseEndpoint: kinesisEndpoint,
PartitionID: "aws", Region: *awsRegion,
URL: *kinesisEndpoint, Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"),
SigningRegion: *awsRegion,
}, nil
}) })
cfg, err := config.LoadDefaultConfig(
context.TODO(),
config.WithRegion(*awsRegion),
config.WithEndpointResolver(resolver),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("user", "pass", "token")),
)
if err != nil {
log.Fatalf("unable to load SDK config, %v", err)
}
var client = kinesis.NewFromConfig(cfg)
// create stream if doesn't exist // create stream if doesn't exist
if err := createStream(client, *streamName); err != nil { if err := createStream(client, *streamName); err != nil {
log.Fatalf("create stream error: %v", err) log.Fatalf("create stream error: %v", err)