diff --git a/consumer.go b/consumer.go index a01d9da..763b0e2 100644 --- a/consumer.go +++ b/consumer.go @@ -20,7 +20,8 @@ import ( // extends to include the shard id. type Record struct { *kinesis.Record - ShardID string + ShardID string + MillisBehindLatest *int64 } // New creates a kinesis consumer with default settings. Use Option to override @@ -184,7 +185,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e case <-ctx.Done(): return nil default: - err := fn(&Record{r, shardID}) + err := fn(&Record{r, shardID, resp.MillisBehindLatest}) if err != nil && err != ErrSkipCheckpoint { return err }