2016-02-03 05:04:22 +00:00
|
|
|
package connector
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"os"
|
|
|
|
|
|
2016-05-01 05:23:35 +00:00
|
|
|
"github.com/apex/log"
|
|
|
|
|
"github.com/apex/log/handlers/text"
|
2016-02-03 05:04:22 +00:00
|
|
|
"github.com/aws/aws-sdk-go/aws"
|
|
|
|
|
"github.com/aws/aws-sdk-go/aws/session"
|
|
|
|
|
"github.com/aws/aws-sdk-go/service/kinesis"
|
|
|
|
|
)
|
|
|
|
|
|
2016-02-10 06:31:15 +00:00
|
|
|
var (
|
2016-05-01 17:43:42 +00:00
|
|
|
maxRecordCount = 1000
|
|
|
|
|
maxBufferTime = "30s"
|
2016-02-10 06:31:15 +00:00
|
|
|
)
|
2016-02-03 05:04:22 +00:00
|
|
|
|
2016-02-10 06:31:15 +00:00
|
|
|
// NewConsumer creates a new kinesis connection and returns a
|
|
|
|
|
// new consumer initialized with app and stream name
|
2016-02-03 05:04:22 +00:00
|
|
|
func NewConsumer(appName, streamName string) *Consumer {
|
2016-05-01 05:23:35 +00:00
|
|
|
log.SetHandler(text.New(os.Stderr))
|
|
|
|
|
|
|
|
|
|
svc := kinesis.New(
|
|
|
|
|
session.New(
|
|
|
|
|
aws.NewConfig().WithMaxRetries(10),
|
|
|
|
|
),
|
2016-05-01 00:04:44 +00:00
|
|
|
)
|
2016-02-03 05:04:22 +00:00
|
|
|
|
|
|
|
|
return &Consumer{
|
|
|
|
|
appName: appName,
|
|
|
|
|
streamName: streamName,
|
|
|
|
|
svc: svc,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type Consumer struct {
|
|
|
|
|
appName string
|
|
|
|
|
streamName string
|
|
|
|
|
svc *kinesis.Kinesis
|
|
|
|
|
}
|
|
|
|
|
|
2016-02-10 06:31:15 +00:00
|
|
|
// Set `option` to `value`
|
|
|
|
|
func (c *Consumer) Set(option string, value interface{}) {
|
|
|
|
|
switch option {
|
2016-05-01 17:43:42 +00:00
|
|
|
case "maxRecordCount":
|
|
|
|
|
maxRecordCount = value.(int)
|
2016-02-10 06:31:15 +00:00
|
|
|
default:
|
2016-05-01 05:23:35 +00:00
|
|
|
log.Error("invalid option")
|
2016-02-10 06:31:15 +00:00
|
|
|
os.Exit(1)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2016-05-01 05:23:35 +00:00
|
|
|
// SetLogHandler allows users override logger
|
|
|
|
|
func (c *Consumer) SetLogHandler(handler log.Handler) {
|
|
|
|
|
log.SetHandler(handler)
|
|
|
|
|
}
|
|
|
|
|
|
2016-05-01 00:04:44 +00:00
|
|
|
// Start takes a handler and then loops over each of the shards
|
|
|
|
|
// processing each one with the handler.
|
2016-02-03 05:04:22 +00:00
|
|
|
func (c *Consumer) Start(handler Handler) {
|
2016-05-01 00:04:44 +00:00
|
|
|
resp, err := c.svc.DescribeStream(
|
|
|
|
|
&kinesis.DescribeStreamInput{
|
|
|
|
|
StreamName: aws.String(c.streamName),
|
|
|
|
|
},
|
|
|
|
|
)
|
2016-02-03 05:04:22 +00:00
|
|
|
|
|
|
|
|
if err != nil {
|
2016-05-01 05:23:35 +00:00
|
|
|
log.WithError(err).Error("DescribeStream")
|
2016-02-03 05:04:22 +00:00
|
|
|
os.Exit(1)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, shard := range resp.StreamDescription.Shards {
|
|
|
|
|
go c.handlerLoop(*shard.ShardId, handler)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Consumer) handlerLoop(shardID string, handler Handler) {
|
2016-05-01 05:23:35 +00:00
|
|
|
ctx := log.WithFields(log.Fields{
|
|
|
|
|
"app": c.appName,
|
|
|
|
|
"stream": c.streamName,
|
|
|
|
|
"shard": shardID,
|
|
|
|
|
})
|
|
|
|
|
|
2016-05-01 01:05:04 +00:00
|
|
|
buf := &Buffer{
|
2016-05-01 17:43:42 +00:00
|
|
|
MaxRecordCount: maxRecordCount,
|
2016-05-01 01:05:04 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
checkpoint := &Checkpoint{
|
|
|
|
|
AppName: c.appName,
|
|
|
|
|
StreamName: c.streamName,
|
|
|
|
|
}
|
|
|
|
|
|
2016-02-03 05:04:22 +00:00
|
|
|
params := &kinesis.GetShardIteratorInput{
|
|
|
|
|
ShardId: aws.String(shardID),
|
|
|
|
|
StreamName: aws.String(c.streamName),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if checkpoint.CheckpointExists(shardID) {
|
|
|
|
|
params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER")
|
|
|
|
|
params.StartingSequenceNumber = aws.String(checkpoint.SequenceNumber())
|
|
|
|
|
} else {
|
|
|
|
|
params.ShardIteratorType = aws.String("TRIM_HORIZON")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
resp, err := c.svc.GetShardIterator(params)
|
|
|
|
|
if err != nil {
|
2016-05-01 05:23:35 +00:00
|
|
|
ctx.WithError(err).Error("getShardIterator")
|
|
|
|
|
os.Exit(1)
|
2016-02-03 05:04:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
shardIterator := resp.ShardIterator
|
2016-05-01 17:43:42 +00:00
|
|
|
ctx.Info("processing")
|
2016-02-03 05:04:22 +00:00
|
|
|
|
|
|
|
|
for {
|
2016-05-01 01:05:04 +00:00
|
|
|
resp, err := c.svc.GetRecords(
|
|
|
|
|
&kinesis.GetRecordsInput{
|
|
|
|
|
ShardIterator: shardIterator,
|
|
|
|
|
},
|
|
|
|
|
)
|
2016-02-03 05:04:22 +00:00
|
|
|
|
|
|
|
|
if err != nil {
|
2016-05-01 05:23:35 +00:00
|
|
|
ctx.WithError(err).Error("getRecords")
|
2016-05-01 00:04:44 +00:00
|
|
|
os.Exit(1)
|
2016-02-03 05:04:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(resp.Records) > 0 {
|
|
|
|
|
for _, r := range resp.Records {
|
2016-05-01 01:05:04 +00:00
|
|
|
buf.AddRecord(r)
|
2016-02-03 05:04:22 +00:00
|
|
|
|
2016-05-01 01:05:04 +00:00
|
|
|
if buf.ShouldFlush() {
|
|
|
|
|
handler.HandleRecords(*buf)
|
2016-05-01 17:43:42 +00:00
|
|
|
ctx.WithField("count", buf.RecordCount()).Info("emitted")
|
2016-05-01 01:05:04 +00:00
|
|
|
checkpoint.SetCheckpoint(shardID, buf.LastSeq())
|
|
|
|
|
buf.Flush()
|
2016-02-08 21:21:54 +00:00
|
|
|
}
|
2016-02-03 05:04:22 +00:00
|
|
|
}
|
|
|
|
|
} else if resp.NextShardIterator == aws.String("") || shardIterator == resp.NextShardIterator {
|
2016-05-01 05:23:35 +00:00
|
|
|
ctx.Error("nextShardIterator")
|
2016-02-03 05:04:22 +00:00
|
|
|
os.Exit(1)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
shardIterator = resp.NextShardIterator
|
|
|
|
|
}
|
|
|
|
|
}
|