diff --git a/consumer.go b/consumer.go index 74b59bb..b83d4e8 100644 --- a/consumer.go +++ b/consumer.go @@ -40,8 +40,9 @@ func New(streamName string, opts ...Option) (*Consumer, error) { logger: &noopLogger{ logger: log.New(io.Discard, "", log.LstdFlags), }, - scanInterval: 250 * time.Millisecond, - maxRecords: 10000, + scanInterval: 250 * time.Millisecond, + immediateRescan: false, + maxRecords: 10000, } // override defaults @@ -77,6 +78,7 @@ type Consumer struct { logger Logger store Store scanInterval time.Duration + immediateRescan bool maxRecords int64 isAggregated bool shardClosedHandler ShardClosedHandler @@ -244,6 +246,10 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e } shardIterator = resp.NextShardIterator + if c.immediateRescan && len(resp.Records) > 0 && *resp.MillisBehindLatest > 0 { + // There appears to be additional records, so skip waiting for next tick + continue + } } // Wait for next scan diff --git a/options.go b/options.go index 355ad4c..6c02cdd 100644 --- a/options.go +++ b/options.go @@ -65,6 +65,14 @@ func WithScanInterval(d time.Duration) Option { } } +// WithImmediateRescan overrides whether we wait for the next +// scan interval if records were fetched during a poll +func WithImmediateRescan(r bool) Option { + return func(c *Consumer) { + c.immediateRescan = r + } +} + // WithMaxRecords overrides the maximum number of records to be // returned in a single GetRecords call for the consumer (specify a // value of up to 10,000)