From 221743b3e21e5370106771ed805241bab72f0334 Mon Sep 17 00:00:00 2001 From: Rafael Garcia Date: Thu, 2 Nov 2017 21:49:13 +0000 Subject: [PATCH] write errors to stderr --- batchconsumer/batchermanager.go | 19 ++++++++++-------- batchconsumer/consumer.go | 34 +++++++++++++++++++-------------- batchconsumer/writer.go | 18 +++++++++-------- batchconsumer/writer_test.go | 10 +++++----- cmd/batchconsumer/main.go | 28 ++++++--------------------- 5 files changed, 52 insertions(+), 57 deletions(-) diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index b46d37a..40f306a 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -23,9 +23,10 @@ type batcherManagerConfig struct { } type batcherManager struct { - log kv.KayveeLogger - sender Sender - chkpntManager *checkpointManager + log kv.KayveeLogger + failedLogsFile kv.KayveeLogger + sender Sender + chkpntManager *checkpointManager batchCount int batchSize int @@ -38,12 +39,14 @@ type batcherManager struct { } func newBatcherManager( - sender Sender, chkpntManager *checkpointManager, cfg batcherManagerConfig, log kv.KayveeLogger, + sender Sender, chkpntManager *checkpointManager, cfg batcherManagerConfig, + log kv.KayveeLogger, failedLogsFile kv.KayveeLogger, ) *batcherManager { bm := &batcherManager{ - log: log, - sender: sender, - chkpntManager: chkpntManager, + log: log, + failedLogsFile: failedLogsFile, + sender: sender, + chkpntManager: chkpntManager, batchCount: cfg.BatchCount, batchSize: cfg.BatchSize, @@ -98,7 +101,7 @@ func (b *batcherManager) sendBatch(batcher *batcher, tag string) { case PartialSendBatchError: b.log.ErrorD("send-batch", kv.M{"msg": e.Error()}) for _, line := range e.FailedMessages { - b.log.ErrorD("failed-log", kv.M{"log": line}) + b.failedLogsFile.ErrorD("failed-log", kv.M{"log": line}) } stats.Counter("batch-log-failures", len(e.FailedMessages)) case CatastrophicSendBatchError: diff --git a/batchconsumer/consumer.go b/batchconsumer/consumer.go index b48a1dd..2738b5c 100644 --- a/batchconsumer/consumer.go +++ b/batchconsumer/consumer.go @@ -13,8 +13,11 @@ import ( // Config used for BatchConsumer constructor. Any empty fields are populated with defaults. type Config struct { - // LogFile where consumer errors and failed log lines are saved - LogFile string + // Logger for logging info / error logs. + Logger logger.KayveeLogger + + // FailedLogsFile is where logs that failed to process are written. + FailedLogsFile string // BatchInterval the upper bound on how often SendBatch is called with accumulated messages BatchInterval time.Duration @@ -34,13 +37,17 @@ type Config struct { // BatchConsumer is responsible for marshalling type BatchConsumer struct { - kclProcess *kcl.KCLProcess - logfile *os.File + kclProcess *kcl.KCLProcess + failedLogsFile *os.File } func withDefaults(config Config) Config { - if config.LogFile == "" { - config.LogFile = "/tmp/kcl-" + time.Now().Format(time.RFC3339) + if config.Logger == nil { + config.Logger = logger.New("amazon-kinesis-client-go") + } + + if config.FailedLogsFile == "" { + config.FailedLogsFile = "/tmp/kcl-" + time.Now().Format(time.RFC3339) } if config.BatchInterval == 0 { @@ -74,20 +81,19 @@ func NewBatchConsumerFromFiles( ) *BatchConsumer { config = withDefaults(config) - file, err := os.OpenFile(config.LogFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + file, err := os.OpenFile(config.FailedLogsFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) if err != nil { log.Fatalf("Unable to create log file: %s", err.Error()) } + failedLogsFile := logger.New("amazon-kinesis-client-go") + failedLogsFile.SetOutput(file) - kvlog := logger.New("amazon-kinesis-client-go") - kvlog.SetOutput(file) - - wrt := NewBatchedWriter(config, sender, kvlog) + wrt := NewBatchedWriter(config, sender, config.Logger, failedLogsFile) kclProcess := kcl.New(input, output, errFile, wrt) return &BatchConsumer{ - kclProcess: kclProcess, - logfile: file, + kclProcess: kclProcess, + failedLogsFile: file, } } @@ -100,5 +106,5 @@ func NewBatchConsumer(config Config, sender Sender) *BatchConsumer { // Start when called, the consumer begins ingesting messages. This function blocks. func (b *BatchConsumer) Start() { b.kclProcess.Run() - b.logfile.Close() + b.failedLogsFile.Close() } diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index 4b68c74..737bc6c 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -15,9 +15,10 @@ import ( ) type batchedWriter struct { - config Config - sender Sender - log kv.KayveeLogger + config Config + sender Sender + log kv.KayveeLogger + failedLogsFile kv.KayveeLogger shardID string @@ -30,11 +31,12 @@ type batchedWriter struct { lastProcessedSeq kcl.SequencePair } -func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batchedWriter { +func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger, failedLogsFile kv.KayveeLogger) *batchedWriter { return &batchedWriter{ - config: config, - sender: sender, - log: log, + config: config, + sender: sender, + log: log, + failedLogsFile: failedLogsFile, rateLimiter: rate.NewLimiter(rate.Limit(config.ReadRateLimit), config.ReadBurstLimit), } @@ -50,7 +52,7 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer } b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq, b.log) - b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.log) + b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.log, b.failedLogsFile) return nil } diff --git a/batchconsumer/writer_test.go b/batchconsumer/writer_test.go index 94323f5..2555c86 100644 --- a/batchconsumer/writer_test.go +++ b/batchconsumer/writer_test.go @@ -142,7 +142,7 @@ func TestProcessRecordsIgnoredMessages(t *testing.T) { }) mockcheckpointer := NewMockCheckpointer(5 * time.Second) - wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mocklog) + wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mocklog, mocklog) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -173,7 +173,7 @@ func TestProcessRecordsSingleBatchBasic(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -220,7 +220,7 @@ func TestProcessRecordsMutliBatchBasic(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -278,7 +278,7 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ @@ -355,7 +355,7 @@ func TestStaggeredCheckpionting(t *testing.T) { mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() - wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) + wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog) wrt.Initialize("test-shard", mockcheckpointer) err := wrt.ProcessRecords([]kcl.Record{ diff --git a/cmd/batchconsumer/main.go b/cmd/batchconsumer/main.go index f12d3c1..67f2c47 100644 --- a/cmd/batchconsumer/main.go +++ b/cmd/batchconsumer/main.go @@ -2,8 +2,6 @@ package main import ( "fmt" - "log" - "os" "time" "gopkg.in/Clever/kayvee-go.v6/logger" @@ -11,30 +9,16 @@ import ( kbc "github.com/Clever/amazon-kinesis-client-go/batchconsumer" ) -func createDummyOutput() (logger.KayveeLogger, *os.File) { - file, err := os.OpenFile("/tmp/example-kcl-output", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) - if err != nil { - log.Fatalf("Unable to create log file: %s", err.Error()) - } - - kvlog := logger.New("amazon-kinesis-client-go") - kvlog.SetOutput(file) - - return kvlog, file -} - func main() { config := kbc.Config{ - BatchInterval: 10 * time.Second, - BatchCount: 500, - BatchSize: 4 * 1024 * 1024, // 4Mb - LogFile: "/tmp/example-kcl-consumer", + BatchInterval: 10 * time.Second, + BatchCount: 500, + BatchSize: 4 * 1024 * 1024, // 4Mb + Logger: logger.New("amazon-kinesis-client-go"), + FailedLogsFile: "/tmp/example-kcl-consumer", } - output, file := createDummyOutput() - defer file.Close() - - sender := &exampleSender{output: output} + sender := &exampleSender{output: logger.New("fake-output")} consumer := kbc.NewBatchConsumer(config, sender) consumer.Start() }