diff --git a/consumer.go b/consumer.go index 62b0bfc..0077344 100644 --- a/consumer.go +++ b/consumer.go @@ -36,6 +36,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) { logger: log.New(ioutil.Discard, "", log.LstdFlags), }, scanInterval: 250 * time.Millisecond, + maxRecords: 10000, } // override defaults @@ -71,6 +72,7 @@ type Consumer struct { logger Logger store Store scanInterval time.Duration + maxRecords int64 } // ScanFunc is the type of the function called for each message read @@ -157,6 +159,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e return nil case <-scanTicker.C: resp, err := c.client.GetRecords(&kinesis.GetRecordsInput{ + Limit: aws.Int64(c.maxRecords), ShardIterator: shardIterator, }) diff --git a/options.go b/options.go index 6fa53bc..b8248aa 100644 --- a/options.go +++ b/options.go @@ -64,3 +64,12 @@ func WithScanInterval(d time.Duration) Option { c.scanInterval = d } } + +// WithMaxRecords overrides the maximum number of records to be +// returned in a single GetRecords call for the consumer (specify a +// value of up to 10,000) +func WithMaxRecords(n int64) Option { + return func(c *Consumer) { + c.maxRecords = n + } +}