package batchconsumer

import (
	"context"
	"encoding/base64"
	"fmt"
	"math/big"

	"golang.org/x/time/rate"
	kv "gopkg.in/Clever/kayvee-go.v6/logger"

	"github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats"
	"github.com/Clever/amazon-kinesis-client-go/kcl"
	"github.com/Clever/amazon-kinesis-client-go/splitter"
)
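
// batchedWriter is the record processor used by the batch consumer. It decodes
// and splits incoming Kinesis records, hands the resulting messages to the
// Sender, and coordinates batching and checkpointing for a single shard.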
type batchedWriter struct {
	config Config
	sender Sender
	log    kv.KayveeLogger

	shardID string

	chkpntManager  *checkpointManager
	batcherManager *batcherManager

	// Limits the number of records read from the stream
	rateLimiter *rate.Limiter

	lastProcessedSeq kcl.SequencePair
}
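
// NewBatchedWriter constructs a batchedWriter for the given Config and Sender.
// Reads from the stream are rate limited using config.ReadRateLimit and
// config.ReadBurstLimit.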
func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batchedWriter {
	return &batchedWriter{
		config: config,
		sender: sender,
		log:    log,

		rateLimiter: rate.NewLimiter(rate.Limit(config.ReadRateLimit), config.ReadBurstLimit),
	}
}
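
// Initialize is called once the consumer is assigned a shard. It records the
// shard ID and creates the checkpoint and batcher managers used by
// ProcessRecords.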
func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error {
	b.shardID = shardID

	bmConfig := batcherManagerConfig{
		BatchCount:    b.config.BatchCount,
		BatchSize:     b.config.BatchSize,
		BatchInterval: b.config.BatchInterval,
	}

	b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq, b.log)
	b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.log)

	return nil
}
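
// splitMessageIfNecessary converts a raw Kinesis record into one or more
// messages. Plain records (from the KPL) are returned unchanged; gzipped
// records from a CWLogs Subscription are unpacked into individual log messages.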
func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) {
	// We handle two types of records:
	// - records emitted from a CWLogs Subscription
	// - records emitted from the KPL
	if !splitter.IsGzipped(record) {
		// Process a single message, from the KPL
		return [][]byte{record}, nil
	}

	// Process a batch of messages from a CWLogs Subscription
	return splitter.GetMessagesFromGzippedInput(record, b.config.DeployEnv == "production")
}
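
// ProcessRecords handles a batch of records read from the shard. Each record is
// rate limited, validated, base64 decoded, and split into messages; every message
// is passed to the Sender and the tagged results are forwarded to the batcher
// manager together with the sequence pair used for checkpointing.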
func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
	var pair kcl.SequencePair
	prevPair := b.lastProcessedSeq

	for _, record := range records {
		// Wait until rate limiter permits one more record to be processed
		b.rateLimiter.Wait(context.Background())

		seq := new(big.Int)
		if _, ok := seq.SetString(record.SequenceNumber, 10); !ok { // Validating sequence
			return fmt.Errorf("could not parse sequence number '%s'", record.SequenceNumber)
		}

		pair = kcl.SequencePair{seq, record.SubSequenceNumber}
		if prevPair.IsNil() { // Handles the on-start edge case where b.lastProcessedSeq is empty
			prevPair = pair
		}

		data, err := base64.StdEncoding.DecodeString(record.Data)
		if err != nil {
			return err
		}

		messages, err := b.splitMessageIfNecessary(data)
		if err != nil {
			return err
		}

		wasPairIgnored := true
		for _, rawmsg := range messages {
			msg, tags, err := b.sender.ProcessMessage(rawmsg)
			if err == ErrMessageIgnored {
				continue // Skip message
			} else if err != nil {
				stats.Counter("unknown-error", 1)
				b.log.ErrorD("process-message", kv.M{"msg": err.Error(), "rawmsg": string(rawmsg)})
				continue // Don't stop processing messages because of one bad message
			}

			if len(tags) == 0 {
				stats.Counter("no-tags", 1)
				b.log.ErrorD("no-tags", kv.M{"rawmsg": string(rawmsg)})
				return fmt.Errorf("No tags provided by consumer for log: %s", string(rawmsg))
			}

			for _, tag := range tags {
				if tag == "" {
					stats.Counter("blank-tag", 1)
					b.log.ErrorD("blank-tag", kv.M{"rawmsg": string(rawmsg)})
					return fmt.Errorf("Blank tag provided by consumer for log: %s", string(rawmsg))
				}

				// Use the second-to-last sequence number to ensure we don't checkpoint a message
				// before it's been sent. When batches are sent, conceptually we first find the
				// smallest sequence number among all the messages in the batch (let's call it A).
				// We then checkpoint at the A-1 sequence number.
				b.batcherManager.BatchMessage(tag, msg, prevPair)
				wasPairIgnored = false
			}
		}

		prevPair = pair
		if wasPairIgnored {
			b.batcherManager.LatestIgnored(pair)
		}
		b.batcherManager.LatestProcessed(pair)

		stats.Counter("processed-messages", len(messages))
	}
	b.lastProcessedSeq = pair

	return nil
}
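
// Shutdown is called when the consumer is finished with the shard, either
// because the shard was terminated or because the lease moved to another worker.
// It logs the reason and blocks until the batcher manager finishes shutting down.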
func (b *batchedWriter) Shutdown(reason string) error {
	if reason == "TERMINATE" {
		b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID})
	} else {
		b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason})
	}

	done := b.batcherManager.Shutdown()
	<-done

	return nil
}