diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index b46d37a..2f74288 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -10,6 +10,8 @@ import ( "github.com/Clever/amazon-kinesis-client-go/kcl" ) +var lg = kv.New("amazon-kinesis-client-go") + type tagMsgPair struct { tag string msg []byte @@ -23,9 +25,9 @@ type batcherManagerConfig struct { } type batcherManager struct { - log kv.KayveeLogger - sender Sender - chkpntManager *checkpointManager + failedLogsFile kv.KayveeLogger + sender Sender + chkpntManager *checkpointManager batchCount int batchSize int @@ -38,12 +40,12 @@ type batcherManager struct { } func newBatcherManager( - sender Sender, chkpntManager *checkpointManager, cfg batcherManagerConfig, log kv.KayveeLogger, + sender Sender, chkpntManager *checkpointManager, cfg batcherManagerConfig, failedLogsFile kv.KayveeLogger, ) *batcherManager { bm := &batcherManager{ - log: log, - sender: sender, - chkpntManager: chkpntManager, + failedLogsFile: failedLogsFile, + sender: sender, + chkpntManager: chkpntManager, batchCount: cfg.BatchCount, batchSize: cfg.BatchSize, @@ -96,16 +98,16 @@ func (b *batcherManager) sendBatch(batcher *batcher, tag string) { switch e := err.(type) { case nil: // Do nothing case PartialSendBatchError: - b.log.ErrorD("send-batch", kv.M{"msg": e.Error()}) + lg.ErrorD("send-batch", kv.M{"msg": e.Error()}) for _, line := range e.FailedMessages { - b.log.ErrorD("failed-log", kv.M{"log": line}) + b.failedLogsFile.ErrorD("failed-log", kv.M{"log": line}) } stats.Counter("batch-log-failures", len(e.FailedMessages)) case CatastrophicSendBatchError: - b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) + lg.CriticalD("send-batch", kv.M{"msg": e.Error()}) os.Exit(1) default: - b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) + lg.CriticalD("send-batch", kv.M{"msg": e.Error()}) os.Exit(1) } @@ -184,8 +186,14 @@ func (b *batcherManager) startMessageHandler( batcher.AddMessage(tmp.msg, tmp.pair) } else if err != nil { - b.log.ErrorD("add-message", kv.M{ - "err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag, + lg.ErrorD("add-message", kv.M{ + "err": err.Error(), + "tag": tmp.tag, + }) + b.failedLogsFile.ErrorD("add-message", kv.M{ + "err": err.Error(), + "msg": string(tmp.msg), + "tag": tmp.tag, }) } stats.Counter("msg-batched", 1) diff --git a/batchconsumer/checkpointmanager.go b/batchconsumer/checkpointmanager.go index 6f312e0..56aac07 100644 --- a/batchconsumer/checkpointmanager.go +++ b/batchconsumer/checkpointmanager.go @@ -3,15 +3,11 @@ package batchconsumer import ( "time" - kv "gopkg.in/Clever/kayvee-go.v6/logger" - "github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats" "github.com/Clever/amazon-kinesis-client-go/kcl" ) type checkpointManager struct { - log kv.KayveeLogger - checkpointFreq time.Duration checkpoint chan kcl.SequencePair @@ -19,12 +15,8 @@ type checkpointManager struct { shutdown chan chan<- struct{} } -func newCheckpointManager( - checkpointer kcl.Checkpointer, checkpointFreq time.Duration, log kv.KayveeLogger, -) *checkpointManager { +func newCheckpointManager(checkpointer kcl.Checkpointer, checkpointFreq time.Duration) *checkpointManager { cm := &checkpointManager{ - log: log, - checkpointFreq: checkpointFreq, checkpoint: make(chan kcl.SequencePair), diff --git a/batchconsumer/consumer.go b/batchconsumer/consumer.go index b48a1dd..da2abed 100644 --- a/batchconsumer/consumer.go +++ b/batchconsumer/consumer.go @@ -13,8 +13,8 @@ import ( // Config used for BatchConsumer constructor. Any empty fields are populated with defaults. type Config struct { - // LogFile where consumer errors and failed log lines are saved - LogFile string + // FailedLogsFile is where logs that failed to process are written. + FailedLogsFile string // BatchInterval the upper bound on how often SendBatch is called with accumulated messages BatchInterval time.Duration @@ -34,13 +34,13 @@ type Config struct { // BatchConsumer is responsible for marshalling type BatchConsumer struct { - kclProcess *kcl.KCLProcess - logfile *os.File + kclProcess *kcl.KCLProcess + failedLogsFile *os.File } func withDefaults(config Config) Config { - if config.LogFile == "" { - config.LogFile = "/tmp/kcl-" + time.Now().Format(time.RFC3339) + if config.FailedLogsFile == "" { + config.FailedLogsFile = "/tmp/kcl-" + time.Now().Format(time.RFC3339) } if config.BatchInterval == 0 { @@ -74,20 +74,19 @@ func NewBatchConsumerFromFiles( ) *BatchConsumer { config = withDefaults(config) - file, err := os.OpenFile(config.LogFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + file, err := os.OpenFile(config.FailedLogsFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) if err != nil { log.Fatalf("Unable to create log file: %s", err.Error()) } + failedLogsFile := logger.New("amazon-kinesis-client-go/batchconsumer") + failedLogsFile.SetOutput(file) - kvlog := logger.New("amazon-kinesis-client-go") - kvlog.SetOutput(file) - - wrt := NewBatchedWriter(config, sender, kvlog) + wrt := NewBatchedWriter(config, sender, failedLogsFile) kclProcess := kcl.New(input, output, errFile, wrt) return &BatchConsumer{ - kclProcess: kclProcess, - logfile: file, + kclProcess: kclProcess, + failedLogsFile: file, } } @@ -100,5 +99,5 @@ func NewBatchConsumer(config Config, sender Sender) *BatchConsumer { // Start when called, the consumer begins ingesting messages. This function blocks. func (b *BatchConsumer) Start() { b.kclProcess.Run() - b.logfile.Close() + b.failedLogsFile.Close() } diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index 4b68c74..d750497 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -15,9 +15,9 @@ import ( ) type batchedWriter struct { - config Config - sender Sender - log kv.KayveeLogger + config Config + sender Sender + failedLogsFile kv.KayveeLogger shardID string @@ -30,11 +30,11 @@ type batchedWriter struct { lastProcessedSeq kcl.SequencePair } -func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batchedWriter { +func NewBatchedWriter(config Config, sender Sender, failedLogsFile kv.KayveeLogger) *batchedWriter { return &batchedWriter{ - config: config, - sender: sender, - log: log, + config: config, + sender: sender, + failedLogsFile: failedLogsFile, rateLimiter: rate.NewLimiter(rate.Limit(config.ReadRateLimit), config.ReadBurstLimit), } @@ -49,8 +49,8 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer BatchInterval: b.config.BatchInterval, } - b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq, b.log) - b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.log) + b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq) + b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.failedLogsFile) return nil } @@ -103,20 +103,20 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { continue // Skip message } else if err != nil { stats.Counter("unknown-error", 1) - b.log.ErrorD("process-message", kv.M{"msg": err.Error(), "rawmsg": string(rawmsg)}) + lg.ErrorD("process-message", kv.M{"msg": err.Error(), "rawmsg": string(rawmsg)}) continue // Don't stop processing messages because of one bad message } if len(tags) == 0 { stats.Counter("no-tags", 1) - b.log.ErrorD("no-tags", kv.M{"rawmsg": string(rawmsg)}) + lg.ErrorD("no-tags", kv.M{"rawmsg": string(rawmsg)}) return fmt.Errorf("No tags provided by consumer for log: %s", string(rawmsg)) } for _, tag := range tags { if tag == "" { stats.Counter("blank-tag", 1) - b.log.ErrorD("blank-tag", kv.M{"rawmsg": string(rawmsg)}) + lg.ErrorD("blank-tag", kv.M{"rawmsg": string(rawmsg)}) return fmt.Errorf("Blank tag provided by consumer for log: %s", string(rawmsg)) } @@ -144,9 +144,9 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { func (b *batchedWriter) Shutdown(reason string) error { if reason == "TERMINATE" { - b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID}) + lg.InfoD("terminate-signal", kv.M{"shard-id": b.shardID}) } else { - b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason}) + lg.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason}) } done := b.batcherManager.Shutdown() diff --git a/batchconsumer/writer_test.go b/batchconsumer/writer_test.go index 94323f5..7bc7e33 100644 --- a/batchconsumer/writer_test.go +++ b/batchconsumer/writer_test.go @@ -135,14 +135,14 @@ func encode(str string) string { func TestProcessRecordsIgnoredMessages(t *testing.T) { assert := assert.New(t) - mocklog := logger.New("testing") + mockFailedLogsFile := logger.New("testing") mockconfig := withDefaults(Config{ BatchInterval: 10 * time.Millisecond, CheckpointFreq: 20 * time.Millisecond, }) mockcheckpointer := NewMockCheckpointer(5 * time.Second) - wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mocklog) + wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mockFailedLogsFile) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -165,7 +165,7 @@ func TestProcessRecordsIgnoredMessages(t *testing.T) { func TestProcessRecordsSingleBatchBasic(t *testing.T) { assert := assert.New(t) - mocklog := logger.New("testing") + mockFailedLogsFile := logger.New("testing") mockconfig := withDefaults(Config{ BatchCount: 2, CheckpointFreq: 1, // Don't throttle checks points @@ -173,7 +173,7 @@ func TestProcessRecordsSingleBatchBasic(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -212,7 +212,7 @@ func TestProcessRecordsSingleBatchBasic(t *testing.T) { func TestProcessRecordsMutliBatchBasic(t *testing.T) { assert := assert.New(t) - mocklog := logger.New("testing") + mockFailedLogsFile := logger.New("testing") mockconfig := withDefaults(Config{ BatchInterval: 100 * time.Millisecond, CheckpointFreq: 200 * time.Millisecond, @@ -220,7 +220,7 @@ func TestProcessRecordsMutliBatchBasic(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -270,7 +270,7 @@ func TestProcessRecordsMutliBatchBasic(t *testing.T) { func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) { assert := assert.New(t) - mocklog := logger.New("testing") + mockFailedLogsFile := logger.New("testing") mockconfig := withDefaults(Config{ BatchInterval: 100 * time.Millisecond, CheckpointFreq: 200 * time.Millisecond, @@ -278,7 +278,7 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -346,7 +346,7 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) { func TestStaggeredCheckpionting(t *testing.T) { assert := assert.New(t) - mocklog := logger.New("testing") + mockFailedLogsFile := logger.New("testing") mockconfig := withDefaults(Config{ BatchCount: 2, BatchInterval: 100 * time.Millisecond, @@ -355,7 +355,7 @@ func TestStaggeredCheckpionting(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ diff --git a/cmd/batchconsumer/main.go b/cmd/batchconsumer/main.go index f12d3c1..004ec36 100644 --- a/cmd/batchconsumer/main.go +++ b/cmd/batchconsumer/main.go @@ -2,8 +2,6 @@ package main import ( "fmt" - "log" - "os" "time" "gopkg.in/Clever/kayvee-go.v6/logger" @@ -11,30 +9,15 @@ import ( kbc "github.com/Clever/amazon-kinesis-client-go/batchconsumer" ) -func createDummyOutput() (logger.KayveeLogger, *os.File) { - file, err := os.OpenFile("/tmp/example-kcl-output", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) - if err != nil { - log.Fatalf("Unable to create log file: %s", err.Error()) - } - - kvlog := logger.New("amazon-kinesis-client-go") - kvlog.SetOutput(file) - - return kvlog, file -} - func main() { config := kbc.Config{ - BatchInterval: 10 * time.Second, - BatchCount: 500, - BatchSize: 4 * 1024 * 1024, // 4Mb - LogFile: "/tmp/example-kcl-consumer", + BatchInterval: 10 * time.Second, + BatchCount: 500, + BatchSize: 4 * 1024 * 1024, // 4Mb + FailedLogsFile: "/tmp/example-kcl-consumer", } - output, file := createDummyOutput() - defer file.Close() - - sender := &exampleSender{output: output} + sender := &exampleSender{output: logger.New("fake-output")} consumer := kbc.NewBatchConsumer(config, sender) consumer.Start() } diff --git a/golang.mk b/golang.mk index 602c777..fb402e5 100644 --- a/golang.mk +++ b/golang.mk @@ -1,7 +1,7 @@ # This is the default Clever Golang Makefile. # It is stored in the dev-handbook repo, github.com/Clever/dev-handbook # Please do not alter this file directly. -GOLANG_MK_VERSION := 0.1.4 +GOLANG_MK_VERSION := 0.1.5 SHELL := /bin/bash .PHONY: golang-godep-vendor golang-test-deps $(GODEP) @@ -10,13 +10,21 @@ SHELL := /bin/bash GOPATH=$(shell echo $$GOPATH | cut -d: -f1) # This block checks and confirms that the proper Go toolchain version is installed. +# It uses ^ matching in the semver sense -- you can be ahead by a minor +# version, but not a major version (patch is ignored). # arg1: golang version define golang-version-check -GOVERSION := $(shell go version | grep $(1)) -_ := $(if \ - $(shell go version | grep $(1)), \ - @echo "", \ - $(error "must be running Go version $(1)")) +_ := $(if \ + $(shell \ + expr >/dev/null \ + `go version | cut -d" " -f3 | cut -c3- | cut -d. -f2` \ + \>= `echo $(1) | cut -d. -f2` \ + \& \ + `go version | cut -d" " -f3 | cut -c3- | cut -d. -f1` \ + = `echo $(1) | cut -d. -f1` \ + && echo 1), \ + @echo "", \ + $(error must be running Go version ^$(1) - you are running $(shell go version | cut -d" " -f3 | cut -c3-))) endef export GO15VENDOREXPERIMENT=1