From f43d7356903b963dd18e47184777377686f48cd4 Mon Sep 17 00:00:00 2001 From: Mike McCrary Date: Wed, 18 Dec 2019 14:15:37 -0700 Subject: [PATCH] add shardID to scan func --- consumer.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/consumer.go b/consumer.go index 6d56060..29fd4a1 100644 --- a/consumer.go +++ b/consumer.go @@ -30,8 +30,8 @@ func New(streamName string, opts ...Option) (*Consumer, error) { c := &Consumer{ streamName: streamName, initialShardIteratorType: kinesis.ShardIteratorTypeLatest, - store: &noopStore{}, - counter: &noopCounter{}, + store: &noopStore{}, + counter: &noopCounter{}, logger: &noopLogger{ logger: log.New(ioutil.Discard, "", log.LstdFlags), }, @@ -73,10 +73,10 @@ type Consumer struct { // ScanFunc is the type of the function called for each message read // from the stream. The record argument contains the original record -// returned from the AWS Kinesis library. +// returned from the AWS Kinesis library and the shardID. // If an error is returned, scanning stops. The sole exception is when the // function returns the special value ErrSkipCheckpoint. -type ScanFunc func(*Record) error +type ScanFunc func(*Record, string) error // ErrSkipCheckpoint is used as a return value from ScanFunc to indicate that // the current checkpoint should be skipped skipped. It is not returned @@ -180,7 +180,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e case <-ctx.Done(): return nil default: - err := fn(r) + err := fn(r, shardID) if err != nil && err != ErrSkipCheckpoint { return err }