Add scanInterval option (#105)

By default a consumer will scan for records every 250ms.
This interval can be configured with WithScanInterval.
This commit is contained in:
0livd 2020-01-17 19:22:10 +01:00 committed by Harlow Ward
parent 217999854b
commit bc5c012fd3
2 changed files with 12 additions and 1 deletions

View file

@ -35,6 +35,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) {
logger: &noopLogger{ logger: &noopLogger{
logger: log.New(ioutil.Discard, "", log.LstdFlags), logger: log.New(ioutil.Discard, "", log.LstdFlags),
}, },
scanInterval: 250 * time.Millisecond,
} }
// override defaults // override defaults
@ -69,6 +70,7 @@ type Consumer struct {
group Group group Group
logger Logger logger Logger
store Store store Store
scanInterval time.Duration
} }
// ScanFunc is the type of the function called for each message read // ScanFunc is the type of the function called for each message read
@ -146,12 +148,14 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
defer func() { defer func() {
c.logger.Log("[CONSUMER] stop scan:", shardID) c.logger.Log("[CONSUMER] stop scan:", shardID)
}() }()
scanTicker := time.NewTicker(c.scanInterval)
defer scanTicker.Stop()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil return nil
default: case <-scanTicker.C:
resp, err := c.client.GetRecords(&kinesis.GetRecordsInput{ resp, err := c.client.GetRecords(&kinesis.GetRecordsInput{
ShardIterator: shardIterator, ShardIterator: shardIterator,
}) })

View file

@ -57,3 +57,10 @@ func WithTimestamp(t time.Time) Option {
c.initialTimestamp = &t c.initialTimestamp = &t
} }
} }
// WithScanInterval overrides the scan interval for the consumer
func WithScanInterval(d time.Duration) Option {
return func(c *Consumer) {
c.scanInterval = d
}
}