--------- Co-authored-by: ashchepinska <a.shchepinska@external.gridx.de>
This commit is contained in:
parent
52ec3e5f65
commit
f0b82db6ac
9 changed files with 244 additions and 42 deletions
69
consumer.go
69
consumer.go
|
|
@ -10,6 +10,8 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/service/kinesis"
|
||||
|
|
@ -203,7 +205,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
|
|||
return fmt.Errorf("get shard iterator error: %w", err)
|
||||
}
|
||||
} else {
|
||||
lastSeqNum, err = c.processRecords(ctx, shardID, resp)
|
||||
lastSeqNum, err = c.processRecords(ctx, shardID, resp, fn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -234,7 +236,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
|
|||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) processRecords(ctx context.Context, shardID string, resp *kinesis.GetRecordsOutput) (string, error) {
|
||||
func (c *Consumer) processRecords(ctx context.Context, shardID string, resp *kinesis.GetRecordsOutput, fn ScanFunc) (string, error) {
|
||||
if len(resp.Records) == 0 {
|
||||
return "", nil
|
||||
}
|
||||
|
|
@ -269,46 +271,9 @@ func (c *Consumer) processRecords(ctx context.Context, shardID string, resp *kin
|
|||
return "", nil
|
||||
}
|
||||
|
||||
// submit in goroutine
|
||||
go func() {
|
||||
for _, r := range records {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
record := Record{r, shardID, resp.MillisBehindLatest}
|
||||
// blocks until someone is ready to pick it up
|
||||
c.workerPool.Submit(record)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// wait for all tasks to be processed
|
||||
numberOfProcessedTasks := 0
|
||||
timeout := 5 * time.Second
|
||||
countDownTimer := time.NewTimer(timeout)
|
||||
for {
|
||||
if numberOfProcessedTasks == len(records) {
|
||||
break
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return "", nil
|
||||
case <-countDownTimer.C:
|
||||
return "", fmt.Errorf("timeline exceeded while awaiting result from workers")
|
||||
default:
|
||||
res, err := c.workerPool.Result()
|
||||
if err != nil && !errors.Is(err, ErrSkipCheckpoint) {
|
||||
return "", err // TODO make it more clever once :)
|
||||
}
|
||||
if errors.Is(err, ErrSkipCheckpoint) || res != nil {
|
||||
numberOfProcessedTasks++
|
||||
countDownTimer.Reset(timeout)
|
||||
|
||||
counterEventsConsumed.With(prometheus.Labels{labelStreamName: c.streamName, labelShardID: shardID}).Inc()
|
||||
c.counter.Add("records", 1)
|
||||
}
|
||||
}
|
||||
err = c.runWorkers(ctx, shardID, resp, fn, records)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// we MUST only reach this point if everything is processed
|
||||
|
|
@ -317,6 +282,9 @@ func (c *Consumer) processRecords(ctx context.Context, shardID string, resp *kin
|
|||
if err := c.group.SetCheckpoint(ctx, c.streamName, shardID, lastSeqNum); err != nil {
|
||||
return "", fmt.Errorf("set checkpoint error: %w", err)
|
||||
}
|
||||
|
||||
numberOfProcessedTasks := len(records)
|
||||
|
||||
c.counter.Add("checkpoint", int64(numberOfProcessedTasks))
|
||||
counterCheckpointsWritten.
|
||||
With(prometheus.Labels{labelStreamName: c.streamName, labelShardID: shardID}).
|
||||
|
|
@ -332,6 +300,23 @@ func (c *Consumer) processRecords(ctx context.Context, shardID string, resp *kin
|
|||
return lastSeqNum, nil
|
||||
}
|
||||
|
||||
// runWorkers launches a worker pool to process the records
|
||||
func (c *Consumer) runWorkers(ctx context.Context, shardID string, resp *kinesis.GetRecordsOutput, fn ScanFunc, records []types.Record) error {
|
||||
errGroup, ctx := errgroup.WithContext(ctx)
|
||||
errGroup.SetLimit(c.numWorkers)
|
||||
for _, r := range records {
|
||||
errGroup.Go(func() error {
|
||||
err := fn(&Record{Record: r, ShardID: shardID, MillisBehindLatest: resp.MillisBehindLatest})
|
||||
if !errors.Is(err, ErrSkipCheckpoint) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
return errGroup.Wait()
|
||||
}
|
||||
|
||||
// temporary conversion func of []types.Record -> DesegregateRecords([]*types.Record) -> []types.Record
|
||||
func disaggregateRecords(in []types.Record) ([]types.Record, error) {
|
||||
var recs []types.Record
|
||||
|
|
|
|||
1
go.mod
1
go.mod
|
|
@ -17,6 +17,7 @@ require (
|
|||
github.com/pkg/errors v0.9.1
|
||||
github.com/prometheus/client_golang v1.20.4
|
||||
github.com/redis/go-redis/v9 v9.6.1
|
||||
golang.org/x/sync v0.7.0
|
||||
)
|
||||
|
||||
require (
|
||||
|
|
|
|||
2
go.sum
2
go.sum
|
|
@ -106,6 +106,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
|
|||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
|
||||
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
|
||||
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
|
||||
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
|
||||
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
|
|
|||
27
vendor/golang.org/x/sync/LICENSE
generated
vendored
Normal file
27
vendor/golang.org/x/sync/LICENSE
generated
vendored
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
Copyright (c) 2009 The Go Authors. All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are
|
||||
met:
|
||||
|
||||
* Redistributions of source code must retain the above copyright
|
||||
notice, this list of conditions and the following disclaimer.
|
||||
* Redistributions in binary form must reproduce the above
|
||||
copyright notice, this list of conditions and the following disclaimer
|
||||
in the documentation and/or other materials provided with the
|
||||
distribution.
|
||||
* Neither the name of Google Inc. nor the names of its
|
||||
contributors may be used to endorse or promote products derived from
|
||||
this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
22
vendor/golang.org/x/sync/PATENTS
generated
vendored
Normal file
22
vendor/golang.org/x/sync/PATENTS
generated
vendored
Normal file
|
|
@ -0,0 +1,22 @@
|
|||
Additional IP Rights Grant (Patents)
|
||||
|
||||
"This implementation" means the copyrightable works distributed by
|
||||
Google as part of the Go project.
|
||||
|
||||
Google hereby grants to You a perpetual, worldwide, non-exclusive,
|
||||
no-charge, royalty-free, irrevocable (except as stated in this section)
|
||||
patent license to make, have made, use, offer to sell, sell, import,
|
||||
transfer and otherwise run, modify and propagate the contents of this
|
||||
implementation of Go, where such license applies only to those patent
|
||||
claims, both currently owned or controlled by Google and acquired in
|
||||
the future, licensable by Google that are necessarily infringed by this
|
||||
implementation of Go. This grant does not include claims that would be
|
||||
infringed only as a consequence of further modification of this
|
||||
implementation. If you or your agent or exclusive licensee institute or
|
||||
order or agree to the institution of patent litigation against any
|
||||
entity (including a cross-claim or counterclaim in a lawsuit) alleging
|
||||
that this implementation of Go or any code incorporated within this
|
||||
implementation of Go constitutes direct or contributory patent
|
||||
infringement, or inducement of patent infringement, then any patent
|
||||
rights granted to you under this License for this implementation of Go
|
||||
shall terminate as of the date such litigation is filed.
|
||||
135
vendor/golang.org/x/sync/errgroup/errgroup.go
generated
vendored
Normal file
135
vendor/golang.org/x/sync/errgroup/errgroup.go
generated
vendored
Normal file
|
|
@ -0,0 +1,135 @@
|
|||
// Copyright 2016 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package errgroup provides synchronization, error propagation, and Context
|
||||
// cancelation for groups of goroutines working on subtasks of a common task.
|
||||
//
|
||||
// [errgroup.Group] is related to [sync.WaitGroup] but adds handling of tasks
|
||||
// returning errors.
|
||||
package errgroup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type token struct{}
|
||||
|
||||
// A Group is a collection of goroutines working on subtasks that are part of
|
||||
// the same overall task.
|
||||
//
|
||||
// A zero Group is valid, has no limit on the number of active goroutines,
|
||||
// and does not cancel on error.
|
||||
type Group struct {
|
||||
cancel func(error)
|
||||
|
||||
wg sync.WaitGroup
|
||||
|
||||
sem chan token
|
||||
|
||||
errOnce sync.Once
|
||||
err error
|
||||
}
|
||||
|
||||
func (g *Group) done() {
|
||||
if g.sem != nil {
|
||||
<-g.sem
|
||||
}
|
||||
g.wg.Done()
|
||||
}
|
||||
|
||||
// WithContext returns a new Group and an associated Context derived from ctx.
|
||||
//
|
||||
// The derived Context is canceled the first time a function passed to Go
|
||||
// returns a non-nil error or the first time Wait returns, whichever occurs
|
||||
// first.
|
||||
func WithContext(ctx context.Context) (*Group, context.Context) {
|
||||
ctx, cancel := withCancelCause(ctx)
|
||||
return &Group{cancel: cancel}, ctx
|
||||
}
|
||||
|
||||
// Wait blocks until all function calls from the Go method have returned, then
|
||||
// returns the first non-nil error (if any) from them.
|
||||
func (g *Group) Wait() error {
|
||||
g.wg.Wait()
|
||||
if g.cancel != nil {
|
||||
g.cancel(g.err)
|
||||
}
|
||||
return g.err
|
||||
}
|
||||
|
||||
// Go calls the given function in a new goroutine.
|
||||
// It blocks until the new goroutine can be added without the number of
|
||||
// active goroutines in the group exceeding the configured limit.
|
||||
//
|
||||
// The first call to return a non-nil error cancels the group's context, if the
|
||||
// group was created by calling WithContext. The error will be returned by Wait.
|
||||
func (g *Group) Go(f func() error) {
|
||||
if g.sem != nil {
|
||||
g.sem <- token{}
|
||||
}
|
||||
|
||||
g.wg.Add(1)
|
||||
go func() {
|
||||
defer g.done()
|
||||
|
||||
if err := f(); err != nil {
|
||||
g.errOnce.Do(func() {
|
||||
g.err = err
|
||||
if g.cancel != nil {
|
||||
g.cancel(g.err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// TryGo calls the given function in a new goroutine only if the number of
|
||||
// active goroutines in the group is currently below the configured limit.
|
||||
//
|
||||
// The return value reports whether the goroutine was started.
|
||||
func (g *Group) TryGo(f func() error) bool {
|
||||
if g.sem != nil {
|
||||
select {
|
||||
case g.sem <- token{}:
|
||||
// Note: this allows barging iff channels in general allow barging.
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
g.wg.Add(1)
|
||||
go func() {
|
||||
defer g.done()
|
||||
|
||||
if err := f(); err != nil {
|
||||
g.errOnce.Do(func() {
|
||||
g.err = err
|
||||
if g.cancel != nil {
|
||||
g.cancel(g.err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}()
|
||||
return true
|
||||
}
|
||||
|
||||
// SetLimit limits the number of active goroutines in this group to at most n.
|
||||
// A negative value indicates no limit.
|
||||
//
|
||||
// Any subsequent call to the Go method will block until it can add an active
|
||||
// goroutine without exceeding the configured limit.
|
||||
//
|
||||
// The limit must not be modified while any goroutines in the group are active.
|
||||
func (g *Group) SetLimit(n int) {
|
||||
if n < 0 {
|
||||
g.sem = nil
|
||||
return
|
||||
}
|
||||
if len(g.sem) != 0 {
|
||||
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
|
||||
}
|
||||
g.sem = make(chan token, n)
|
||||
}
|
||||
13
vendor/golang.org/x/sync/errgroup/go120.go
generated
vendored
Normal file
13
vendor/golang.org/x/sync/errgroup/go120.go
generated
vendored
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
// Copyright 2023 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build go1.20
|
||||
|
||||
package errgroup
|
||||
|
||||
import "context"
|
||||
|
||||
func withCancelCause(parent context.Context) (context.Context, func(error)) {
|
||||
return context.WithCancelCause(parent)
|
||||
}
|
||||
14
vendor/golang.org/x/sync/errgroup/pre_go120.go
generated
vendored
Normal file
14
vendor/golang.org/x/sync/errgroup/pre_go120.go
generated
vendored
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
// Copyright 2023 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build !go1.20
|
||||
|
||||
package errgroup
|
||||
|
||||
import "context"
|
||||
|
||||
func withCancelCause(parent context.Context) (context.Context, func(error)) {
|
||||
ctx, cancel := context.WithCancel(parent)
|
||||
return ctx, func(error) { cancel() }
|
||||
}
|
||||
3
vendor/modules.txt
vendored
3
vendor/modules.txt
vendored
|
|
@ -205,6 +205,9 @@ github.com/yuin/gopher-lua
|
|||
github.com/yuin/gopher-lua/ast
|
||||
github.com/yuin/gopher-lua/parse
|
||||
github.com/yuin/gopher-lua/pm
|
||||
# golang.org/x/sync v0.7.0
|
||||
## explicit; go 1.18
|
||||
golang.org/x/sync/errgroup
|
||||
# golang.org/x/sys v0.25.0
|
||||
## explicit; go 1.18
|
||||
golang.org/x/sys/unix
|
||||
|
|
|
|||
Loading…
Reference in a new issue