Include MillisBehindLatest in Record for ScanFunc (#124)
This commit is contained in:
parent
3f2519e51c
commit
e60d217333
1 changed files with 3 additions and 2 deletions
|
|
@ -20,7 +20,8 @@ import (
|
||||||
// extends to include the shard id.
|
// extends to include the shard id.
|
||||||
type Record struct {
|
type Record struct {
|
||||||
*kinesis.Record
|
*kinesis.Record
|
||||||
ShardID string
|
ShardID string
|
||||||
|
MillisBehindLatest *int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a kinesis consumer with default settings. Use Option to override
|
// New creates a kinesis consumer with default settings. Use Option to override
|
||||||
|
|
@ -184,7 +185,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
err := fn(&Record{r, shardID})
|
err := fn(&Record{r, shardID, resp.MillisBehindLatest})
|
||||||
if err != nil && err != ErrSkipCheckpoint {
|
if err != nil && err != ErrSkipCheckpoint {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue