diff --git a/consumer.go b/consumer.go index c4c118d..8c3836a 100644 --- a/consumer.go +++ b/consumer.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "golang.org/x/sync/errgroup" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/kinesis" @@ -203,7 +205,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e return fmt.Errorf("get shard iterator error: %w", err) } } else { - lastSeqNum, err = c.processRecords(ctx, shardID, resp) + lastSeqNum, err = c.processRecords(ctx, shardID, resp, fn) if err != nil { return err } @@ -234,7 +236,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e } } -func (c *Consumer) processRecords(ctx context.Context, shardID string, resp *kinesis.GetRecordsOutput) (string, error) { +func (c *Consumer) processRecords(ctx context.Context, shardID string, resp *kinesis.GetRecordsOutput, fn ScanFunc) (string, error) { if len(resp.Records) == 0 { return "", nil } @@ -269,46 +271,9 @@ func (c *Consumer) processRecords(ctx context.Context, shardID string, resp *kin return "", nil } - // submit in goroutine - go func() { - for _, r := range records { - select { - case <-ctx.Done(): - return - default: - record := Record{r, shardID, resp.MillisBehindLatest} - // blocks until someone is ready to pick it up - c.workerPool.Submit(record) - } - } - }() - - // wait for all tasks to be processed - numberOfProcessedTasks := 0 - timeout := 5 * time.Second - countDownTimer := time.NewTimer(timeout) - for { - if numberOfProcessedTasks == len(records) { - break - } - select { - case <-ctx.Done(): - return "", nil - case <-countDownTimer.C: - return "", fmt.Errorf("timeline exceeded while awaiting result from workers") - default: - res, err := c.workerPool.Result() - if err != nil && !errors.Is(err, ErrSkipCheckpoint) { - return "", err // TODO make it more clever once :) - } - if errors.Is(err, ErrSkipCheckpoint) || res != nil { - numberOfProcessedTasks++ - countDownTimer.Reset(timeout) - - counterEventsConsumed.With(prometheus.Labels{labelStreamName: c.streamName, labelShardID: shardID}).Inc() - c.counter.Add("records", 1) - } - } + err = c.runWorkers(ctx, shardID, resp, fn, records) + if err != nil { + return "", err } // we MUST only reach this point if everything is processed @@ -317,6 +282,9 @@ func (c *Consumer) processRecords(ctx context.Context, shardID string, resp *kin if err := c.group.SetCheckpoint(ctx, c.streamName, shardID, lastSeqNum); err != nil { return "", fmt.Errorf("set checkpoint error: %w", err) } + + numberOfProcessedTasks := len(records) + c.counter.Add("checkpoint", int64(numberOfProcessedTasks)) counterCheckpointsWritten. With(prometheus.Labels{labelStreamName: c.streamName, labelShardID: shardID}). @@ -332,6 +300,23 @@ func (c *Consumer) processRecords(ctx context.Context, shardID string, resp *kin return lastSeqNum, nil } +// runWorkers launches a worker pool to process the records +func (c *Consumer) runWorkers(ctx context.Context, shardID string, resp *kinesis.GetRecordsOutput, fn ScanFunc, records []types.Record) error { + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(c.numWorkers) + for _, r := range records { + errGroup.Go(func() error { + err := fn(&Record{Record: r, ShardID: shardID, MillisBehindLatest: resp.MillisBehindLatest}) + if !errors.Is(err, ErrSkipCheckpoint) { + return err + } + return nil + }) + } + + return errGroup.Wait() +} + // temporary conversion func of []types.Record -> DesegregateRecords([]*types.Record) -> []types.Record func disaggregateRecords(in []types.Record) ([]types.Record, error) { var recs []types.Record diff --git a/go.mod b/go.mod index 429bd46..4f786f8 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.20.4 github.com/redis/go-redis/v9 v9.6.1 + golang.org/x/sync v0.7.0 ) require ( diff --git a/go.sum b/go.sum index 6c50b5b..8ab18e5 100644 --- a/go.sum +++ b/go.sum @@ -106,6 +106,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/vendor/golang.org/x/sync/LICENSE b/vendor/golang.org/x/sync/LICENSE new file mode 100644 index 0000000..6a66aea --- /dev/null +++ b/vendor/golang.org/x/sync/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/sync/PATENTS b/vendor/golang.org/x/sync/PATENTS new file mode 100644 index 0000000..7330990 --- /dev/null +++ b/vendor/golang.org/x/sync/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/sync/errgroup/errgroup.go b/vendor/golang.org/x/sync/errgroup/errgroup.go new file mode 100644 index 0000000..948a3ee --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/errgroup.go @@ -0,0 +1,135 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package errgroup provides synchronization, error propagation, and Context +// cancelation for groups of goroutines working on subtasks of a common task. +// +// [errgroup.Group] is related to [sync.WaitGroup] but adds handling of tasks +// returning errors. +package errgroup + +import ( + "context" + "fmt" + "sync" +) + +type token struct{} + +// A Group is a collection of goroutines working on subtasks that are part of +// the same overall task. +// +// A zero Group is valid, has no limit on the number of active goroutines, +// and does not cancel on error. +type Group struct { + cancel func(error) + + wg sync.WaitGroup + + sem chan token + + errOnce sync.Once + err error +} + +func (g *Group) done() { + if g.sem != nil { + <-g.sem + } + g.wg.Done() +} + +// WithContext returns a new Group and an associated Context derived from ctx. +// +// The derived Context is canceled the first time a function passed to Go +// returns a non-nil error or the first time Wait returns, whichever occurs +// first. +func WithContext(ctx context.Context) (*Group, context.Context) { + ctx, cancel := withCancelCause(ctx) + return &Group{cancel: cancel}, ctx +} + +// Wait blocks until all function calls from the Go method have returned, then +// returns the first non-nil error (if any) from them. +func (g *Group) Wait() error { + g.wg.Wait() + if g.cancel != nil { + g.cancel(g.err) + } + return g.err +} + +// Go calls the given function in a new goroutine. +// It blocks until the new goroutine can be added without the number of +// active goroutines in the group exceeding the configured limit. +// +// The first call to return a non-nil error cancels the group's context, if the +// group was created by calling WithContext. The error will be returned by Wait. +func (g *Group) Go(f func() error) { + if g.sem != nil { + g.sem <- token{} + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel(g.err) + } + }) + } + }() +} + +// TryGo calls the given function in a new goroutine only if the number of +// active goroutines in the group is currently below the configured limit. +// +// The return value reports whether the goroutine was started. +func (g *Group) TryGo(f func() error) bool { + if g.sem != nil { + select { + case g.sem <- token{}: + // Note: this allows barging iff channels in general allow barging. + default: + return false + } + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel(g.err) + } + }) + } + }() + return true +} + +// SetLimit limits the number of active goroutines in this group to at most n. +// A negative value indicates no limit. +// +// Any subsequent call to the Go method will block until it can add an active +// goroutine without exceeding the configured limit. +// +// The limit must not be modified while any goroutines in the group are active. +func (g *Group) SetLimit(n int) { + if n < 0 { + g.sem = nil + return + } + if len(g.sem) != 0 { + panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) + } + g.sem = make(chan token, n) +} diff --git a/vendor/golang.org/x/sync/errgroup/go120.go b/vendor/golang.org/x/sync/errgroup/go120.go new file mode 100644 index 0000000..f93c740 --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/go120.go @@ -0,0 +1,13 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build go1.20 + +package errgroup + +import "context" + +func withCancelCause(parent context.Context) (context.Context, func(error)) { + return context.WithCancelCause(parent) +} diff --git a/vendor/golang.org/x/sync/errgroup/pre_go120.go b/vendor/golang.org/x/sync/errgroup/pre_go120.go new file mode 100644 index 0000000..88ce334 --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/pre_go120.go @@ -0,0 +1,14 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build !go1.20 + +package errgroup + +import "context" + +func withCancelCause(parent context.Context) (context.Context, func(error)) { + ctx, cancel := context.WithCancel(parent) + return ctx, func(error) { cancel() } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 21b99f0..d9540a4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -205,6 +205,9 @@ github.com/yuin/gopher-lua github.com/yuin/gopher-lua/ast github.com/yuin/gopher-lua/parse github.com/yuin/gopher-lua/pm +# golang.org/x/sync v0.7.0 +## explicit; go 1.18 +golang.org/x/sync/errgroup # golang.org/x/sys v0.25.0 ## explicit; go 1.18 golang.org/x/sys/unix