From 400ef074638b01e927b29c6cedb6a6981abfeeb3 Mon Sep 17 00:00:00 2001 From: chumbert2 <43178474+chumbert2@users.noreply.github.com> Date: Tue, 28 Apr 2020 06:12:20 +0200 Subject: [PATCH] Allow to override Limit parameter in GetRecords (#113) Add option maxRecords to set the maximum number of records that can be returned by GetRecords. Default value: 10,000. Use WithMaxRecords to change the default value. See https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#GetRecordsInput https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html --- consumer.go | 3 +++ options.go | 9 +++++++++ 2 files changed, 12 insertions(+) diff --git a/consumer.go b/consumer.go index 62b0bfc..0077344 100644 --- a/consumer.go +++ b/consumer.go @@ -36,6 +36,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) { logger: log.New(ioutil.Discard, "", log.LstdFlags), }, scanInterval: 250 * time.Millisecond, + maxRecords: 10000, } // override defaults @@ -71,6 +72,7 @@ type Consumer struct { logger Logger store Store scanInterval time.Duration + maxRecords int64 } // ScanFunc is the type of the function called for each message read @@ -157,6 +159,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e return nil case <-scanTicker.C: resp, err := c.client.GetRecords(&kinesis.GetRecordsInput{ + Limit: aws.Int64(c.maxRecords), ShardIterator: shardIterator, }) diff --git a/options.go b/options.go index 6fa53bc..b8248aa 100644 --- a/options.go +++ b/options.go @@ -64,3 +64,12 @@ func WithScanInterval(d time.Duration) Option { c.scanInterval = d } } + +// WithMaxRecords overrides the maximum number of records to be +// returned in a single GetRecords call for the consumer (specify a +// value of up to 10,000) +func WithMaxRecords(n int64) Option { + return func(c *Consumer) { + c.maxRecords = n + } +}