diff --git a/allgroup.go b/allgroup.go index e62af47..328ab0d 100644 --- a/allgroup.go +++ b/allgroup.go @@ -2,6 +2,7 @@ package consumer import ( "context" + "log/slog" "sync" "time" @@ -10,12 +11,12 @@ import ( // NewAllGroup returns an initialized AllGroup for consuming // all shards on a stream -func NewAllGroup(ksis kinesisClient, store Store, streamName string, logger Logger) *AllGroup { +func NewAllGroup(kinesis kinesisClient, store Store, streamName string, logger *slog.Logger) *AllGroup { return &AllGroup{ - ksis: ksis, + kinesis: kinesis, shards: make(map[string]types.Shard), streamName: streamName, - logger: logger, + slog: logger, Store: store, } } @@ -24,9 +25,9 @@ func NewAllGroup(ksis kinesisClient, store Store, streamName string, logger Logg // caches a local list of the shards we are already processing // and routinely polls the stream looking for new shards to process. type AllGroup struct { - ksis kinesisClient + kinesis kinesisClient streamName string - logger Logger + slog *slog.Logger Store shardMu sync.Mutex @@ -66,11 +67,11 @@ func (g *AllGroup) findNewShards(ctx context.Context, shardc chan types.Shard) { g.shardMu.Lock() defer g.shardMu.Unlock() - g.logger.Log("[GROUP]", "fetching shards") + g.slog.DebugContext(ctx, "fetch shards") - shards, err := listShards(ctx, g.ksis, g.streamName) + shards, err := listShards(ctx, g.kinesis, g.streamName) if err != nil { - g.logger.Log("[GROUP] error:", err) + g.slog.ErrorContext(ctx, "list shards", slog.String("error", err.Error())) return } diff --git a/consumer.go b/consumer.go index c492766..05ae65c 100644 --- a/consumer.go +++ b/consumer.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "log" + "log/slog" "sync" "time" @@ -38,11 +39,9 @@ func New(streamName string, opts ...Option) (*Consumer, error) { initialShardIteratorType: types.ShardIteratorTypeLatest, store: &noopStore{}, counter: &noopCounter{}, - logger: &noopLogger{ - logger: log.New(io.Discard, "", log.LstdFlags), - }, - scanInterval: 250 * time.Millisecond, - maxRecords: 10000, + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + scanInterval: 250 * time.Millisecond, + maxRecords: 10000, } // override defaults @@ -75,7 +74,7 @@ type Consumer struct { client kinesisClient counter Counter group Group - logger Logger + logger *slog.Logger store Store scanInterval time.Duration maxRecords int64 @@ -154,9 +153,9 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e return fmt.Errorf("get shard iterator error: %w", err) } - c.logger.Log("[CONSUMER] start scan:", shardID, lastSeqNum) + c.logger.DebugContext(ctx, "start scan", slog.String("shard-id", shardID), slog.String("last-sequence-number", lastSeqNum)) defer func() { - c.logger.Log("[CONSUMER] stop scan:", shardID) + c.logger.DebugContext(ctx, "stop scan", slog.String("shard-id", shardID)) }() scanTicker := time.NewTicker(c.scanInterval) @@ -170,12 +169,12 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e // attempt to recover from GetRecords error if err != nil { - c.logger.Log("[CONSUMER] get records error:", err.Error()) - if !isRetriableError(err) { return fmt.Errorf("get records error: %v", err.Error()) } + c.logger.WarnContext(ctx, "get records", slog.String("error", err.Error())) + shardIterator, err = c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum) if err != nil { return fmt.Errorf("get shard iterator error: %w", err) @@ -216,7 +215,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e } if isShardClosed(resp.NextShardIterator, shardIterator) { - c.logger.Log("[CONSUMER] shard closed:", shardID) + c.logger.DebugContext(ctx, "shard closed", slog.String("shard-id", shardID)) if c.shardClosedHandler != nil { err := c.shardClosedHandler(c.streamName, shardID) diff --git a/logger.go b/logger.go deleted file mode 100644 index f1896c0..0000000 --- a/logger.go +++ /dev/null @@ -1,20 +0,0 @@ -package consumer - -import ( - "log" -) - -// A Logger is a minimal interface to as an adaptor for external logging library to consumer -type Logger interface { - Log(...interface{}) -} - -// noopLogger implements logger interface with discard -type noopLogger struct { - logger *log.Logger -} - -// Log using stdlib logger. See log.Println. -func (l noopLogger) Log(args ...interface{}) { - l.logger.Println(args...) -} diff --git a/options.go b/options.go index 5e20f0d..0978aa4 100644 --- a/options.go +++ b/options.go @@ -1,6 +1,7 @@ package consumer import ( + "log/slog" "time" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" @@ -24,7 +25,7 @@ func WithStore(store Store) Option { } // WithLogger overrides the default logger -func WithLogger(logger Logger) Option { +func WithLogger(logger *slog.Logger) Option { return func(c *Consumer) { c.logger = logger }