commit
2d05606a38
4 changed files with 21 additions and 40 deletions
17
allgroup.go
17
allgroup.go
|
|
@ -2,6 +2,7 @@ package consumer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"log/slog"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -10,12 +11,12 @@ import (
|
||||||
|
|
||||||
// NewAllGroup returns an initialized AllGroup for consuming
|
// NewAllGroup returns an initialized AllGroup for consuming
|
||||||
// all shards on a stream
|
// all shards on a stream
|
||||||
func NewAllGroup(ksis kinesisClient, store Store, streamName string, logger Logger) *AllGroup {
|
func NewAllGroup(kinesis kinesisClient, store Store, streamName string, logger *slog.Logger) *AllGroup {
|
||||||
return &AllGroup{
|
return &AllGroup{
|
||||||
ksis: ksis,
|
kinesis: kinesis,
|
||||||
shards: make(map[string]types.Shard),
|
shards: make(map[string]types.Shard),
|
||||||
streamName: streamName,
|
streamName: streamName,
|
||||||
logger: logger,
|
slog: logger,
|
||||||
Store: store,
|
Store: store,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -24,9 +25,9 @@ func NewAllGroup(ksis kinesisClient, store Store, streamName string, logger Logg
|
||||||
// caches a local list of the shards we are already processing
|
// caches a local list of the shards we are already processing
|
||||||
// and routinely polls the stream looking for new shards to process.
|
// and routinely polls the stream looking for new shards to process.
|
||||||
type AllGroup struct {
|
type AllGroup struct {
|
||||||
ksis kinesisClient
|
kinesis kinesisClient
|
||||||
streamName string
|
streamName string
|
||||||
logger Logger
|
slog *slog.Logger
|
||||||
Store
|
Store
|
||||||
|
|
||||||
shardMu sync.Mutex
|
shardMu sync.Mutex
|
||||||
|
|
@ -66,11 +67,11 @@ func (g *AllGroup) findNewShards(ctx context.Context, shardc chan types.Shard) {
|
||||||
g.shardMu.Lock()
|
g.shardMu.Lock()
|
||||||
defer g.shardMu.Unlock()
|
defer g.shardMu.Unlock()
|
||||||
|
|
||||||
g.logger.Log("[GROUP]", "fetching shards")
|
g.slog.DebugContext(ctx, "fetch shards")
|
||||||
|
|
||||||
shards, err := listShards(ctx, g.ksis, g.streamName)
|
shards, err := listShards(ctx, g.kinesis, g.streamName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
g.logger.Log("[GROUP] error:", err)
|
g.slog.ErrorContext(ctx, "list shards", slog.String("error", err.Error()))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
17
consumer.go
17
consumer.go
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
"log/slog"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -38,9 +39,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) {
|
||||||
initialShardIteratorType: types.ShardIteratorTypeLatest,
|
initialShardIteratorType: types.ShardIteratorTypeLatest,
|
||||||
store: &noopStore{},
|
store: &noopStore{},
|
||||||
counter: &noopCounter{},
|
counter: &noopCounter{},
|
||||||
logger: &noopLogger{
|
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
|
||||||
logger: log.New(io.Discard, "", log.LstdFlags),
|
|
||||||
},
|
|
||||||
scanInterval: 250 * time.Millisecond,
|
scanInterval: 250 * time.Millisecond,
|
||||||
maxRecords: 10000,
|
maxRecords: 10000,
|
||||||
}
|
}
|
||||||
|
|
@ -75,7 +74,7 @@ type Consumer struct {
|
||||||
client kinesisClient
|
client kinesisClient
|
||||||
counter Counter
|
counter Counter
|
||||||
group Group
|
group Group
|
||||||
logger Logger
|
logger *slog.Logger
|
||||||
store Store
|
store Store
|
||||||
scanInterval time.Duration
|
scanInterval time.Duration
|
||||||
maxRecords int64
|
maxRecords int64
|
||||||
|
|
@ -154,9 +153,9 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
|
||||||
return fmt.Errorf("get shard iterator error: %w", err)
|
return fmt.Errorf("get shard iterator error: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.logger.Log("[CONSUMER] start scan:", shardID, lastSeqNum)
|
c.logger.DebugContext(ctx, "start scan", slog.String("shard-id", shardID), slog.String("last-sequence-number", lastSeqNum))
|
||||||
defer func() {
|
defer func() {
|
||||||
c.logger.Log("[CONSUMER] stop scan:", shardID)
|
c.logger.DebugContext(ctx, "stop scan", slog.String("shard-id", shardID))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
scanTicker := time.NewTicker(c.scanInterval)
|
scanTicker := time.NewTicker(c.scanInterval)
|
||||||
|
|
@ -170,12 +169,12 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
|
||||||
|
|
||||||
// attempt to recover from GetRecords error
|
// attempt to recover from GetRecords error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Log("[CONSUMER] get records error:", err.Error())
|
|
||||||
|
|
||||||
if !isRetriableError(err) {
|
if !isRetriableError(err) {
|
||||||
return fmt.Errorf("get records error: %v", err.Error())
|
return fmt.Errorf("get records error: %v", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.logger.WarnContext(ctx, "get records", slog.String("error", err.Error()))
|
||||||
|
|
||||||
shardIterator, err = c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
|
shardIterator, err = c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get shard iterator error: %w", err)
|
return fmt.Errorf("get shard iterator error: %w", err)
|
||||||
|
|
@ -216,7 +215,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
|
||||||
}
|
}
|
||||||
|
|
||||||
if isShardClosed(resp.NextShardIterator, shardIterator) {
|
if isShardClosed(resp.NextShardIterator, shardIterator) {
|
||||||
c.logger.Log("[CONSUMER] shard closed:", shardID)
|
c.logger.DebugContext(ctx, "shard closed", slog.String("shard-id", shardID))
|
||||||
|
|
||||||
if c.shardClosedHandler != nil {
|
if c.shardClosedHandler != nil {
|
||||||
err := c.shardClosedHandler(c.streamName, shardID)
|
err := c.shardClosedHandler(c.streamName, shardID)
|
||||||
|
|
|
||||||
20
logger.go
20
logger.go
|
|
@ -1,20 +0,0 @@
|
||||||
package consumer
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
)
|
|
||||||
|
|
||||||
// A Logger is a minimal interface to as an adaptor for external logging library to consumer
|
|
||||||
type Logger interface {
|
|
||||||
Log(...interface{})
|
|
||||||
}
|
|
||||||
|
|
||||||
// noopLogger implements logger interface with discard
|
|
||||||
type noopLogger struct {
|
|
||||||
logger *log.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
// Log using stdlib logger. See log.Println.
|
|
||||||
func (l noopLogger) Log(args ...interface{}) {
|
|
||||||
l.logger.Println(args...)
|
|
||||||
}
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package consumer
|
package consumer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"log/slog"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
||||||
|
|
@ -24,7 +25,7 @@ func WithStore(store Store) Option {
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithLogger overrides the default logger
|
// WithLogger overrides the default logger
|
||||||
func WithLogger(logger Logger) Option {
|
func WithLogger(logger *slog.Logger) Option {
|
||||||
return func(c *Consumer) {
|
return func(c *Consumer) {
|
||||||
c.logger = logger
|
c.logger = logger
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue