kinesis-consumer/consumer.go

package consumer
import (
"context"
"errors"
"fmt"
"io"
"log"
"log/slog"
"sync"
"time"
"golang.org/x/sync/errgroup"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
"github.com/awslabs/kinesis-aggregation/go/v2/deaggregator"
"github.com/prometheus/client_golang/prometheus"
)
// Record wraps the record returned from the Kinesis library and
// extends to include the shard id.
type Record struct {
types.Record
ShardID string
MillisBehindLatest *int64
}
// New creates a kinesis consumer with default settings. Use Option to override
// any of the optional attributes.
func New(streamName string, opts ...Option) (*Consumer, error) {
if streamName == "" {
return nil, errors.New("must provide stream name")
}
// new consumer with noop storage, counter, and logger
c := &Consumer{
streamName: streamName,
initialShardIteratorType: types.ShardIteratorTypeLatest,
store: &noopStore{},
counter: &noopCounter{},
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
scanInterval: 250 * time.Millisecond,
maxRecords: 10000,
metricRegistry: nil,
numWorkers: 1,
}
// override defaults
for _, opt := range opts {
opt(c)
}
// default client
if c.client == nil {
cfg, err := config.LoadDefaultConfig(context.TODO())
if err != nil {
log.Fatalf("unable to load SDK config, %v", err)
}
c.client = kinesis.NewFromConfig(cfg)
}
// default group consumes all shards
if c.group == nil {
c.group = NewAllGroup(c.client, c.store, streamName, c.logger)
}
if c.metricRegistry != nil {
var errs error
errs = errors.Join(errs, c.metricRegistry.Register(collectorMillisBehindLatest))
errs = errors.Join(errs, c.metricRegistry.Register(counterEventsConsumed))
errs = errors.Join(errs, c.metricRegistry.Register(counterCheckpointsWritten))
errs = errors.Join(errs, c.metricRegistry.Register(gaugeBatchSize))
errs = errors.Join(errs, c.metricRegistry.Register(histogramBatchDuration))
errs = errors.Join(errs, c.metricRegistry.Register(histogramAverageRecordDuration))
if errs != nil {
return nil, errs
}
}
return c, nil
}
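
// Example (illustrative sketch, not part of the package API): constructing a
// consumer with the default AWS client resolved from the environment.
// "my-stream" is a placeholder stream name.
//
//	c, err := New("my-stream")
//	if err != nil {
//		// handle error (empty stream name, metric registration failure, ...)
//	}
//	_ = c
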
// Consumer wraps the interaction with the Kinesis stream
type Consumer struct {
streamName string
initialShardIteratorType types.ShardIteratorType
initialTimestamp *time.Time
client kinesisClient
// Deprecated: counter will be removed in favor of prometheus in a future release.
counter Counter
group Group
logger *slog.Logger
metricRegistry prometheus.Registerer
store Store
scanInterval time.Duration
maxRecords int64
isAggregated bool
shardClosedHandler ShardClosedHandler
numWorkers int
workerPool *WorkerPool
}
// ScanFunc is the type of the function called for each message read
// from the stream. The record argument contains the original record
// returned from the AWS Kinesis library.
// If an error is returned, scanning stops. The sole exception is when the
// function returns the special value ErrSkipCheckpoint.
type ScanFunc func(*Record) error
// ErrSkipCheckpoint is used as a return value from ScanFunc to indicate that
// the current checkpoint should be skipped. It is not returned
// as an error by any function.
var ErrSkipCheckpoint = errors.New("skip checkpoint")
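
// Example (illustrative sketch): a ScanFunc that handles a record but tells the
// consumer not to checkpoint it by returning ErrSkipCheckpoint; any other
// non-nil error stops the scan. The handle helper is hypothetical.
//
//	fn := ScanFunc(func(r *Record) error {
//		if err := handle(r); err != nil {
//			return err // stops scanning
//		}
//		return ErrSkipCheckpoint // keep scanning, skip the checkpoint
//	})
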
// Scan launches a goroutine to process each of the shards in the stream. The ScanFunc
// is passed through to each of the goroutines and called with each message pulled from
// the stream.
func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var (
errC = make(chan error, 1)
shardC = make(chan types.Shard, 1)
)
go func() {
err := c.group.Start(ctx, shardC)
if err != nil {
errC <- fmt.Errorf("error starting scan: %w", err)
cancel()
}
<-ctx.Done()
close(shardC)
}()
wg := new(sync.WaitGroup)
// process each of the shards
s := newShardsInProcess()
for shard := range shardC {
shardId := aws.ToString(shard.ShardId)
if s.doesShardExist(shardId) {
// safety net: this shard is already being processed by another goroutine, so skip it
continue
}
// mark the shard as in-process before launching the goroutine so a repeated
// delivery of the same shard cannot slip past the check above
s.addShard(shardId)
wg.Add(1)
go func(shardID string) {
defer s.deleteShard(shardID)
defer wg.Done()
err := c.ScanShard(ctx, shardID, fn)
if err == nil {
// scan finished cleanly; close the shard if the group supports it
if closeable, ok := c.group.(CloseableGroup); ok {
if cerr := closeable.CloseShard(ctx, shardID); cerr != nil {
err = fmt.Errorf("shard closed CloseableGroup error: %w", cerr)
}
}
}
if err != nil {
select {
case errC <- fmt.Errorf("shard %s error: %w", shardID, err):
cancel()
default:
// error has already occurred
}
}
}(shardId)
}
go func() {
wg.Wait()
close(errC)
}()
return <-errC
}
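
// Example (illustrative sketch): driving Scan with a cancellable context, where
// c is a *Consumer from New. The callback prints each record's partition key and
// size; calling cancel (or returning an error from the callback) ends the scan.
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	err := c.Scan(ctx, func(r *Record) error {
//		fmt.Println(aws.ToString(r.PartitionKey), len(r.Data))
//		return nil
//	})
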
// ScanShard loops over records on a specific shard, calls the callback func
// for each record and checkpoints the progress of scan.
func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) error {
c.workerPool = NewWorkerPool(c.streamName, c.numWorkers, fn)
c.workerPool.Start(ctx)
defer c.workerPool.Stop()
// get last seq number from checkpoint
lastSeqNum, err := c.group.GetCheckpoint(ctx, c.streamName, shardID)
if err != nil {
return fmt.Errorf("get checkpoint error: %w", err)
}
// get shard iterator
shardIterator, err := c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
if err != nil {
return fmt.Errorf("get shard iterator error: %w", err)
}
c.logger.DebugContext(ctx, "start scan", slog.String("shard-id", shardID), slog.String("last-sequence-number", lastSeqNum))
defer func() {
c.logger.DebugContext(ctx, "stop scan", slog.String("shard-id", shardID))
}()
scanTicker := time.NewTicker(c.scanInterval)
defer scanTicker.Stop()
for {
resp, err := c.client.GetRecords(ctx, &kinesis.GetRecordsInput{
Limit: aws.Int32(int32(c.maxRecords)),
ShardIterator: shardIterator,
})
// attempt to recover from GetRecords error
if err != nil {
if !isRetriableError(err) {
return fmt.Errorf("get records error: %v", err.Error())
}
c.logger.WarnContext(ctx, "get records", slog.String("error", err.Error()))
shardIterator, err = c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
if err != nil {
return fmt.Errorf("get shard iterator error: %w", err)
}
} else {
var seqNum string
seqNum, err = c.processRecords(ctx, shardID, resp, fn)
if err != nil {
return err
}
// only advance the checkpoint position when records were actually processed,
// so an empty batch does not wipe out the last known sequence number
if seqNum != "" {
lastSeqNum = seqNum
}
if isShardClosed(resp.NextShardIterator, shardIterator) {
c.logger.DebugContext(ctx, "shard closed", slog.String("shard-id", shardID))
if c.shardClosedHandler != nil {
err := c.shardClosedHandler(c.streamName, shardID)
if err != nil {
return fmt.Errorf("shard closed handler error: %w", err)
}
}
return nil
}
shardIterator = resp.NextShardIterator
}
// Wait for next scan
select {
case <-ctx.Done():
return nil
case <-scanTicker.C:
continue
}
}
}
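
// Example (illustrative sketch): scanning one known shard directly instead of
// letting Scan discover shards through the Group. The shard ID below is a
// placeholder in the usual Kinesis format.
//
//	err := c.ScanShard(ctx, "shardId-000000000000", func(r *Record) error {
//		return nil
//	})
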
func (c *Consumer) processRecords(ctx context.Context, shardID string, resp *kinesis.GetRecordsOutput, fn ScanFunc) (string, error) {
if len(resp.Records) == 0 {
return "", nil
}
startedAt := time.Now()
batchSize := float64(len(resp.Records))
gaugeBatchSize.
With(prometheus.Labels{labelStreamName: c.streamName, labelShardID: shardID}).
Set(batchSize)
secondsBehindLatest := float64(time.Duration(*resp.MillisBehindLatest)*time.Millisecond) / float64(time.Second)
collectorMillisBehindLatest.
With(prometheus.Labels{labelStreamName: c.streamName, labelShardID: shardID}).
Observe(secondsBehindLatest)
// loop over records, call callback func
var records []types.Record
// disaggregate records
var err error
if c.isAggregated {
records, err = disaggregateRecords(resp.Records)
if err != nil {
return "", err
}
} else {
records = resp.Records
}
if len(records) == 0 {
// nothing to do here
return "", nil
}
err = c.runWorkers(ctx, shardID, resp, fn, records)
if err != nil {
return "", err
}
// we MUST only reach this point if everything is processed
lastSeqNum := *records[len(records)-1].SequenceNumber
if err := c.group.SetCheckpoint(ctx, c.streamName, shardID, lastSeqNum); err != nil {
return "", fmt.Errorf("set checkpoint error: %w", err)
}
numberOfProcessedTasks := len(records)
c.counter.Add("checkpoint", int64(numberOfProcessedTasks))
counterCheckpointsWritten.
With(prometheus.Labels{labelStreamName: c.streamName, labelShardID: shardID}).
Add(float64(numberOfProcessedTasks))
duration := time.Since(startedAt).Seconds()
histogramBatchDuration.
With(prometheus.Labels{labelStreamName: c.streamName, labelShardID: shardID}).
Observe(duration)
histogramAverageRecordDuration.
With(prometheus.Labels{labelStreamName: c.streamName, labelShardID: shardID}).
Observe(duration / batchSize)
return lastSeqNum, nil
}
// runWorkers launches a worker pool to process the records
func (c *Consumer) runWorkers(ctx context.Context, shardID string, resp *kinesis.GetRecordsOutput, fn ScanFunc, records []types.Record) error {
errGroup, ctx := errgroup.WithContext(ctx)
errGroup.SetLimit(c.numWorkers)
for _, r := range records {
errGroup.Go(func() error {
err := fn(&Record{Record: r, ShardID: shardID, MillisBehindLatest: resp.MillisBehindLatest})
if !errors.Is(err, ErrSkipCheckpoint) {
return err
}
return nil
})
}
return errGroup.Wait()
}
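
// Example (illustrative sketch): the bounded-concurrency pattern used above,
// shown in isolation. With SetLimit(2) at most two callbacks run at once and
// Wait returns the first non-nil error. The records slice and work helper are
// hypothetical.
//
//	g, gctx := errgroup.WithContext(context.Background())
//	g.SetLimit(2)
//	for _, r := range records {
//		g.Go(func() error { return work(gctx, r) })
//	}
//	err := g.Wait()
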
// disaggregateRecords is a temporary conversion helper: it copies the input records,
// expands any KPL-aggregated entries via deaggregator.DeaggregateRecords, and returns
// the resulting []types.Record.
func disaggregateRecords(in []types.Record) ([]types.Record, error) {
var recs []types.Record
recs = append(recs, in...)
deagg, err := deaggregator.DeaggregateRecords(recs)
if err != nil {
return nil, err
}
var out []types.Record
out = append(out, deagg...)
return out, nil
}
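
// Example (illustrative sketch): expanding KPL-aggregated records directly.
// processRecords only takes this path when isAggregated is set on the consumer
// (presumably via an Option defined outside this file).
//
//	expanded, err := disaggregateRecords(resp.Records)
//	if err != nil {
//		// handle deaggregation error
//	}
//	_ = expanded
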
func (c *Consumer) getShardIterator(ctx context.Context, streamName, shardID, seqNum string) (*string, error) {
params := &kinesis.GetShardIteratorInput{
ShardId: aws.String(shardID),
StreamName: aws.String(streamName),
}
if seqNum != "" {
params.ShardIteratorType = types.ShardIteratorTypeAfterSequenceNumber
params.StartingSequenceNumber = aws.String(seqNum)
} else if c.initialTimestamp != nil {
params.ShardIteratorType = types.ShardIteratorTypeAtTimestamp
params.Timestamp = c.initialTimestamp
} else {
params.ShardIteratorType = c.initialShardIteratorType
}
res, err := c.client.GetShardIterator(ctx, params)
if err != nil {
return nil, err
}
return res.ShardIterator, nil
}
func isRetriableError(err error) bool {
if oe := (*types.ExpiredIteratorException)(nil); errors.As(err, &oe) {
return true
}
if oe := (*types.ProvisionedThroughputExceededException)(nil); errors.As(err, &oe) {
return true
}
return false
}
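
// Example (illustrative sketch): isRetriableError sees through fmt.Errorf
// wrapping because errors.As walks the error chain.
//
//	err := fmt.Errorf("get records error: %w", &types.ExpiredIteratorException{})
//	retry := isRetriableError(err) // true
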
func isShardClosed(nextShardIterator, currentShardIterator *string) bool {
return nextShardIterator == nil || currentShardIterator == nextShardIterator
}
type shards struct {
shardsInProcess sync.Map
}
func newShardsInProcess() *shards {
return &shards{}
}
func (s *shards) addShard(shardId string) {
s.shardsInProcess.Store(shardId, struct{}{})
}
func (s *shards) doesShardExist(shardId string) bool {
_, ok := s.shardsInProcess.Load(shardId)
return ok
}
func (s *shards) deleteShard(shardId string) {
s.shardsInProcess.Delete(shardId)
}