diff --git a/consumer.go b/consumer.go index a01d9da..af8d117 100644 --- a/consumer.go +++ b/consumer.go @@ -39,8 +39,9 @@ func New(streamName string, opts ...Option) (*Consumer, error) { logger: &noopLogger{ logger: log.New(ioutil.Discard, "", log.LstdFlags), }, - scanInterval: 250 * time.Millisecond, - maxRecords: 10000, + scanInterval: 250 * time.Millisecond, + immediateRescan: false, + maxRecords: 10000, } // override defaults @@ -76,6 +77,7 @@ type Consumer struct { logger Logger store Store scanInterval time.Duration + immediateRescan bool maxRecords int64 } @@ -206,6 +208,10 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e } shardIterator = resp.NextShardIterator + if c.immediateRescan && len(resp.Records) > 0 && *resp.MillisBehindLatest > 0 { + // There appears to be additional records, so skip waiting for next tick + continue + } } // Wait for next scan diff --git a/options.go b/options.go index b8248aa..75518ef 100644 --- a/options.go +++ b/options.go @@ -65,6 +65,14 @@ func WithScanInterval(d time.Duration) Option { } } +// WithImmediateRescan overrides whether we wait for the next +// scan interval if records were fetched during a poll +func WithImmediateRescan(r bool) Option { + return func(c *Consumer) { + c.immediateRescan = r + } +} + // WithMaxRecords overrides the maximum number of records to be // returned in a single GetRecords call for the consumer (specify a // value of up to 10,000)