diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index 40f306a..a335b2f 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -10,6 +10,8 @@ import ( "github.com/Clever/amazon-kinesis-client-go/kcl" ) +var lg = kv.New("amazon-kinesis-client-go") + type tagMsgPair struct { tag string msg []byte @@ -23,7 +25,6 @@ type batcherManagerConfig struct { } type batcherManager struct { - log kv.KayveeLogger failedLogsFile kv.KayveeLogger sender Sender chkpntManager *checkpointManager @@ -39,11 +40,9 @@ type batcherManager struct { } func newBatcherManager( - sender Sender, chkpntManager *checkpointManager, cfg batcherManagerConfig, - log kv.KayveeLogger, failedLogsFile kv.KayveeLogger, + sender Sender, chkpntManager *checkpointManager, cfg batcherManagerConfig, failedLogsFile kv.KayveeLogger, ) *batcherManager { bm := &batcherManager{ - log: log, failedLogsFile: failedLogsFile, sender: sender, chkpntManager: chkpntManager, @@ -99,16 +98,16 @@ func (b *batcherManager) sendBatch(batcher *batcher, tag string) { switch e := err.(type) { case nil: // Do nothing case PartialSendBatchError: - b.log.ErrorD("send-batch", kv.M{"msg": e.Error()}) + lg.ErrorD("send-batch", kv.M{"msg": e.Error()}) for _, line := range e.FailedMessages { b.failedLogsFile.ErrorD("failed-log", kv.M{"log": line}) } stats.Counter("batch-log-failures", len(e.FailedMessages)) case CatastrophicSendBatchError: - b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) + lg.CriticalD("send-batch", kv.M{"msg": e.Error()}) os.Exit(1) default: - b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) + lg.CriticalD("send-batch", kv.M{"msg": e.Error()}) os.Exit(1) } @@ -187,7 +186,7 @@ func (b *batcherManager) startMessageHandler( batcher.AddMessage(tmp.msg, tmp.pair) } else if err != nil { - b.log.ErrorD("add-message", kv.M{ + lg.ErrorD("add-message", kv.M{ "err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag, }) } diff --git a/batchconsumer/checkpointmanager.go b/batchconsumer/checkpointmanager.go index 6f312e0..56aac07 100644 --- a/batchconsumer/checkpointmanager.go +++ b/batchconsumer/checkpointmanager.go @@ -3,15 +3,11 @@ package batchconsumer import ( "time" - kv "gopkg.in/Clever/kayvee-go.v6/logger" - "github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats" "github.com/Clever/amazon-kinesis-client-go/kcl" ) type checkpointManager struct { - log kv.KayveeLogger - checkpointFreq time.Duration checkpoint chan kcl.SequencePair @@ -19,12 +15,8 @@ type checkpointManager struct { shutdown chan chan<- struct{} } -func newCheckpointManager( - checkpointer kcl.Checkpointer, checkpointFreq time.Duration, log kv.KayveeLogger, -) *checkpointManager { +func newCheckpointManager(checkpointer kcl.Checkpointer, checkpointFreq time.Duration) *checkpointManager { cm := &checkpointManager{ - log: log, - checkpointFreq: checkpointFreq, checkpoint: make(chan kcl.SequencePair), diff --git a/batchconsumer/consumer.go b/batchconsumer/consumer.go index 2738b5c..da2abed 100644 --- a/batchconsumer/consumer.go +++ b/batchconsumer/consumer.go @@ -13,9 +13,6 @@ import ( // Config used for BatchConsumer constructor. Any empty fields are populated with defaults. type Config struct { - // Logger for logging info / error logs. - Logger logger.KayveeLogger - // FailedLogsFile is where logs that failed to process are written. FailedLogsFile string @@ -42,10 +39,6 @@ type BatchConsumer struct { } func withDefaults(config Config) Config { - if config.Logger == nil { - config.Logger = logger.New("amazon-kinesis-client-go") - } - if config.FailedLogsFile == "" { config.FailedLogsFile = "/tmp/kcl-" + time.Now().Format(time.RFC3339) } @@ -85,10 +78,10 @@ func NewBatchConsumerFromFiles( if err != nil { log.Fatalf("Unable to create log file: %s", err.Error()) } - failedLogsFile := logger.New("amazon-kinesis-client-go") + failedLogsFile := logger.New("amazon-kinesis-client-go/batchconsumer") failedLogsFile.SetOutput(file) - wrt := NewBatchedWriter(config, sender, config.Logger, failedLogsFile) + wrt := NewBatchedWriter(config, sender, failedLogsFile) kclProcess := kcl.New(input, output, errFile, wrt) return &BatchConsumer{ diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index 737bc6c..d750497 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -17,7 +17,6 @@ import ( type batchedWriter struct { config Config sender Sender - log kv.KayveeLogger failedLogsFile kv.KayveeLogger shardID string @@ -31,11 +30,10 @@ type batchedWriter struct { lastProcessedSeq kcl.SequencePair } -func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger, failedLogsFile kv.KayveeLogger) *batchedWriter { +func NewBatchedWriter(config Config, sender Sender, failedLogsFile kv.KayveeLogger) *batchedWriter { return &batchedWriter{ config: config, sender: sender, - log: log, failedLogsFile: failedLogsFile, rateLimiter: rate.NewLimiter(rate.Limit(config.ReadRateLimit), config.ReadBurstLimit), @@ -51,8 +49,8 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer BatchInterval: b.config.BatchInterval, } - b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq, b.log) - b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.log, b.failedLogsFile) + b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq) + b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.failedLogsFile) return nil } @@ -105,20 +103,20 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { continue // Skip message } else if err != nil { stats.Counter("unknown-error", 1) - b.log.ErrorD("process-message", kv.M{"msg": err.Error(), "rawmsg": string(rawmsg)}) + lg.ErrorD("process-message", kv.M{"msg": err.Error(), "rawmsg": string(rawmsg)}) continue // Don't stop processing messages because of one bad message } if len(tags) == 0 { stats.Counter("no-tags", 1) - b.log.ErrorD("no-tags", kv.M{"rawmsg": string(rawmsg)}) + lg.ErrorD("no-tags", kv.M{"rawmsg": string(rawmsg)}) return fmt.Errorf("No tags provided by consumer for log: %s", string(rawmsg)) } for _, tag := range tags { if tag == "" { stats.Counter("blank-tag", 1) - b.log.ErrorD("blank-tag", kv.M{"rawmsg": string(rawmsg)}) + lg.ErrorD("blank-tag", kv.M{"rawmsg": string(rawmsg)}) return fmt.Errorf("Blank tag provided by consumer for log: %s", string(rawmsg)) } @@ -146,9 +144,9 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { func (b *batchedWriter) Shutdown(reason string) error { if reason == "TERMINATE" { - b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID}) + lg.InfoD("terminate-signal", kv.M{"shard-id": b.shardID}) } else { - b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason}) + lg.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason}) } done := b.batcherManager.Shutdown() diff --git a/batchconsumer/writer_test.go b/batchconsumer/writer_test.go index 2555c86..7bc7e33 100644 --- a/batchconsumer/writer_test.go +++ b/batchconsumer/writer_test.go @@ -135,14 +135,14 @@ func encode(str string) string { func TestProcessRecordsIgnoredMessages(t *testing.T) { assert := assert.New(t) - mocklog := logger.New("testing") + mockFailedLogsFile := logger.New("testing") mockconfig := withDefaults(Config{ BatchInterval: 10 * time.Millisecond, CheckpointFreq: 20 * time.Millisecond, }) mockcheckpointer := NewMockCheckpointer(5 * time.Second) - wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mocklog, mocklog) + wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mockFailedLogsFile) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -165,7 +165,7 @@ func TestProcessRecordsIgnoredMessages(t *testing.T) { func TestProcessRecordsSingleBatchBasic(t *testing.T) { assert := assert.New(t) - mocklog := logger.New("testing") + mockFailedLogsFile := logger.New("testing") mockconfig := withDefaults(Config{ BatchCount: 2, CheckpointFreq: 1, // Don't throttle checks points @@ -173,7 +173,7 @@ func TestProcessRecordsSingleBatchBasic(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -212,7 +212,7 @@ func TestProcessRecordsSingleBatchBasic(t *testing.T) { func TestProcessRecordsMutliBatchBasic(t *testing.T) { assert := assert.New(t) - mocklog := logger.New("testing") + mockFailedLogsFile := logger.New("testing") mockconfig := withDefaults(Config{ BatchInterval: 100 * time.Millisecond, CheckpointFreq: 200 * time.Millisecond, @@ -220,7 +220,7 @@ func TestProcessRecordsMutliBatchBasic(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -270,7 +270,7 @@ func TestProcessRecordsMutliBatchBasic(t *testing.T) { func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) { assert := assert.New(t) - mocklog := logger.New("testing") + mockFailedLogsFile := logger.New("testing") mockconfig := withDefaults(Config{ BatchInterval: 100 * time.Millisecond, CheckpointFreq: 200 * time.Millisecond, @@ -278,7 +278,7 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -346,7 +346,7 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) { func TestStaggeredCheckpionting(t *testing.T) { assert := assert.New(t) - mocklog := logger.New("testing") + mockFailedLogsFile := logger.New("testing") mockconfig := withDefaults(Config{ BatchCount: 2, BatchInterval: 100 * time.Millisecond, @@ -355,7 +355,7 @@ func TestStaggeredCheckpionting(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ diff --git a/cmd/batchconsumer/main.go b/cmd/batchconsumer/main.go index 67f2c47..004ec36 100644 --- a/cmd/batchconsumer/main.go +++ b/cmd/batchconsumer/main.go @@ -14,7 +14,6 @@ func main() { BatchInterval: 10 * time.Second, BatchCount: 500, BatchSize: 4 * 1024 * 1024, // 4Mb - Logger: logger.New("amazon-kinesis-client-go"), FailedLogsFile: "/tmp/example-kcl-consumer", } diff --git a/golang.mk b/golang.mk index 602c777..fb402e5 100644 --- a/golang.mk +++ b/golang.mk @@ -1,7 +1,7 @@ # This is the default Clever Golang Makefile. # It is stored in the dev-handbook repo, github.com/Clever/dev-handbook # Please do not alter this file directly. -GOLANG_MK_VERSION := 0.1.4 +GOLANG_MK_VERSION := 0.1.5 SHELL := /bin/bash .PHONY: golang-godep-vendor golang-test-deps $(GODEP) @@ -10,13 +10,21 @@ SHELL := /bin/bash GOPATH=$(shell echo $$GOPATH | cut -d: -f1) # This block checks and confirms that the proper Go toolchain version is installed. +# It uses ^ matching in the semver sense -- you can be ahead by a minor +# version, but not a major version (patch is ignored). # arg1: golang version define golang-version-check -GOVERSION := $(shell go version | grep $(1)) -_ := $(if \ - $(shell go version | grep $(1)), \ - @echo "", \ - $(error "must be running Go version $(1)")) +_ := $(if \ + $(shell \ + expr >/dev/null \ + `go version | cut -d" " -f3 | cut -c3- | cut -d. -f2` \ + \>= `echo $(1) | cut -d. -f2` \ + \& \ + `go version | cut -d" " -f3 | cut -c3- | cut -d. -f1` \ + = `echo $(1) | cut -d. -f1` \ + && echo 1), \ + @echo "", \ + $(error must be running Go version ^$(1) - you are running $(shell go version | cut -d" " -f3 | cut -c3-))) endef export GO15VENDOREXPERIMENT=1