#5 moves to slog

This commit is contained in:
Alex Senger 2024-04-19 13:29:57 +02:00
parent 36e7fc8b89
commit 95a08a91e8
No known key found for this signature in database
GPG key ID: 0B4A96F8AF6934CF
4 changed files with 21 additions and 40 deletions

View file

@ -2,6 +2,7 @@ package consumer
import (
"context"
"log/slog"
"sync"
"time"
@ -10,12 +11,12 @@ import (
// NewAllGroup returns an initialized AllGroup for consuming
// all shards on a stream
func NewAllGroup(ksis kinesisClient, store Store, streamName string, logger Logger) *AllGroup {
func NewAllGroup(kinesis kinesisClient, store Store, streamName string, logger *slog.Logger) *AllGroup {
return &AllGroup{
ksis: ksis,
kinesis: kinesis,
shards: make(map[string]types.Shard),
streamName: streamName,
logger: logger,
slog: logger,
Store: store,
}
}
@ -24,9 +25,9 @@ func NewAllGroup(ksis kinesisClient, store Store, streamName string, logger Logg
// caches a local list of the shards we are already processing
// and routinely polls the stream looking for new shards to process.
type AllGroup struct {
ksis kinesisClient
kinesis kinesisClient
streamName string
logger Logger
slog *slog.Logger
Store
shardMu sync.Mutex
@ -66,11 +67,11 @@ func (g *AllGroup) findNewShards(ctx context.Context, shardc chan types.Shard) {
g.shardMu.Lock()
defer g.shardMu.Unlock()
g.logger.Log("[GROUP]", "fetching shards")
g.slog.DebugContext(ctx, "fetch shards")
shards, err := listShards(ctx, g.ksis, g.streamName)
shards, err := listShards(ctx, g.kinesis, g.streamName)
if err != nil {
g.logger.Log("[GROUP] error:", err)
g.slog.ErrorContext(ctx, "list shards", slog.String("error", err.Error()))
return
}

View file

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"log"
"log/slog"
"sync"
"time"
@ -38,9 +39,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) {
initialShardIteratorType: types.ShardIteratorTypeLatest,
store: &noopStore{},
counter: &noopCounter{},
logger: &noopLogger{
logger: log.New(io.Discard, "", log.LstdFlags),
},
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
scanInterval: 250 * time.Millisecond,
maxRecords: 10000,
}
@ -75,7 +74,7 @@ type Consumer struct {
client kinesisClient
counter Counter
group Group
logger Logger
logger *slog.Logger
store Store
scanInterval time.Duration
maxRecords int64
@ -154,9 +153,9 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
return fmt.Errorf("get shard iterator error: %w", err)
}
c.logger.Log("[CONSUMER] start scan:", shardID, lastSeqNum)
c.logger.DebugContext(ctx, "start scan", slog.String("shard-id", shardID), slog.String("last-sequence-number", lastSeqNum))
defer func() {
c.logger.Log("[CONSUMER] stop scan:", shardID)
c.logger.DebugContext(ctx, "stop scan", slog.String("shard-id", shardID))
}()
scanTicker := time.NewTicker(c.scanInterval)
@ -170,12 +169,12 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
// attempt to recover from GetRecords error
if err != nil {
c.logger.Log("[CONSUMER] get records error:", err.Error())
if !isRetriableError(err) {
return fmt.Errorf("get records error: %v", err.Error())
}
c.logger.WarnContext(ctx, "get records", slog.String("error", err.Error()))
shardIterator, err = c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
if err != nil {
return fmt.Errorf("get shard iterator error: %w", err)
@ -216,7 +215,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
}
if isShardClosed(resp.NextShardIterator, shardIterator) {
c.logger.Log("[CONSUMER] shard closed:", shardID)
c.logger.DebugContext(ctx, "shard closed", slog.String("shard-id", shardID))
if c.shardClosedHandler != nil {
err := c.shardClosedHandler(c.streamName, shardID)

View file

@ -1,20 +0,0 @@
package consumer
import (
"log"
)
// A Logger is a minimal interface to as an adaptor for external logging library to consumer
type Logger interface {
Log(...interface{})
}
// noopLogger implements logger interface with discard
type noopLogger struct {
logger *log.Logger
}
// Log using stdlib logger. See log.Println.
func (l noopLogger) Log(args ...interface{}) {
l.logger.Println(args...)
}

View file

@ -1,6 +1,7 @@
package consumer
import (
"log/slog"
"time"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
@ -24,7 +25,7 @@ func WithStore(store Store) Option {
}
// WithLogger overrides the default logger
func WithLogger(logger Logger) Option {
func WithLogger(logger *slog.Logger) Option {
return func(c *Consumer) {
c.logger = logger
}