amazon-kinesis-client-go/batchconsumer/writer.go
Taylor Sutton f8e9c34641 Refactor zlib decompression into the splitter package
Being in the batchconsumer package means it will work for anything
using KCL, but lambdas that subscribe to these log streams do not use
batchconsumer at all; instead they invoke the splitter package
directly. As such, if we want this functionality to be available to
lambda log consumers, it can't be in batchconsumer.

There are no functionality changes here, just moving code from an
unexported method in one place to an exported function in another
place. The tests also get moved along with it.
2020-11-12 11:37:13 -05:00

145 lines
3.9 KiB
Go

package batchconsumer
import (
"context"
"encoding/base64"
"fmt"
"math/big"
"golang.org/x/time/rate"
kv "gopkg.in/Clever/kayvee-go.v6/logger"
"github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats"
"github.com/Clever/amazon-kinesis-client-go/kcl"
"github.com/Clever/amazon-kinesis-client-go/splitter"
)
// batchedWriter carries the state shared by Initialize, ProcessRecords and
// Shutdown: the consumer configuration, the Sender that processes individual
// messages, and the checkpoint/batcher managers built in Initialize.
type batchedWriter struct {
// config supplies the batch count/size/interval, the checkpoint frequency and
// the read rate/burst limits used by this writer.
config Config
// sender is initialized with the shard ID and is asked to process every
// message split out of a record.
sender Sender
// failedLogsFile is passed through to the batcher manager in Initialize.
// NOTE(review): presumably the logger that records messages which failed to
// send — confirm against batcherManager.
failedLogsFile kv.KayveeLogger
// shardID is set by Initialize and included in the Shutdown log lines.
shardID string
// chkpntManager is created in Initialize from the KCL checkpointer and
// config.CheckpointFreq.
chkpntManager *checkpointManager
// batcherManager is created in Initialize; ProcessRecords hands it messages
// and sequence pairs, and Shutdown waits for it to finish.
batcherManager *batcherManager
// Limits the number of records read from the stream; ProcessRecords waits on
// it once per record.
rateLimiter *rate.Limiter
// lastProcessedSeq is the sequence pair of the last record seen by the most
// recent ProcessRecords call that returned without error.
lastProcessedSeq kcl.SequencePair
}
// NewBatchedWriter builds a batchedWriter around the given config, sender and
// failed-logs logger. The read rate limiter is derived from
// config.ReadRateLimit and config.ReadBurstLimit; every other field keeps its
// zero value until Initialize is called.
func NewBatchedWriter(config Config, sender Sender, failedLogsFile kv.KayveeLogger) *batchedWriter {
	limiter := rate.NewLimiter(rate.Limit(config.ReadRateLimit), config.ReadBurstLimit)

	writer := new(batchedWriter)
	writer.config = config
	writer.sender = sender
	writer.failedLogsFile = failedLogsFile
	writer.rateLimiter = limiter
	return writer
}
// Initialize records the shard ID, initializes the sender for that shard, and
// constructs the checkpoint manager and batcher manager used by
// ProcessRecords and Shutdown. It always returns nil.
func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error {
	b.shardID = shardID
	b.sender.Initialize(shardID)

	// The batcher manager depends on the checkpoint manager, so build that first.
	b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq)
	b.batcherManager = newBatcherManager(
		b.sender,
		b.chkpntManager,
		batcherManagerConfig{
			BatchCount:    b.config.BatchCount,
			BatchSize:     b.config.BatchSize,
			BatchInterval: b.config.BatchInterval,
		},
		b.failedLogsFile,
	)

	return nil
}
// ProcessRecords handles one batch of KCL records. For every record it waits on
// the read rate limiter, parses the sequence number, base64-decodes the
// payload, splits it into individual messages via the splitter package, and
// passes each message to the sender. Every message the sender accepts is
// handed to the batcher manager once per returned tag.
//
// It returns an error (and stops processing the batch) when a sequence number
// is not a base-10 integer, the payload is not valid base64, the splitter
// fails, or the sender returns no tags or a blank tag for a message. Messages
// for which the sender returns ErrMessageIgnored are skipped; any other sender
// error is counted, logged and skipped so one bad message does not stop the
// batch.
//
// An empty records slice is a no-op: b.lastProcessedSeq is left untouched.
func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
	// Without this guard an empty batch would fall through to the final
	// assignment and overwrite b.lastProcessedSeq with the zero-value pair,
	// discarding the sequence of the previously processed record.
	if len(records) == 0 {
		return nil
	}

	var pair kcl.SequencePair
	prevPair := b.lastProcessedSeq

	for _, record := range records {
		// Wait until the rate limiter permits one more record to be processed.
		// The returned error is deliberately discarded: a limiter failure must
		// not stop record processing.
		_ = b.rateLimiter.Wait(context.Background())

		seq := new(big.Int)
		if _, ok := seq.SetString(record.SequenceNumber, 10); !ok { // Validating sequence
			return fmt.Errorf("could not parse sequence number '%s'", record.SequenceNumber)
		}

		pair = kcl.SequencePair{Sequence: seq, SubSequence: record.SubSequenceNumber}
		if prevPair.IsNil() { // Handles on-start edge case where b.lastProcessedSeq is empty
			prevPair = pair
		}

		data, err := base64.StdEncoding.DecodeString(record.Data)
		if err != nil {
			return err
		}

		messages, err := splitter.SplitMessageIfNecessary(data)
		if err != nil {
			return err
		}

		// Stays true only if no message from this record reached the batcher.
		wasPairIgnored := true
		for _, rawmsg := range messages {
			msg, tags, err := b.sender.ProcessMessage(rawmsg)
			switch {
			case err == ErrMessageIgnored:
				continue // Skip message
			case err != nil:
				stats.Counter("unknown-error", 1)
				lg.ErrorD("process-message", kv.M{"msg": err.Error(), "rawmsg": string(rawmsg)})
				continue // Don't stop processing messages because of one bad message
			}

			if len(tags) == 0 {
				stats.Counter("no-tags", 1)
				lg.ErrorD("no-tags", kv.M{"rawmsg": string(rawmsg)})
				return fmt.Errorf("No tags provided by consumer for log: %s", string(rawmsg))
			}

			for _, tag := range tags {
				if tag == "" {
					stats.Counter("blank-tag", 1)
					lg.ErrorD("blank-tag", kv.M{"rawmsg": string(rawmsg)})
					return fmt.Errorf("Blank tag provided by consumer for log: %s", string(rawmsg))
				}

				// Use second to last sequence number to ensure we don't checkpoint a message before
				// it's been sent. When batches are sent, conceptually we first find the smallest
				// sequence number among all the batches (let's call it A). We then checkpoint at
				// the A-1 sequence number.
				b.batcherManager.BatchMessage(tag, msg, prevPair)
				wasPairIgnored = false
			}
		}

		prevPair = pair
		if wasPairIgnored {
			b.batcherManager.LatestIgnored(pair)
		}
		b.batcherManager.LatestProcessed(pair)

		stats.Counter("processed-messages", len(messages))
	}

	b.lastProcessedSeq = pair
	return nil
}
// Shutdown logs why the writer is stopping — at info level for a "TERMINATE"
// reason, at error level for anything else — then shuts down the batcher
// manager and blocks until it signals completion. It always returns nil.
func (b *batchedWriter) Shutdown(reason string) error {
	switch reason {
	case "TERMINATE":
		lg.InfoD("terminate-signal", kv.M{"shard-id": b.shardID})
	default:
		lg.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason})
	}

	// Block until the batcher manager reports that shutdown has finished.
	<-b.batcherManager.Shutdown()
	return nil
}