This commit is contained in:
Jason Tackaberry 2025-01-23 03:18:52 +00:00 committed by GitHub
commit 8ae1ec4bc3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 16 additions and 2 deletions

View file

@ -40,8 +40,9 @@ func New(streamName string, opts ...Option) (*Consumer, error) {
logger: &noopLogger{
logger: log.New(io.Discard, "", log.LstdFlags),
},
scanInterval: 250 * time.Millisecond,
maxRecords: 10000,
scanInterval: 250 * time.Millisecond,
immediateRescan: false,
maxRecords: 10000,
}
// override defaults
@ -77,6 +78,7 @@ type Consumer struct {
logger Logger
store Store
scanInterval time.Duration
immediateRescan bool
maxRecords int64
isAggregated bool
shardClosedHandler ShardClosedHandler
@ -244,6 +246,10 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
}
shardIterator = resp.NextShardIterator
if c.immediateRescan && len(resp.Records) > 0 && *resp.MillisBehindLatest > 0 {
// There appears to be additional records, so skip waiting for next tick
continue
}
}
// Wait for next scan

View file

@ -65,6 +65,14 @@ func WithScanInterval(d time.Duration) Option {
}
}
// WithImmediateRescan overrides whether we wait for the next
// scan interval if records were fetched during a poll
func WithImmediateRescan(r bool) Option {
return func(c *Consumer) {
c.immediateRescan = r
}
}
// WithMaxRecords overrides the maximum number of records to be
// returned in a single GetRecords call for the consumer (specify a
// value of up to 10,000)