2017-11-20 16:21:40 +00:00
|
|
|
package consumer
|
2016-02-03 05:04:22 +00:00
|
|
|
|
|
|
|
|
import (
|
2017-11-20 16:21:40 +00:00
|
|
|
"context"
|
|
|
|
|
"fmt"
|
2018-07-29 05:53:33 +00:00
|
|
|
"io/ioutil"
|
|
|
|
|
"log"
|
2017-11-20 16:21:40 +00:00
|
|
|
"sync"
|
2016-02-03 05:04:22 +00:00
|
|
|
|
2018-07-29 05:53:33 +00:00
|
|
|
"github.com/aws/aws-sdk-go/aws"
|
|
|
|
|
"github.com/aws/aws-sdk-go/aws/session"
|
2016-02-03 05:04:22 +00:00
|
|
|
"github.com/aws/aws-sdk-go/service/kinesis"
|
2018-07-29 05:53:33 +00:00
|
|
|
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
|
2018-11-05 21:51:51 +00:00
|
|
|
|
|
|
|
|
"github.com/opentracing/opentracing-go"
|
|
|
|
|
"github.com/opentracing/opentracing-go/ext"
|
2016-02-03 05:04:22 +00:00
|
|
|
)
|
|
|
|
|
|
2017-11-27 00:00:11 +00:00
|
|
|
// Record is an alias for the record type returned by the kinesis client
// library. Because it is declared with `=` (an alias, not a new defined
// type), a *Record is exactly the same type as a *kinesis.Record.
type Record = kinesis.Record
|
|
|
|
|
|
2017-11-20 16:21:40 +00:00
|
|
|
// Option is used to override defaults when creating a new Consumer.
// New applies options in the order they are passed, after the default
// checkpoint, counter and logger are installed and before the default
// Kinesis client is created (the client is only created if no option set one).
type Option func(*Consumer)
|
2016-05-01 05:23:35 +00:00
|
|
|
|
2017-11-20 16:21:40 +00:00
|
|
|
// WithCheckpoint overrides the default checkpoint
|
2017-11-23 01:44:42 +00:00
|
|
|
func WithCheckpoint(checkpoint Checkpoint) Option {
|
2018-07-29 05:53:33 +00:00
|
|
|
return func(c *Consumer) {
|
2017-11-20 16:21:40 +00:00
|
|
|
c.checkpoint = checkpoint
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// WithLogger overrides the default logger
|
2018-06-13 01:07:33 +00:00
|
|
|
func WithLogger(logger Logger) Option {
|
2018-07-29 05:53:33 +00:00
|
|
|
return func(c *Consumer) {
|
2017-11-20 16:21:40 +00:00
|
|
|
c.logger = logger
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2017-11-22 22:09:22 +00:00
|
|
|
// WithCounter overrides the default counter
|
|
|
|
|
func WithCounter(counter Counter) Option {
|
2018-07-29 05:53:33 +00:00
|
|
|
return func(c *Consumer) {
|
2017-11-22 22:09:22 +00:00
|
|
|
c.counter = counter
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2017-11-27 00:00:11 +00:00
|
|
|
// WithClient overrides the default client
|
2018-07-29 05:53:33 +00:00
|
|
|
func WithClient(client kinesisiface.KinesisAPI) Option {
|
|
|
|
|
return func(c *Consumer) {
|
2017-11-27 00:00:11 +00:00
|
|
|
c.client = client
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2018-07-29 05:53:33 +00:00
|
|
|
// ScanStatus signals the consumer if we should continue scanning for next record
// and whether to checkpoint.
//
// The callback given to Scan or ScanShard returns one ScanStatus per record.
// The consumer acts on it in this order: checkpoint (unless SkipCheckpoint),
// then Error, then StopScan.
type ScanStatus struct {
	// Error, when non-nil, aborts the scan of the record's shard and is
	// returned to the caller. Note the record is still checkpointed first
	// unless SkipCheckpoint is also set.
	Error error
	// StopScan ends scanning after the current record has been handled.
	StopScan bool
	// SkipCheckpoint prevents the current record's sequence number from
	// being stored via the consumer's checkpoint.
	SkipCheckpoint bool
}
|
|
|
|
|
|
2017-11-20 16:21:40 +00:00
|
|
|
// New creates a kinesis consumer with default settings. Use Option to override
|
|
|
|
|
// any of the optional attributes.
|
2017-11-27 00:00:11 +00:00
|
|
|
func New(streamName string, opts ...Option) (*Consumer, error) {
|
|
|
|
|
if streamName == "" {
|
2017-11-21 16:58:16 +00:00
|
|
|
return nil, fmt.Errorf("must provide stream name")
|
2017-11-20 16:21:40 +00:00
|
|
|
}
|
|
|
|
|
|
2017-11-27 00:00:11 +00:00
|
|
|
// new consumer with no-op checkpoint, counter, and logger
|
2017-11-20 16:21:40 +00:00
|
|
|
c := &Consumer{
|
2017-11-27 00:00:11 +00:00
|
|
|
streamName: streamName,
|
2017-11-23 01:44:42 +00:00
|
|
|
checkpoint: &noopCheckpoint{},
|
2017-11-23 01:52:41 +00:00
|
|
|
counter: &noopCounter{},
|
2018-07-29 05:53:33 +00:00
|
|
|
logger: &noopLogger{
|
|
|
|
|
logger: log.New(ioutil.Discard, "", log.LstdFlags),
|
|
|
|
|
},
|
2017-11-20 16:21:40 +00:00
|
|
|
}
|
|
|
|
|
|
2017-11-27 00:00:11 +00:00
|
|
|
// override defaults
|
2017-11-20 16:21:40 +00:00
|
|
|
for _, opt := range opts {
|
2018-07-29 05:53:33 +00:00
|
|
|
opt(c)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// default client if none provided
|
|
|
|
|
if c.client == nil {
|
2018-09-03 16:59:39 +00:00
|
|
|
newSession, err := session.NewSession(aws.NewConfig())
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
c.client = kinesis.New(newSession)
|
2017-11-20 16:21:40 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return c, nil
|
2016-02-03 05:04:22 +00:00
|
|
|
}
|
|
|
|
|
|
2017-10-16 00:45:38 +00:00
|
|
|
// Consumer wraps the interaction with the Kinesis stream.
// Construct it with New, which fills in defaults for every field.
type Consumer struct {
	// streamName is the Kinesis stream to read; New rejects an empty name.
	streamName string
	// client issues the DescribeStream, GetShardIterator and GetRecords calls.
	client kinesisiface.KinesisAPI
	// logger receives progress messages (by default output is discarded).
	logger Logger
	// checkpoint loads and stores the last processed sequence number per
	// stream/shard pair.
	checkpoint Checkpoint
	// counter is incremented under the "records" key for each record whose
	// callback did not report an error.
	counter Counter
}
|
|
|
|
|
|
2017-11-20 16:21:40 +00:00
|
|
|
// Scan scans each of the shards of the stream, calls the callback
// func with each of the kinesis records.
//
// Every shard is scanned by ScanShard in its own goroutine, and all of them
// share one cancelable context. As soon as any shard's scan returns (with or
// without an error), that context is canceled, so the remaining shards wind
// down too. This is how a StopScan from the callback ends the whole scan.
//
// Scan blocks until every shard goroutine has exited. It returns the first
// shard error, or nil if there was none. It also returns an error when the
// shard list cannot be fetched or is empty.
func (c *Consumer) Scan(ctx context.Context, fn func(context.Context, *Record) ScanStatus) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.scan")
	// The span is finished exactly once, here, after wg.Wait() below has seen
	// every shard goroutine return. Finishing it from inside a failing
	// goroutine would leave the other goroutines logging to a finished span,
	// which OpenTracing defines as undefined behavior.
	defer span.Finish()

	// get shard ids
	shardIDs, err := c.getShardIDs(ctx, c.streamName)
	span.SetTag("stream.name", c.streamName)
	span.SetTag("shard.count", len(shardIDs))
	if err != nil {
		span.LogKV("get shardID error", err.Error(), "stream.name", c.streamName)
		ext.Error.Set(span, true)
		return fmt.Errorf("get shards error: %s", err.Error())
	}

	if len(shardIDs) == 0 {
		span.LogKV("stream.name", c.streamName, "shards.count", len(shardIDs))
		ext.Error.Set(span, true)
		return fmt.Errorf("no shards available")
	}

	var (
		wg sync.WaitGroup
		// Capacity 1: only the first shard error is kept; later ones are dropped.
		errc = make(chan error, 1)
	)
	wg.Add(len(shardIDs))

	// process each shard in a separate goroutine
	for _, shardID := range shardIDs {
		go func(shardID string) {
			defer wg.Done()

			if err := c.ScanShard(ctx, shardID, fn); err != nil {
				span.LogKV("scan shard error", err.Error(), "shardID", shardID)
				ext.Error.Set(span, true)
				select {
				case errc <- fmt.Errorf("shard %s error: %v", shardID, err):
					// first error to occur
				default:
					// error has already occurred
				}
			}

			// Whenever this shard finishes, stop all the others as well.
			cancel()
		}(shardID)
	}

	wg.Wait()
	close(errc)

	// Yields the first recorded error, or nil from the closed, empty channel.
	return <-errc
}
|
|
|
|
|
|
2017-11-20 19:45:30 +00:00
|
|
|
// ScanShard loops over records on a specific shard, calls the callback func
|
2017-11-23 19:29:58 +00:00
|
|
|
// for each record and checkpoints the progress of scan.
|
2018-07-29 05:53:33 +00:00
|
|
|
func (c *Consumer) ScanShard(
|
|
|
|
|
ctx context.Context,
|
|
|
|
|
shardID string,
|
2018-11-14 19:51:37 +00:00
|
|
|
fn func(context.Context, *Record) ScanStatus,
|
2018-07-29 05:53:33 +00:00
|
|
|
) error {
|
2018-11-05 21:51:51 +00:00
|
|
|
span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.scanshard")
|
|
|
|
|
defer span.Finish()
|
2018-07-29 05:53:33 +00:00
|
|
|
// get checkpoint
|
2018-11-07 23:45:13 +00:00
|
|
|
lastSeqNum, err := c.checkpoint.Get(ctx, c.streamName, shardID)
|
2017-11-21 16:58:16 +00:00
|
|
|
if err != nil {
|
2018-11-05 21:51:51 +00:00
|
|
|
span.LogKV("checkpoint error", err.Error(), "shardID", shardID)
|
|
|
|
|
ext.Error.Set(span, true)
|
2017-11-23 16:49:37 +00:00
|
|
|
return fmt.Errorf("get checkpoint error: %v", err)
|
2017-11-21 16:58:16 +00:00
|
|
|
}
|
|
|
|
|
|
2018-07-29 05:53:33 +00:00
|
|
|
// get shard iterator
|
2018-11-05 21:51:51 +00:00
|
|
|
shardIterator, err := c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
|
2017-11-23 16:49:37 +00:00
|
|
|
if err != nil {
|
2018-11-05 21:51:51 +00:00
|
|
|
span.LogKV("get shard error", err.Error(), "shardID", shardID, "lastSeqNumber", lastSeqNum)
|
|
|
|
|
ext.Error.Set(span, true)
|
2018-07-29 05:53:33 +00:00
|
|
|
return fmt.Errorf("get shard iterator error: %v", err)
|
2017-10-16 00:40:30 +00:00
|
|
|
}
|
2018-06-08 15:40:42 +00:00
|
|
|
|
2018-11-14 19:51:37 +00:00
|
|
|
c.logger.Log(fmt.Sprintf("scanning shardID %s lastSeqNum %s", shardID, lastSeqNum))
|
2017-10-16 00:40:30 +00:00
|
|
|
|
2018-09-03 16:59:39 +00:00
|
|
|
return c.scanPagesOfShard(ctx, shardID, lastSeqNum, shardIterator, fn)
|
|
|
|
|
}
|
|
|
|
|
|
2018-11-14 19:51:37 +00:00
|
|
|
func (c *Consumer) scanPagesOfShard(ctx context.Context, shardID, lastSeqNum string, shardIterator *string, fn func(context.Context, *Record) ScanStatus) error {
|
2018-11-05 21:51:51 +00:00
|
|
|
span := opentracing.SpanFromContext(ctx)
|
2018-07-29 05:53:33 +00:00
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
2018-11-05 21:51:51 +00:00
|
|
|
span.SetTag("scan", "done")
|
2018-07-29 05:53:33 +00:00
|
|
|
return nil
|
|
|
|
|
default:
|
2018-11-05 21:51:51 +00:00
|
|
|
span.SetTag("scan", "on")
|
2018-11-07 23:45:13 +00:00
|
|
|
resp, err := c.client.GetRecordsWithContext(ctx, &kinesis.GetRecordsInput{
|
2018-07-29 05:53:33 +00:00
|
|
|
ShardIterator: shardIterator,
|
|
|
|
|
})
|
2017-10-16 00:40:30 +00:00
|
|
|
|
2018-06-08 15:40:42 +00:00
|
|
|
if err != nil {
|
2018-11-05 21:51:51 +00:00
|
|
|
shardIterator, err = c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
|
2018-07-29 05:53:33 +00:00
|
|
|
if err != nil {
|
2018-11-05 21:51:51 +00:00
|
|
|
ext.Error.Set(span, true)
|
2018-11-07 23:45:13 +00:00
|
|
|
span.LogKV("get shard iterator error", err.Error())
|
2018-07-29 05:53:33 +00:00
|
|
|
return fmt.Errorf("get shard iterator error: %v", err)
|
|
|
|
|
}
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// loop records of page
|
|
|
|
|
for _, r := range resp.Records {
|
2018-11-05 21:51:51 +00:00
|
|
|
ctx = opentracing.ContextWithSpan(ctx, span)
|
|
|
|
|
isScanStopped, err := c.handleRecord(ctx, shardID, r, fn)
|
2018-09-03 16:59:39 +00:00
|
|
|
if err != nil {
|
2018-11-05 21:51:51 +00:00
|
|
|
span.LogKV("handle record error", err.Error(), "shardID", shardID)
|
|
|
|
|
ext.Error.Set(span, true)
|
2018-07-29 05:53:33 +00:00
|
|
|
return err
|
|
|
|
|
}
|
2018-09-03 16:59:39 +00:00
|
|
|
if isScanStopped {
|
2018-11-05 21:51:51 +00:00
|
|
|
span.SetTag("scan", "stopped")
|
2018-07-29 05:53:33 +00:00
|
|
|
return nil
|
|
|
|
|
}
|
2018-09-03 16:59:39 +00:00
|
|
|
lastSeqNum = *r.SequenceNumber
|
2018-06-08 15:40:42 +00:00
|
|
|
}
|
2018-07-29 05:53:33 +00:00
|
|
|
|
2018-09-03 16:59:39 +00:00
|
|
|
if isShardClosed(resp.NextShardIterator, shardIterator) {
|
2018-11-05 21:51:51 +00:00
|
|
|
span.LogKV("is shard closed", "true")
|
2018-07-29 05:53:33 +00:00
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
shardIterator = resp.NextShardIterator
|
2017-11-27 00:00:11 +00:00
|
|
|
}
|
2016-02-03 05:04:22 +00:00
|
|
|
}
|
2018-07-29 05:53:33 +00:00
|
|
|
}
|
2017-10-16 00:40:30 +00:00
|
|
|
|
2018-09-03 16:59:39 +00:00
|
|
|
// isShardClosed reports whether scanning of a shard should end after the
// current GetRecords page. That is the case when the response carried no next
// iterator (nil), or when the next iterator is the very same pointer as the
// current one.
//
// NOTE(review): the second check compares *string pointer identity, not the
// iterator strings themselves. A freshly decoded response never shares a
// pointer with the request, so in practice only the nil check fires. Confirm
// whether a content comparison was intended before changing it.
func isShardClosed(nextShardIterator, currentShardIterator *string) bool {
	switch {
	case nextShardIterator == nil:
		return true
	case nextShardIterator == currentShardIterator:
		return true
	default:
		return false
	}
}
|
|
|
|
|
|
2018-11-14 19:51:37 +00:00
|
|
|
func (c *Consumer) handleRecord(ctx context.Context, shardID string, r *Record, fn func(context.Context, *Record) ScanStatus) (isScanStopped bool, err error) {
|
2018-11-05 21:51:51 +00:00
|
|
|
span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.handleRecord")
|
|
|
|
|
defer span.Finish()
|
2018-11-14 19:51:37 +00:00
|
|
|
status := fn(ctx, r)
|
2018-09-03 16:59:39 +00:00
|
|
|
if !status.SkipCheckpoint {
|
2018-11-05 21:51:51 +00:00
|
|
|
span.LogKV("scan.state", status)
|
2018-11-07 23:45:13 +00:00
|
|
|
if err := c.checkpoint.Set(ctx, c.streamName, shardID, *r.SequenceNumber); err != nil {
|
2018-11-05 21:51:51 +00:00
|
|
|
span.LogKV("checkpoint error", err.Error(), "stream.name", c.streamName, "shardID", shardID, "sequenceNumber", *r.SequenceNumber)
|
|
|
|
|
ext.Error.Set(span, true)
|
2018-09-03 16:59:39 +00:00
|
|
|
return false, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := status.Error; err != nil {
|
2018-11-05 21:51:51 +00:00
|
|
|
span.LogKV("scan.state", status.Error)
|
|
|
|
|
ext.Error.Set(span, true)
|
2018-09-03 16:59:39 +00:00
|
|
|
return false, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
c.counter.Add("records", 1)
|
|
|
|
|
|
|
|
|
|
if status.StopScan {
|
2018-11-05 21:51:51 +00:00
|
|
|
span.LogKV("scan.state", "stopped")
|
2018-09-03 16:59:39 +00:00
|
|
|
return true, nil
|
|
|
|
|
}
|
|
|
|
|
return false, nil
|
|
|
|
|
}
|
|
|
|
|
|
2018-11-05 21:51:51 +00:00
|
|
|
// getShardIDs returns the IDs of every shard that DescribeStream reports for
// streamName, in the order the service lists them.
//
// DescribeStream returns shards one page at a time; per the Kinesis API the
// default page is 100 shards. While StreamDescription.HasMoreShards is true,
// the next page is requested with ExclusiveStartShardId set to the last shard
// ID already seen, so streams larger than one page are listed completely.
func (c *Consumer) getShardIDs(ctx context.Context, streamName string) ([]string, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.getShardIDs")
	defer span.Finish()
	span = span.SetTag("streamName", streamName)

	var (
		ss           []string
		startShardID *string // nil requests the first page
	)
	for {
		resp, err := c.client.DescribeStreamWithContext(ctx,
			&kinesis.DescribeStreamInput{
				StreamName:            aws.String(streamName),
				ExclusiveStartShardId: startShardID,
			},
		)
		if err != nil {
			span.LogKV("describe stream error", err.Error())
			ext.Error.Set(span, true)
			return nil, fmt.Errorf("describe stream error: %v", err)
		}

		shards := resp.StreamDescription.Shards
		for _, shard := range shards {
			ss = append(ss, *shard.ShardId)
		}

		// Stop when the service reports no further pages. An empty page also
		// stops the loop, guarding against spinning forever on an
		// inconsistent response.
		if !aws.BoolValue(resp.StreamDescription.HasMoreShards) || len(shards) == 0 {
			return ss, nil
		}
		startShardID = shards[len(shards)-1].ShardId
	}
}
|
|
|
|
|
|
2018-11-05 21:51:51 +00:00
|
|
|
// getShardIterator asks Kinesis for an iterator on shardID of streamName.
//
// With a non-empty lastSeqNum the iterator type is AFTER_SEQUENCE_NUMBER, so
// reading resumes just after that record. With an empty lastSeqNum it is
// TRIM_HORIZON, which per the Kinesis API is the oldest data still retained
// in the shard. The SDK error, if any, is returned unwrapped.
func (c *Consumer) getShardIterator(ctx context.Context, streamName, shardID, lastSeqNum string) (*string, error) {
	// The span tags are attached here as plain strings. They must not be
	// re-set from the *string request fields below: most tracers would then
	// record pointer values instead of the stream and shard names.
	span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.getShardIterator",
		opentracing.Tag{Key: "streamName", Value: streamName},
		opentracing.Tag{Key: "shardID", Value: shardID},
		opentracing.Tag{Key: "lastSeqNum", Value: lastSeqNum})
	defer span.Finish()

	params := &kinesis.GetShardIteratorInput{
		ShardId:    aws.String(shardID),
		StreamName: aws.String(streamName),
	}
	if lastSeqNum != "" {
		params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER")
		params.StartingSequenceNumber = aws.String(lastSeqNum)
	} else {
		params.ShardIteratorType = aws.String("TRIM_HORIZON")
	}

	resp, err := c.client.GetShardIteratorWithContext(ctx, params)
	if err != nil {
		span.LogKV("get shard error", err.Error())
		ext.Error.Set(span, true)
		return nil, err
	}
	return resp.ShardIterator, nil
}
|