#188 allows redis store to use clustered redis

This commit is contained in:
Alex Senger 2024-09-13 14:14:56 +02:00
parent 3775a5af04
commit 1760aaf19d
No known key found for this signature in database
GPG key ID: 0B4A96F8AF6934CF
6 changed files with 49 additions and 49 deletions

View file

@ -4,7 +4,7 @@ import (
"context" "context"
"flag" "flag"
"fmt" "fmt"
"log" "log/slog"
"os" "os"
"os/signal" "os/signal"
@ -16,45 +16,50 @@ import (
store "github.com/alexgridx/kinesis-consumer/store/postgres" store "github.com/alexgridx/kinesis-consumer/store/postgres"
) )
var (
applicationName = flag.String("application.name", "", "Consumer app name")
kinesisAWSRegion = flag.String("kinesis.region", "us-west-2", "AWS Region")
kinesisEndpoint = flag.String("kinesis.endpoint", "http://localhost:4567", "Kinesis endpoint")
kinesisStream = flag.String("kinesis.stream", "", "Stream name")
postgresConnection = flag.String("postgres.connection", "", "Connection Str")
postgresTable = flag.String("postgres.table", "", "Table name")
)
func main() { func main() {
var (
app = flag.String("app", "", "Consumer app name")
stream = flag.String("stream", "", "Stream name")
table = flag.String("table", "", "Table name")
connStr = flag.String("connection", "", "Connection Str")
kinesisEndpoint = flag.String("endpoint", "http://localhost:4567", "Kinesis endpoint")
awsRegion = flag.String("region", "us-west-2", "AWS Region")
)
flag.Parse() flag.Parse()
// postgres checkpoint // postgres checkpoint
checkpointStore, err := store.New(*app, *table, *connStr) checkpointStore, err := store.New(*applicationName, *postgresTable, *postgresConnection)
if err != nil { if err != nil {
log.Fatalf("checkpoint error: %v", err) slog.Error("checkpoint error", slog.String("error", err.Error()))
os.Exit(1)
} }
registry, ok := prometheus.DefaultRegisterer.(*prometheus.Registry) registry, ok := prometheus.DefaultRegisterer.(*prometheus.Registry)
if !ok { if !ok {
log.Fatalf("could not get default prometheus registry") slog.Error("prometheus error")
os.Exit(1)
} }
// client // client
var client = kinesis.New( var client = kinesis.New(
kinesis.Options{ kinesis.Options{
BaseEndpoint: kinesisEndpoint, BaseEndpoint: kinesisEndpoint,
Region: *awsRegion, Region: *kinesisAWSRegion,
Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"), Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"),
}) })
// consumer // consumer
c, err := consumer.New( c, err := consumer.New(
*stream, *kinesisStream,
consumer.WithClient(client), consumer.WithClient(client),
consumer.WithStore(checkpointStore), consumer.WithStore(checkpointStore),
consumer.WithMetricRegistry(registry), consumer.WithMetricRegistry(registry),
consumer.WithLogger(slog.Default()),
) )
if err != nil { if err != nil {
log.Fatalf("consumer error: %v", err) slog.Error("consumer error", slog.String("error", err.Error()))
os.Exit(1)
} }
// use cancel func to signal shutdown // use cancel func to signal shutdown
@ -76,10 +81,12 @@ func main() {
}) })
if err != nil { if err != nil {
log.Fatalf("scan error: %v", err) slog.Error("scan error", slog.String("error", err.Error()))
os.Exit(1)
} }
if err := checkpointStore.Shutdown(); err != nil { if err := checkpointStore.Shutdown(); err != nil {
log.Fatalf("store shutdown error: %v", err) slog.Error("store shutdown error", slog.String("error", err.Error()))
os.Exit(1)
} }
} }

View file

@ -4,7 +4,7 @@ import (
"context" "context"
"flag" "flag"
"fmt" "fmt"
"log" "log/slog"
"os" "os"
"os/signal" "os/signal"
@ -15,53 +15,41 @@ import (
store "github.com/alexgridx/kinesis-consumer/store/redis" store "github.com/alexgridx/kinesis-consumer/store/redis"
) )
// A myLogger provides a minimalistic logger satisfying the Logger interface. var (
type myLogger struct { applicationName = flag.String("application.name", "", "Consumer app name")
logger *log.Logger kinesisAWSRegion = flag.String("kinesis.region", "us-west-2", "AWS Region")
} kinesisEndpoint = flag.String("kinesis.endpoint", "http://localhost:4567", "Kinesis endpoint")
kinesisStream = flag.String("kinesis.stream", "", "Stream name")
// Log logs the parameters to the stdlib logger. See log.Println. )
func (l *myLogger) Log(args ...interface{}) {
l.logger.Println(args...)
}
func main() { func main() {
var (
app = flag.String("app", "", "Consumer app name")
stream = flag.String("stream", "", "Stream name")
kinesisEndpoint = flag.String("endpoint", "http://localhost:4567", "Kinesis endpoint")
awsRegion = flag.String("region", "us-west-2", "AWS Region")
)
flag.Parse() flag.Parse()
// redis checkpoint checkpointStore // redis checkpoint checkpointStore
checkpointStore, err := store.New(*app) checkpointStore, err := store.New(*applicationName)
if err != nil { if err != nil {
log.Fatalf("checkpointStore error: %v", err) slog.Error("checkpoint store error", slog.String("error", err.Error()))
} os.Exit(1)
// logger
logger := &myLogger{
logger: log.New(os.Stdout, "consumer-example: ", log.LstdFlags),
} }
// client // client
var client = kinesis.New( var client = kinesis.New(
kinesis.Options{ kinesis.Options{
BaseEndpoint: kinesisEndpoint, BaseEndpoint: kinesisEndpoint,
Region: *awsRegion, Region: *kinesisAWSRegion,
Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"), Credentials: credentials.NewStaticCredentialsProvider("user", "pass", "token"),
}) })
// consumer // consumer
c, err := consumer.New( c, err := consumer.New(
*stream, *kinesisStream,
consumer.WithClient(client), consumer.WithClient(client),
consumer.WithStore(checkpointStore), consumer.WithStore(checkpointStore),
consumer.WithLogger(logger), consumer.WithLogger(slog.Default()),
) )
if err != nil { if err != nil {
log.Fatalf("consumer error: %v", err) slog.Error("consumer error", slog.String("error", err.Error()))
os.Exit(1)
} }
// use cancel func to signal shutdown // use cancel func to signal shutdown
@ -83,6 +71,7 @@ func main() {
return nil // continue scanning return nil // continue scanning
}) })
if err != nil { if err != nil {
log.Fatalf("scan error: %v", err) slog.Error("scan error", slog.String("error", err.Error()))
os.Exit(1)
} }
} }

View file

@ -1,3 +1,5 @@
//go:build unit
package postgres package postgres
import ( import (

View file

@ -6,7 +6,7 @@ import "github.com/redis/go-redis/v9"
type Option func(*Checkpoint) type Option func(*Checkpoint)
// WithClient overrides the default client // WithClient overrides the default client
func WithClient(client *redis.Client) Option { func WithClient(client redis.UniversalClient) Option {
return func(c *Checkpoint) { return func(c *Checkpoint) {
c.client = client c.client = client
} }

View file

@ -45,10 +45,10 @@ func New(appName string, opts ...Option) (*Checkpoint, error) {
return c, nil return c, nil
} }
// Checkpoint stores and retreives the last evaluated key from a DDB scan // Checkpoint stores and retrieves the last evaluated key from a DDB scan
type Checkpoint struct { type Checkpoint struct {
appName string appName string
client *redis.Client client redis.UniversalClient
} }
// GetCheckpoint fetches the checkpoint for a particular Shard. // GetCheckpoint fetches the checkpoint for a particular Shard.
@ -59,7 +59,7 @@ func (c *Checkpoint) GetCheckpoint(streamName, shardID string) (string, error) {
} }
// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). // SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
// Upon failover, record processing is resumed from this point. // Upon fail over, record processing is resumed from this point.
func (c *Checkpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) error { func (c *Checkpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) error {
if sequenceNumber == "" { if sequenceNumber == "" {
return fmt.Errorf("sequence number should not be empty") return fmt.Errorf("sequence number should not be empty")

View file

@ -1,3 +1,5 @@
// go:build unit
package redis package redis
import ( import (