add shardID to scan func

This commit is contained in:
Mike McCrary 2019-12-18 14:15:37 -07:00
parent f85f25c15e
commit f43d735690

View file

@ -30,8 +30,8 @@ func New(streamName string, opts ...Option) (*Consumer, error) {
c := &Consumer{ c := &Consumer{
streamName: streamName, streamName: streamName,
initialShardIteratorType: kinesis.ShardIteratorTypeLatest, initialShardIteratorType: kinesis.ShardIteratorTypeLatest,
store: &noopStore{}, store: &noopStore{},
counter: &noopCounter{}, counter: &noopCounter{},
logger: &noopLogger{ logger: &noopLogger{
logger: log.New(ioutil.Discard, "", log.LstdFlags), logger: log.New(ioutil.Discard, "", log.LstdFlags),
}, },
@ -73,10 +73,10 @@ type Consumer struct {
// ScanFunc is the type of the function called for each message read // ScanFunc is the type of the function called for each message read
// from the stream. The record argument contains the original record // from the stream. The record argument contains the original record
// returned from the AWS Kinesis library. // returned from the AWS Kinesis library and the shardID.
// If an error is returned, scanning stops. The sole exception is when the // If an error is returned, scanning stops. The sole exception is when the
// function returns the special value ErrSkipCheckpoint. // function returns the special value ErrSkipCheckpoint.
type ScanFunc func(*Record) error type ScanFunc func(*Record, string) error
// ErrSkipCheckpoint is used as a return value from ScanFunc to indicate that // ErrSkipCheckpoint is used as a return value from ScanFunc to indicate that
// the current checkpoint should be skipped skipped. It is not returned // the current checkpoint should be skipped skipped. It is not returned
@ -180,7 +180,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
case <-ctx.Done(): case <-ctx.Done():
return nil return nil
default: default:
err := fn(r) err := fn(r, shardID)
if err != nil && err != ErrSkipCheckpoint { if err != nil && err != ErrSkipCheckpoint {
return err return err
} }