From 221743b3e21e5370106771ed805241bab72f0334 Mon Sep 17 00:00:00 2001 From: Rafael Garcia Date: Thu, 2 Nov 2017 21:49:13 +0000 Subject: [PATCH 1/3] write errors to stderr --- batchconsumer/batchermanager.go | 19 ++++++++++-------- batchconsumer/consumer.go | 34 +++++++++++++++++++-------------- batchconsumer/writer.go | 18 +++++++++-------- batchconsumer/writer_test.go | 10 +++++----- cmd/batchconsumer/main.go | 28 ++++++--------------------- 5 files changed, 52 insertions(+), 57 deletions(-) diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index b46d37a..40f306a 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -23,9 +23,10 @@ type batcherManagerConfig struct { } type batcherManager struct { - log kv.KayveeLogger - sender Sender - chkpntManager *checkpointManager + log kv.KayveeLogger + failedLogsFile kv.KayveeLogger + sender Sender + chkpntManager *checkpointManager batchCount int batchSize int @@ -38,12 +39,14 @@ type batcherManager struct { } func newBatcherManager( - sender Sender, chkpntManager *checkpointManager, cfg batcherManagerConfig, log kv.KayveeLogger, + sender Sender, chkpntManager *checkpointManager, cfg batcherManagerConfig, + log kv.KayveeLogger, failedLogsFile kv.KayveeLogger, ) *batcherManager { bm := &batcherManager{ - log: log, - sender: sender, - chkpntManager: chkpntManager, + log: log, + failedLogsFile: failedLogsFile, + sender: sender, + chkpntManager: chkpntManager, batchCount: cfg.BatchCount, batchSize: cfg.BatchSize, @@ -98,7 +101,7 @@ func (b *batcherManager) sendBatch(batcher *batcher, tag string) { case PartialSendBatchError: b.log.ErrorD("send-batch", kv.M{"msg": e.Error()}) for _, line := range e.FailedMessages { - b.log.ErrorD("failed-log", kv.M{"log": line}) + b.failedLogsFile.ErrorD("failed-log", kv.M{"log": line}) } stats.Counter("batch-log-failures", len(e.FailedMessages)) case CatastrophicSendBatchError: diff --git a/batchconsumer/consumer.go b/batchconsumer/consumer.go index b48a1dd..2738b5c 100644 --- a/batchconsumer/consumer.go +++ b/batchconsumer/consumer.go @@ -13,8 +13,11 @@ import ( // Config used for BatchConsumer constructor. Any empty fields are populated with defaults. type Config struct { - // LogFile where consumer errors and failed log lines are saved - LogFile string + // Logger for logging info / error logs. + Logger logger.KayveeLogger + + // FailedLogsFile is where logs that failed to process are written. + FailedLogsFile string // BatchInterval the upper bound on how often SendBatch is called with accumulated messages BatchInterval time.Duration @@ -34,13 +37,17 @@ type Config struct { // BatchConsumer is responsible for marshalling type BatchConsumer struct { - kclProcess *kcl.KCLProcess - logfile *os.File + kclProcess *kcl.KCLProcess + failedLogsFile *os.File } func withDefaults(config Config) Config { - if config.LogFile == "" { - config.LogFile = "/tmp/kcl-" + time.Now().Format(time.RFC3339) + if config.Logger == nil { + config.Logger = logger.New("amazon-kinesis-client-go") + } + + if config.FailedLogsFile == "" { + config.FailedLogsFile = "/tmp/kcl-" + time.Now().Format(time.RFC3339) } if config.BatchInterval == 0 { @@ -74,20 +81,19 @@ func NewBatchConsumerFromFiles( ) *BatchConsumer { config = withDefaults(config) - file, err := os.OpenFile(config.LogFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + file, err := os.OpenFile(config.FailedLogsFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) if err != nil { log.Fatalf("Unable to create log file: %s", err.Error()) } + failedLogsFile := logger.New("amazon-kinesis-client-go") + failedLogsFile.SetOutput(file) - kvlog := logger.New("amazon-kinesis-client-go") - kvlog.SetOutput(file) - - wrt := NewBatchedWriter(config, sender, kvlog) + wrt := NewBatchedWriter(config, sender, config.Logger, failedLogsFile) kclProcess := kcl.New(input, output, errFile, wrt) return &BatchConsumer{ - kclProcess: kclProcess, - logfile: file, + kclProcess: kclProcess, + failedLogsFile: file, } } @@ -100,5 +106,5 @@ func NewBatchConsumer(config Config, sender Sender) *BatchConsumer { // Start when called, the consumer begins ingesting messages. This function blocks. func (b *BatchConsumer) Start() { b.kclProcess.Run() - b.logfile.Close() + b.failedLogsFile.Close() } diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index 4b68c74..737bc6c 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -15,9 +15,10 @@ import ( ) type batchedWriter struct { - config Config - sender Sender - log kv.KayveeLogger + config Config + sender Sender + log kv.KayveeLogger + failedLogsFile kv.KayveeLogger shardID string @@ -30,11 +31,12 @@ type batchedWriter struct { lastProcessedSeq kcl.SequencePair } -func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batchedWriter { +func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger, failedLogsFile kv.KayveeLogger) *batchedWriter { return &batchedWriter{ - config: config, - sender: sender, - log: log, + config: config, + sender: sender, + log: log, + failedLogsFile: failedLogsFile, rateLimiter: rate.NewLimiter(rate.Limit(config.ReadRateLimit), config.ReadBurstLimit), } @@ -50,7 +52,7 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer } b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq, b.log) - b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.log) + b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.log, b.failedLogsFile) return nil } diff --git a/batchconsumer/writer_test.go b/batchconsumer/writer_test.go index 94323f5..2555c86 100644 --- a/batchconsumer/writer_test.go +++ b/batchconsumer/writer_test.go @@ -142,7 +142,7 @@ func TestProcessRecordsIgnoredMessages(t *testing.T) { }) mockcheckpointer := NewMockCheckpointer(5 * time.Second) - wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mocklog) + wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mocklog, mocklog) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -173,7 +173,7 @@ func TestProcessRecordsSingleBatchBasic(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -220,7 +220,7 @@ func TestProcessRecordsMutliBatchBasic(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -278,7 +278,7 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -355,7 +355,7 @@ func TestStaggeredCheckpionting(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ diff --git a/cmd/batchconsumer/main.go b/cmd/batchconsumer/main.go index f12d3c1..67f2c47 100644 --- a/cmd/batchconsumer/main.go +++ b/cmd/batchconsumer/main.go @@ -2,8 +2,6 @@ package main import ( "fmt" - "log" - "os" "time" "gopkg.in/Clever/kayvee-go.v6/logger" @@ -11,30 +9,16 @@ import ( kbc "github.com/Clever/amazon-kinesis-client-go/batchconsumer" ) -func createDummyOutput() (logger.KayveeLogger, *os.File) { - file, err := os.OpenFile("/tmp/example-kcl-output", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) - if err != nil { - log.Fatalf("Unable to create log file: %s", err.Error()) - } - - kvlog := logger.New("amazon-kinesis-client-go") - kvlog.SetOutput(file) - - return kvlog, file -} - func main() { config := kbc.Config{ - BatchInterval: 10 * time.Second, - BatchCount: 500, - BatchSize: 4 * 1024 * 1024, // 4Mb - LogFile: "/tmp/example-kcl-consumer", + BatchInterval: 10 * time.Second, + BatchCount: 500, + BatchSize: 4 * 1024 * 1024, // 4Mb + Logger: logger.New("amazon-kinesis-client-go"), + FailedLogsFile: "/tmp/example-kcl-consumer", } - output, file := createDummyOutput() - defer file.Close() - - sender := &exampleSender{output: output} + sender := &exampleSender{output: logger.New("fake-output")} consumer := kbc.NewBatchConsumer(config, sender) consumer.Start() } From 945ed317c27c9396989aba37ff4378fe1032e28f Mon Sep 17 00:00:00 2001 From: Rafael Garcia Date: Fri, 3 Nov 2017 17:48:50 +0000 Subject: [PATCH 2/3] remove logger as parameter --- batchconsumer/batchermanager.go | 15 +++++++-------- batchconsumer/checkpointmanager.go | 10 +--------- batchconsumer/consumer.go | 11 ++--------- batchconsumer/writer.go | 18 ++++++++---------- batchconsumer/writer_test.go | 20 ++++++++++---------- cmd/batchconsumer/main.go | 1 - golang.mk | 20 ++++++++++++++------ 7 files changed, 42 insertions(+), 53 deletions(-) diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index 40f306a..a335b2f 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -10,6 +10,8 @@ import ( "github.com/Clever/amazon-kinesis-client-go/kcl" ) +var lg = kv.New("amazon-kinesis-client-go") + type tagMsgPair struct { tag string msg []byte @@ -23,7 +25,6 @@ type batcherManagerConfig struct { } type batcherManager struct { - log kv.KayveeLogger failedLogsFile kv.KayveeLogger sender Sender chkpntManager *checkpointManager @@ -39,11 +40,9 @@ type batcherManager struct { } func newBatcherManager( - sender Sender, chkpntManager *checkpointManager, cfg batcherManagerConfig, - log kv.KayveeLogger, failedLogsFile kv.KayveeLogger, + sender Sender, chkpntManager *checkpointManager, cfg batcherManagerConfig, failedLogsFile kv.KayveeLogger, ) *batcherManager { bm := &batcherManager{ - log: log, failedLogsFile: failedLogsFile, sender: sender, chkpntManager: chkpntManager, @@ -99,16 +98,16 @@ func (b *batcherManager) sendBatch(batcher *batcher, tag string) { switch e := err.(type) { case nil: // Do nothing case PartialSendBatchError: - b.log.ErrorD("send-batch", kv.M{"msg": e.Error()}) + lg.ErrorD("send-batch", kv.M{"msg": e.Error()}) for _, line := range e.FailedMessages { b.failedLogsFile.ErrorD("failed-log", kv.M{"log": line}) } stats.Counter("batch-log-failures", len(e.FailedMessages)) case CatastrophicSendBatchError: - b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) + lg.CriticalD("send-batch", kv.M{"msg": e.Error()}) os.Exit(1) default: - b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) + lg.CriticalD("send-batch", kv.M{"msg": e.Error()}) os.Exit(1) } @@ -187,7 +186,7 @@ func (b *batcherManager) startMessageHandler( batcher.AddMessage(tmp.msg, tmp.pair) } else if err != nil { - b.log.ErrorD("add-message", kv.M{ + lg.ErrorD("add-message", kv.M{ "err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag, }) } diff --git a/batchconsumer/checkpointmanager.go b/batchconsumer/checkpointmanager.go index 6f312e0..56aac07 100644 --- a/batchconsumer/checkpointmanager.go +++ b/batchconsumer/checkpointmanager.go @@ -3,15 +3,11 @@ package batchconsumer import ( "time" - kv "gopkg.in/Clever/kayvee-go.v6/logger" - "github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats" "github.com/Clever/amazon-kinesis-client-go/kcl" ) type checkpointManager struct { - log kv.KayveeLogger - checkpointFreq time.Duration checkpoint chan kcl.SequencePair @@ -19,12 +15,8 @@ type checkpointManager struct { shutdown chan chan<- struct{} } -func newCheckpointManager( - checkpointer kcl.Checkpointer, checkpointFreq time.Duration, log kv.KayveeLogger, -) *checkpointManager { +func newCheckpointManager(checkpointer kcl.Checkpointer, checkpointFreq time.Duration) *checkpointManager { cm := &checkpointManager{ - log: log, - checkpointFreq: checkpointFreq, checkpoint: make(chan kcl.SequencePair), diff --git a/batchconsumer/consumer.go b/batchconsumer/consumer.go index 2738b5c..da2abed 100644 --- a/batchconsumer/consumer.go +++ b/batchconsumer/consumer.go @@ -13,9 +13,6 @@ import ( // Config used for BatchConsumer constructor. Any empty fields are populated with defaults. type Config struct { - // Logger for logging info / error logs. - Logger logger.KayveeLogger - // FailedLogsFile is where logs that failed to process are written. FailedLogsFile string @@ -42,10 +39,6 @@ type BatchConsumer struct { } func withDefaults(config Config) Config { - if config.Logger == nil { - config.Logger = logger.New("amazon-kinesis-client-go") - } - if config.FailedLogsFile == "" { config.FailedLogsFile = "/tmp/kcl-" + time.Now().Format(time.RFC3339) } @@ -85,10 +78,10 @@ func NewBatchConsumerFromFiles( if err != nil { log.Fatalf("Unable to create log file: %s", err.Error()) } - failedLogsFile := logger.New("amazon-kinesis-client-go") + failedLogsFile := logger.New("amazon-kinesis-client-go/batchconsumer") failedLogsFile.SetOutput(file) - wrt := NewBatchedWriter(config, sender, config.Logger, failedLogsFile) + wrt := NewBatchedWriter(config, sender, failedLogsFile) kclProcess := kcl.New(input, output, errFile, wrt) return &BatchConsumer{ diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index 737bc6c..d750497 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -17,7 +17,6 @@ import ( type batchedWriter struct { config Config sender Sender - log kv.KayveeLogger failedLogsFile kv.KayveeLogger shardID string @@ -31,11 +30,10 @@ type batchedWriter struct { lastProcessedSeq kcl.SequencePair } -func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger, failedLogsFile kv.KayveeLogger) *batchedWriter { +func NewBatchedWriter(config Config, sender Sender, failedLogsFile kv.KayveeLogger) *batchedWriter { return &batchedWriter{ config: config, sender: sender, - log: log, failedLogsFile: failedLogsFile, rateLimiter: rate.NewLimiter(rate.Limit(config.ReadRateLimit), config.ReadBurstLimit), @@ -51,8 +49,8 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer BatchInterval: b.config.BatchInterval, } - b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq, b.log) - b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.log, b.failedLogsFile) + b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq) + b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.failedLogsFile) return nil } @@ -105,20 +103,20 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { continue // Skip message } else if err != nil { stats.Counter("unknown-error", 1) - b.log.ErrorD("process-message", kv.M{"msg": err.Error(), "rawmsg": string(rawmsg)}) + lg.ErrorD("process-message", kv.M{"msg": err.Error(), "rawmsg": string(rawmsg)}) continue // Don't stop processing messages because of one bad message } if len(tags) == 0 { stats.Counter("no-tags", 1) - b.log.ErrorD("no-tags", kv.M{"rawmsg": string(rawmsg)}) + lg.ErrorD("no-tags", kv.M{"rawmsg": string(rawmsg)}) return fmt.Errorf("No tags provided by consumer for log: %s", string(rawmsg)) } for _, tag := range tags { if tag == "" { stats.Counter("blank-tag", 1) - b.log.ErrorD("blank-tag", kv.M{"rawmsg": string(rawmsg)}) + lg.ErrorD("blank-tag", kv.M{"rawmsg": string(rawmsg)}) return fmt.Errorf("Blank tag provided by consumer for log: %s", string(rawmsg)) } @@ -146,9 +144,9 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { func (b *batchedWriter) Shutdown(reason string) error { if reason == "TERMINATE" { - b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID}) + lg.InfoD("terminate-signal", kv.M{"shard-id": b.shardID}) } else { - b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason}) + lg.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason}) } done := b.batcherManager.Shutdown() diff --git a/batchconsumer/writer_test.go b/batchconsumer/writer_test.go index 2555c86..7bc7e33 100644 --- a/batchconsumer/writer_test.go +++ b/batchconsumer/writer_test.go @@ -135,14 +135,14 @@ func encode(str string) string { func TestProcessRecordsIgnoredMessages(t *testing.T) { assert := assert.New(t) - mocklog := logger.New("testing") + mockFailedLogsFile := logger.New("testing") mockconfig := withDefaults(Config{ BatchInterval: 10 * time.Millisecond, CheckpointFreq: 20 * time.Millisecond, }) mockcheckpointer := NewMockCheckpointer(5 * time.Second) - wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mocklog, mocklog) + wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mockFailedLogsFile) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -165,7 +165,7 @@ func TestProcessRecordsIgnoredMessages(t *testing.T) { func TestProcessRecordsSingleBatchBasic(t *testing.T) { assert := assert.New(t) - mocklog := logger.New("testing") + mockFailedLogsFile := logger.New("testing") mockconfig := withDefaults(Config{ BatchCount: 2, CheckpointFreq: 1, // Don't throttle checks points @@ -173,7 +173,7 @@ func TestProcessRecordsSingleBatchBasic(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -212,7 +212,7 @@ func TestProcessRecordsSingleBatchBasic(t *testing.T) { func TestProcessRecordsMutliBatchBasic(t *testing.T) { assert := assert.New(t) - mocklog := logger.New("testing") + mockFailedLogsFile := logger.New("testing") mockconfig := withDefaults(Config{ BatchInterval: 100 * time.Millisecond, CheckpointFreq: 200 * time.Millisecond, @@ -220,7 +220,7 @@ func TestProcessRecordsMutliBatchBasic(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -270,7 +270,7 @@ func TestProcessRecordsMutliBatchBasic(t *testing.T) { func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) { assert := assert.New(t) - mocklog := logger.New("testing") + mockFailedLogsFile := logger.New("testing") mockconfig := withDefaults(Config{ BatchInterval: 100 * time.Millisecond, CheckpointFreq: 200 * time.Millisecond, @@ -278,7 +278,7 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -346,7 +346,7 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) { func TestStaggeredCheckpionting(t *testing.T) { assert := assert.New(t) - mocklog := logger.New("testing") + mockFailedLogsFile := logger.New("testing") mockconfig := withDefaults(Config{ BatchCount: 2, BatchInterval: 100 * time.Millisecond, @@ -355,7 +355,7 @@ func TestStaggeredCheckpionting(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ diff --git a/cmd/batchconsumer/main.go b/cmd/batchconsumer/main.go index 67f2c47..004ec36 100644 --- a/cmd/batchconsumer/main.go +++ b/cmd/batchconsumer/main.go @@ -14,7 +14,6 @@ func main() { BatchInterval: 10 * time.Second, BatchCount: 500, BatchSize: 4 * 1024 * 1024, // 4Mb - Logger: logger.New("amazon-kinesis-client-go"), FailedLogsFile: "/tmp/example-kcl-consumer", } diff --git a/golang.mk b/golang.mk index 602c777..fb402e5 100644 --- a/golang.mk +++ b/golang.mk @@ -1,7 +1,7 @@ # This is the default Clever Golang Makefile. # It is stored in the dev-handbook repo, github.com/Clever/dev-handbook # Please do not alter this file directly. -GOLANG_MK_VERSION := 0.1.4 +GOLANG_MK_VERSION := 0.1.5 SHELL := /bin/bash .PHONY: golang-godep-vendor golang-test-deps $(GODEP) @@ -10,13 +10,21 @@ SHELL := /bin/bash GOPATH=$(shell echo $$GOPATH | cut -d: -f1) # This block checks and confirms that the proper Go toolchain version is installed. +# It uses ^ matching in the semver sense -- you can be ahead by a minor +# version, but not a major version (patch is ignored). # arg1: golang version define golang-version-check -GOVERSION := $(shell go version | grep $(1)) -_ := $(if \ - $(shell go version | grep $(1)), \ - @echo "", \ - $(error "must be running Go version $(1)")) +_ := $(if \ + $(shell \ + expr >/dev/null \ + `go version | cut -d" " -f3 | cut -c3- | cut -d. -f2` \ + \>= `echo $(1) | cut -d. -f2` \ + \& \ + `go version | cut -d" " -f3 | cut -c3- | cut -d. -f1` \ + = `echo $(1) | cut -d. -f1` \ + && echo 1), \ + @echo "", \ + $(error must be running Go version ^$(1) - you are running $(shell go version | cut -d" " -f3 | cut -c3-))) endef export GO15VENDOREXPERIMENT=1 From 7b18d1bab46f59a9b29e4f9dd588b9f10bf9ef70 Mon Sep 17 00:00:00 2001 From: Rafael Garcia Date: Fri, 3 Nov 2017 18:03:39 +0000 Subject: [PATCH 3/3] write failed log message to file --- batchconsumer/batchermanager.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index a335b2f..2f74288 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -187,7 +187,13 @@ func (b *batcherManager) startMessageHandler( batcher.AddMessage(tmp.msg, tmp.pair) } else if err != nil { lg.ErrorD("add-message", kv.M{ - "err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag, + "err": err.Error(), + "tag": tmp.tag, + }) + b.failedLogsFile.ErrorD("add-message", kv.M{ + "err": err.Error(), + "msg": string(tmp.msg), + "tag": tmp.tag, }) } stats.Counter("msg-batched", 1)