diff --git a/examples/consumer-postgres/main.go b/examples/consumer-postgres/main.go index 79b1ba2..1c459ac 100644 --- a/examples/consumer-postgres/main.go +++ b/examples/consumer-postgres/main.go @@ -4,7 +4,7 @@ import ( "context" "flag" "fmt" - "log" + "log/slog" "os" "os/signal" @@ -16,45 +16,50 @@ import ( store "github.com/alexgridx/kinesis-consumer/store/postgres" ) +var ( + applicationName = flag.String("application.name", "", "Consumer app name") + kinesisAWSRegion = flag.String("kinesis.region", "us-west-2", "AWS Region") + kinesisEndpoint = flag.String("kinesis.endpoint", "http://localhost:4567", "Kinesis endpoint") + kinesisStream = flag.String("kinesis.stream", "", "Stream name") + postgresConnection = flag.String("postgres.connection", "", "Connection Str") + postgresTable = flag.String("postgres.table", "", "Table name") +) + func main() { - var ( - app = flag.String("app", "", "Consumer app name") - stream = flag.String("stream", "", "Stream name") - table = flag.String("table", "", "Table name") - connStr = flag.String("connection", "", "Connection Str") - kinesisEndpoint = flag.String("endpoint", "http://localhost:4567", "Kinesis endpoint") - awsRegion = flag.String("region", "us-west-2", "AWS Region") - ) flag.Parse() // postgres checkpoint - checkpointStore, err := store.New(*app, *table, *connStr) + checkpointStore, err := store.New(*applicationName, *postgresTable, *postgresConnection) if err != nil { - log.Fatalf("checkpoint error: %v", err) + slog.Error("checkpoint error", slog.String("error", err.Error())) + os.Exit(1) } registry, ok := prometheus.DefaultRegisterer.(*prometheus.Registry) if !ok { - log.Fatalf("could not get default prometheus registry") + slog.Error("prometheus error") + os.Exit(1) } // client var client = kinesis.New( kinesis.Options{ BaseEndpoint: kinesisEndpoint, - Region: *awsRegion, + Region: *kinesisAWSRegion, Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"), }) // consumer c, err := consumer.New( - *stream, + *kinesisStream, consumer.WithClient(client), consumer.WithStore(checkpointStore), consumer.WithMetricRegistry(registry), + consumer.WithLogger(slog.Default()), ) if err != nil { - log.Fatalf("consumer error: %v", err) + slog.Error("consumer error", slog.String("error", err.Error())) + os.Exit(1) } // use cancel func to signal shutdown @@ -76,10 +81,12 @@ func main() { }) if err != nil { - log.Fatalf("scan error: %v", err) + slog.Error("scan error", slog.String("error", err.Error())) + os.Exit(1) } if err := checkpointStore.Shutdown(); err != nil { - log.Fatalf("store shutdown error: %v", err) + slog.Error("store shutdown error", slog.String("error", err.Error())) + os.Exit(1) } } diff --git a/examples/consumer-redis/main.go b/examples/consumer-redis/main.go index e74724e..9eeb5d0 100644 --- a/examples/consumer-redis/main.go +++ b/examples/consumer-redis/main.go @@ -4,7 +4,7 @@ import ( "context" "flag" "fmt" - "log" + "log/slog" "os" "os/signal" @@ -15,53 +15,41 @@ import ( store "github.com/alexgridx/kinesis-consumer/store/redis" ) -// A myLogger provides a minimalistic logger satisfying the Logger interface. -type myLogger struct { - logger *log.Logger -} - -// Log logs the parameters to the stdlib logger. See log.Println. -func (l *myLogger) Log(args ...interface{}) { - l.logger.Println(args...) -} +var ( + applicationName = flag.String("application.name", "", "Consumer app name") + kinesisAWSRegion = flag.String("kinesis.region", "us-west-2", "AWS Region") + kinesisEndpoint = flag.String("kinesis.endpoint", "http://localhost:4567", "Kinesis endpoint") + kinesisStream = flag.String("kinesis.stream", "", "Stream name") +) func main() { - var ( - app = flag.String("app", "", "Consumer app name") - stream = flag.String("stream", "", "Stream name") - kinesisEndpoint = flag.String("endpoint", "http://localhost:4567", "Kinesis endpoint") - awsRegion = flag.String("region", "us-west-2", "AWS Region") - ) flag.Parse() // redis checkpoint checkpointStore - checkpointStore, err := store.New(*app) + checkpointStore, err := store.New(*applicationName) if err != nil { - log.Fatalf("checkpointStore error: %v", err) - } - - // logger - logger := &myLogger{ - logger: log.New(os.Stdout, "consumer-example: ", log.LstdFlags), + slog.Error("checkpoint store error", slog.String("error", err.Error())) + os.Exit(1) } // client var client = kinesis.New( kinesis.Options{ BaseEndpoint: kinesisEndpoint, - Region: *awsRegion, + Region: *kinesisAWSRegion, Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"), }) // consumer c, err := consumer.New( - *stream, + *kinesisStream, consumer.WithClient(client), consumer.WithStore(checkpointStore), - consumer.WithLogger(logger), + consumer.WithLogger(slog.Default()), ) if err != nil { - log.Fatalf("consumer error: %v", err) + slog.Error("consumer error", slog.String("error", err.Error())) + os.Exit(1) } // use cancel func to signal shutdown @@ -83,6 +71,7 @@ func main() { return nil // continue scanning }) if err != nil { - log.Fatalf("scan error: %v", err) + slog.Error("scan error", slog.String("error", err.Error())) + os.Exit(1) } } diff --git a/store/postgres/postgres_test.go b/store/postgres/postgres_test.go index 420c5db..b5c44d5 100644 --- a/store/postgres/postgres_test.go +++ b/store/postgres/postgres_test.go @@ -1,3 +1,5 @@ +//go:build unit + package postgres import ( diff --git a/store/redis/options.go b/store/redis/options.go index 2a7e8ed..92e2296 100644 --- a/store/redis/options.go +++ b/store/redis/options.go @@ -6,7 +6,7 @@ import "github.com/redis/go-redis/v9" type Option func(*Checkpoint) // WithClient overrides the default client -func WithClient(client *redis.Client) Option { +func WithClient(client redis.UniversalClient) Option { return func(c *Checkpoint) { c.client = client } diff --git a/store/redis/redis.go b/store/redis/redis.go index cbc3280..463545a 100644 --- a/store/redis/redis.go +++ b/store/redis/redis.go @@ -45,10 +45,10 @@ func New(appName string, opts ...Option) (*Checkpoint, error) { return c, nil } -// Checkpoint stores and retreives the last evaluated key from a DDB scan +// Checkpoint stores and retrieves the last evaluated key from a DDB scan type Checkpoint struct { appName string - client *redis.Client + client redis.UniversalClient } // GetCheckpoint fetches the checkpoint for a particular Shard. @@ -59,7 +59,7 @@ func (c *Checkpoint) GetCheckpoint(streamName, shardID string) (string, error) { } // SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). -// Upon failover, record processing is resumed from this point. +// Upon fail over, record processing is resumed from this point. func (c *Checkpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) error { if sequenceNumber == "" { return fmt.Errorf("sequence number should not be empty") diff --git a/store/redis/redis_test.go b/store/redis/redis_test.go index 8ce8681..c8bb051 100644 --- a/store/redis/redis_test.go +++ b/store/redis/redis_test.go @@ -1,3 +1,5 @@ +// go:build unit + package redis import (