From bc5c012fd3979ba88826d2ffb400789cbd241998 Mon Sep 17 00:00:00 2001 From: 0livd <24255883+0livd@users.noreply.github.com> Date: Fri, 17 Jan 2020 19:22:10 +0100 Subject: [PATCH] Add scanInterval option (#105) By default a consumer will scan for records every 250ms. This interval can be configured with WithScanInterval. --- consumer.go | 6 +++++- options.go | 7 +++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/consumer.go b/consumer.go index 6d56060..62b0bfc 100644 --- a/consumer.go +++ b/consumer.go @@ -35,6 +35,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) { logger: &noopLogger{ logger: log.New(ioutil.Discard, "", log.LstdFlags), }, + scanInterval: 250 * time.Millisecond, } // override defaults @@ -69,6 +70,7 @@ type Consumer struct { group Group logger Logger store Store + scanInterval time.Duration } // ScanFunc is the type of the function called for each message read @@ -146,12 +148,14 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e defer func() { c.logger.Log("[CONSUMER] stop scan:", shardID) }() + scanTicker := time.NewTicker(c.scanInterval) + defer scanTicker.Stop() for { select { case <-ctx.Done(): return nil - default: + case <-scanTicker.C: resp, err := c.client.GetRecords(&kinesis.GetRecordsInput{ ShardIterator: shardIterator, }) diff --git a/options.go b/options.go index af7ce2e..6fa53bc 100644 --- a/options.go +++ b/options.go @@ -57,3 +57,10 @@ func WithTimestamp(t time.Time) Option { c.initialTimestamp = &t } } + +// WithScanInterval overrides the scan interval for the consumer +func WithScanInterval(d time.Duration) Option { + return func(c *Consumer) { + c.scanInterval = d + } +}