diff --git a/consumer.go b/consumer.go index 0077344..72e6942 100644 --- a/consumer.go +++ b/consumer.go @@ -16,8 +16,12 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" ) -// Record is an alias of record returned from kinesis library -type Record = kinesis.Record +// Record wraps the record returned from the Kinesis library and +// extends to include the shard id. +type Record struct { + *kinesis.Record + ShardID string +} // New creates a kinesis consumer with default settings. Use Option to override // any of the optional attributes. @@ -187,7 +191,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e case <-ctx.Done(): return nil default: - err := fn(r) + err := fn(&Record{r, shardID}) if err != nil && err != ErrSkipCheckpoint { return err } diff --git a/consumer_test.go b/consumer_test.go index f1f9cca..c070005 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -260,7 +260,7 @@ func TestScanShard_ShardIsClosed(t *testing.T) { getRecordsMock: func(input *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) { return &kinesis.GetRecordsOutput{ NextShardIterator: nil, - Records: make([]*Record, 0), + Records: make([]*kinesis.Record, 0), }, nil }, }