diff --git a/consumer.go b/consumer.go index 6d56060..62b0bfc 100644 --- a/consumer.go +++ b/consumer.go @@ -35,6 +35,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) { logger: &noopLogger{ logger: log.New(ioutil.Discard, "", log.LstdFlags), }, + scanInterval: 250 * time.Millisecond, } // override defaults @@ -69,6 +70,7 @@ type Consumer struct { group Group logger Logger store Store + scanInterval time.Duration } // ScanFunc is the type of the function called for each message read @@ -146,12 +148,14 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e defer func() { c.logger.Log("[CONSUMER] stop scan:", shardID) }() + scanTicker := time.NewTicker(c.scanInterval) + defer scanTicker.Stop() for { select { case <-ctx.Done(): return nil - default: + case <-scanTicker.C: resp, err := c.client.GetRecords(&kinesis.GetRecordsInput{ ShardIterator: shardIterator, }) diff --git a/options.go b/options.go index af7ce2e..6fa53bc 100644 --- a/options.go +++ b/options.go @@ -57,3 +57,10 @@ func WithTimestamp(t time.Time) Option { c.initialTimestamp = &t } } + +// WithScanInterval overrides the scan interval for the consumer +func WithScanInterval(d time.Duration) Option { + return func(c *Consumer) { + c.scanInterval = d + } +}