Allow to override Limit parameter in GetRecords (#113)
Add option maxRecords to set the maximum number of records that can be returned by GetRecords. Default value: 10,000. Use WithMaxRecords to change the default value. See https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#GetRecordsInput https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
This commit is contained in:
parent
dcd9d048fb
commit
400ef07463
2 changed files with 12 additions and 0 deletions
|
|
@ -36,6 +36,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) {
|
|||
logger: log.New(ioutil.Discard, "", log.LstdFlags),
|
||||
},
|
||||
scanInterval: 250 * time.Millisecond,
|
||||
maxRecords: 10000,
|
||||
}
|
||||
|
||||
// override defaults
|
||||
|
|
@ -71,6 +72,7 @@ type Consumer struct {
|
|||
logger Logger
|
||||
store Store
|
||||
scanInterval time.Duration
|
||||
maxRecords int64
|
||||
}
|
||||
|
||||
// ScanFunc is the type of the function called for each message read
|
||||
|
|
@ -157,6 +159,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
|
|||
return nil
|
||||
case <-scanTicker.C:
|
||||
resp, err := c.client.GetRecords(&kinesis.GetRecordsInput{
|
||||
Limit: aws.Int64(c.maxRecords),
|
||||
ShardIterator: shardIterator,
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -64,3 +64,12 @@ func WithScanInterval(d time.Duration) Option {
|
|||
c.scanInterval = d
|
||||
}
|
||||
}
|
||||
|
||||
// WithMaxRecords overrides the maximum number of records to be
|
||||
// returned in a single GetRecords call for the consumer (specify a
|
||||
// value of up to 10,000)
|
||||
func WithMaxRecords(n int64) Option {
|
||||
return func(c *Consumer) {
|
||||
c.maxRecords = n
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue