From 3372ea1d17d4f3b129f3667226fb02f355c70e91 Mon Sep 17 00:00:00 2001 From: Jason Tackaberry Date: Fri, 31 Jul 2020 14:07:20 -0400 Subject: [PATCH] Add option for immediate rescan When immediate rescan is enabled, a poll from Kinesis that returns records and also indicates we are still behind the latest record will skip waiting for the next scan interval and rescan immediately. Related, the initial scan for a shard is performed immediately. The scan ticker only applies to subsequent scans. These changes make higher scan intervals feasible, allowing for less chatty clients while also improving overall effective throughput. --- consumer.go | 10 ++++++++-- options.go | 8 ++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/consumer.go b/consumer.go index a01d9da..af8d117 100644 --- a/consumer.go +++ b/consumer.go @@ -39,8 +39,9 @@ func New(streamName string, opts ...Option) (*Consumer, error) { logger: &noopLogger{ logger: log.New(ioutil.Discard, "", log.LstdFlags), }, - scanInterval: 250 * time.Millisecond, - maxRecords: 10000, + scanInterval: 250 * time.Millisecond, + immediateRescan: false, + maxRecords: 10000, } // override defaults @@ -76,6 +77,7 @@ type Consumer struct { logger Logger store Store scanInterval time.Duration + immediateRescan bool maxRecords int64 } @@ -206,6 +208,10 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e } shardIterator = resp.NextShardIterator + if c.immediateRescan && len(resp.Records) > 0 && *resp.MillisBehindLatest > 0 { + // There appears to be additional records, so skip waiting for next tick + continue + } } // Wait for next scan diff --git a/options.go b/options.go index b8248aa..75518ef 100644 --- a/options.go +++ b/options.go @@ -65,6 +65,14 @@ func WithScanInterval(d time.Duration) Option { } } +// WithImmediateRescan overrides whether we wait for the next +// scan interval if records were fetched during a poll +func WithImmediateRescan(r bool) Option { + return func(c *Consumer) { + c.immediateRescan = r + } +} + // WithMaxRecords overrides the maximum number of records to be // returned in a single GetRecords call for the consumer (specify a // value of up to 10,000)