amazon-kinesis-client-go/batchconsumer/writer.go

package batchconsumer

import (
	"context"
	"encoding/base64"
	"fmt"
	"math/big"

	"golang.org/x/time/rate"
	kv "gopkg.in/Clever/kayvee-go.v6/logger"

	"github.com/Clever/amazon-kinesis-client-go/kcl"
	"github.com/Clever/amazon-kinesis-client-go/splitter"
)
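// batchedWriter implements the record-processor callbacks the KCL expects
// (Initialize, ProcessRecords, Shutdown). It decodes records read from a
// shard, hands each message to the Sender, and batches the results for
// sending and checkpointing.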
type batchedWriter struct {
	config         Config
	sender         Sender
	log            kv.KayveeLogger
	shardID        string
	chkpntManager  *checkpointManager
	batcherManager *batcherManager
	// Limits the number of records read from the stream
	rateLimiter *rate.Limiter

	lastProcessedSeq kcl.SequencePair
}
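// NewBatchedWriter creates a batchedWriter whose record reads are rate limited
// according to the config's ReadRateLimit and ReadBurstLimit.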
func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batchedWriter {
	return &batchedWriter{
		config:      config,
		sender:      sender,
		log:         log,
		rateLimiter: rate.NewLimiter(rate.Limit(config.ReadRateLimit), config.ReadBurstLimit),
	}
}
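// Initialize is called with the shard this processor is responsible for; it
// sets up the checkpoint and batcher managers for that shard.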
func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error {
	b.shardID = shardID
	b.chkpntManager = NewCheckpointManager(checkpointer, b.config, b.log)
	b.batcherManager = NewBatcherManager(b.sender, b.chkpntManager, b.config, b.log)
	return nil
}
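// splitMessageIfNecessary returns the individual messages contained in a
// record: gzipped CWLogs Subscription records are split into their messages,
// while plain KPL records are returned as-is.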
func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) {
	// We handle two types of records:
	// - records emitted from a CWLogs Subscription
	// - records emitted from KPL
	if !splitter.IsGzipped(record) {
		// Process a single message, from KPL
		return [][]byte{record}, nil
	}

	// Process a batch of messages from a CWLogs Subscription
	return splitter.GetMessagesFromGzippedInput(record, b.config.DeployEnv == "production")
}
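// ProcessRecords base64-decodes each record, splits it into messages, and
// passes every message with its tags to the batcher manager, tracking
// sequence pairs so the previous record can be used for checkpointing.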
func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
	var pair kcl.SequencePair
	prevPair := b.lastProcessedSeq
	for _, record := range records {
		// Wait until the rate limiter permits one more record to be processed
		b.rateLimiter.Wait(context.Background())
		seq := new(big.Int)
		if _, ok := seq.SetString(record.SequenceNumber, 10); !ok { // Validating sequence
			return fmt.Errorf("could not parse sequence number '%s'", record.SequenceNumber)
		}

		pair = kcl.SequencePair{seq, record.SubSequenceNumber}
		if prevPair.IsEmpty() { // Handles the on-start edge case where b.lastProcessedSeq is empty
			prevPair = pair
		}
		data, err := base64.StdEncoding.DecodeString(record.Data)
		if err != nil {
			return err
		}
		messages, err := b.splitMessageIfNecessary(data)
		if err != nil {
			return err
		}
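		// Track whether any message in this record gets batched; if none are,
		// the pair is reported via LatestIgnored below.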
		wasPairIgnored := true
		for _, rawmsg := range messages {
			msg, tags, err := b.sender.ProcessMessage(rawmsg)
			if err == ErrMessageIgnored {
				continue // Skip message
			} else if err != nil {
				b.log.ErrorD("process-message", kv.M{"msg": err.Error(), "rawmsg": string(rawmsg)})
				continue // Don't stop processing messages because of one bad message
			}
			if len(tags) == 0 {
				b.log.ErrorD("no-tags", kv.M{"rawmsg": string(rawmsg)})
				return fmt.Errorf("No tags provided by consumer for log: %s", string(rawmsg))
			}
			for _, tag := range tags {
				if tag == "" {
					b.log.ErrorD("blank-tag", kv.M{"rawmsg": string(rawmsg)})
					return fmt.Errorf("Blank tag provided by consumer for log: %s", string(rawmsg))
				}
				// Use the second-to-last sequence number to ensure we don't checkpoint a message
				// before it's been sent. When batches are sent, conceptually we first find the
				// smallest sequence number among all the messages in the batch (let's call it A).
				// We then checkpoint at the A-1 sequence number.
				b.batcherManager.BatchMessage(tag, msg, prevPair)
				wasPairIgnored = false
			}
		}
		prevPair = pair
		if wasPairIgnored {
			b.batcherManager.LatestIgnored(pair)
		}
		b.batcherManager.LatestProcessed(pair)
	}
	b.lastProcessedSeq = pair
	return nil
}
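// Shutdown is called when the processor loses its shard lease (failover) or
// the shard has been fully consumed ("TERMINATE"); it shuts down the batcher
// manager before returning.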
func (b *batchedWriter) Shutdown(reason string) error {
	if reason == "TERMINATE" {
		b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID})
	} else {
		b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason})
	}
	b.batcherManager.Shutdown()
	return nil
}