Return error from scan instead of terminating the program
This commit is contained in:
parent
60ce796c07
commit
8d2cc5bc20
3 changed files with 22 additions and 15 deletions
12
README.md
12
README.md
|
|
@ -31,19 +31,23 @@ func main() {
|
||||||
|
|
||||||
c, err := consumer.New(*app, *stream)
|
c, err := consumer.New(*app, *stream)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("new consumer error: %v", err)
|
log.Fatalf("consumer error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.Scan(context.TODO(), func(r *kinesis.Record) bool {
|
err = c.Scan(context.TODO(), func(r *kinesis.Record) bool {
|
||||||
fmt.Println(string(r.Data))
|
fmt.Println(string(r.Data))
|
||||||
|
|
||||||
return true // continue scanning
|
return true // continue scanning
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("scan error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note: If you need to aggregate based on a specific shard the `ScanShard`
|
||||||
|
// method should be leverged instead.
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
Note: If you need to aggregate based on a specific shard the `ScanShard` method should be leverged instead.
|
|
||||||
|
|
||||||
### Configuration
|
### Configuration
|
||||||
|
|
||||||
The consumer requires the following config:
|
The consumer requires the following config:
|
||||||
|
|
|
||||||
18
consumer.go
18
consumer.go
|
|
@ -3,7 +3,6 @@ package consumer
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/apex/log"
|
"github.com/apex/log"
|
||||||
|
|
@ -100,26 +99,25 @@ type Consumer struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Scan scans each of the shards of the stream, calls the callback
|
// Scan scans each of the shards of the stream, calls the callback
|
||||||
// func with each of the kinesis records
|
// func with each of the kinesis records.
|
||||||
func (c *Consumer) Scan(ctx context.Context, fn func(*kinesis.Record) bool) {
|
func (c *Consumer) Scan(ctx context.Context, fn func(*kinesis.Record) bool) error {
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
// grab the stream details
|
||||||
resp, err := c.svc.DescribeStream(
|
resp, err := c.svc.DescribeStream(
|
||||||
&kinesis.DescribeStreamInput{
|
&kinesis.DescribeStreamInput{
|
||||||
StreamName: aws.String(c.streamName),
|
StreamName: aws.String(c.streamName),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.WithError(err).Error("DescribeStream")
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(len(resp.StreamDescription.Shards))
|
wg.Add(len(resp.StreamDescription.Shards))
|
||||||
|
|
||||||
// scan each of the shards
|
// launch goroutine to process each of the shards
|
||||||
for _, shard := range resp.StreamDescription.Shards {
|
for _, shard := range resp.StreamDescription.Shards {
|
||||||
go func(shardID string) {
|
go func(shardID string) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
@ -129,10 +127,12 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*kinesis.Record) bool) {
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ScanShard loops over records on a kinesis shard, call the callback func
|
// ScanShard loops over records on a specific shard, calls the callback func
|
||||||
// for each record and checkpoints after each page is processed
|
// for each record and checkpoints after each page is processed.
|
||||||
|
// Note: returning `false` from the callback func will end the scan.
|
||||||
func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*kinesis.Record) bool) {
|
func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*kinesis.Record) bool) {
|
||||||
var (
|
var (
|
||||||
logger = c.logger.WithFields(log.Fields{"shard": shardID})
|
logger = c.logger.WithFields(log.Fields{"shard": shardID})
|
||||||
|
|
|
||||||
|
|
@ -24,11 +24,14 @@ func main() {
|
||||||
|
|
||||||
c, err := consumer.New(*app, *stream)
|
c, err := consumer.New(*app, *stream)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("new consumer error: %v", err)
|
log.Fatalf("consumer error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.Scan(context.TODO(), func(r *kinesis.Record) bool {
|
err = c.Scan(context.TODO(), func(r *kinesis.Record) bool {
|
||||||
fmt.Println(string(r.Data))
|
fmt.Println(string(r.Data))
|
||||||
return true // continue scanning
|
return true // continue scanning
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("scan error: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue