Merge pull request #21 from Clever/only-send-failed-log-lines-to-file

write errors to stderr, failed log lines to file
This commit is contained in:
Rafael 2017-11-03 11:05:31 -07:00 committed by GitHub
commit ebf2e2dc98
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 78 additions and 88 deletions

View file

@ -10,6 +10,8 @@ import (
"github.com/Clever/amazon-kinesis-client-go/kcl" "github.com/Clever/amazon-kinesis-client-go/kcl"
) )
var lg = kv.New("amazon-kinesis-client-go")
type tagMsgPair struct { type tagMsgPair struct {
tag string tag string
msg []byte msg []byte
@ -23,9 +25,9 @@ type batcherManagerConfig struct {
} }
type batcherManager struct { type batcherManager struct {
log kv.KayveeLogger failedLogsFile kv.KayveeLogger
sender Sender sender Sender
chkpntManager *checkpointManager chkpntManager *checkpointManager
batchCount int batchCount int
batchSize int batchSize int
@ -38,12 +40,12 @@ type batcherManager struct {
} }
func newBatcherManager( func newBatcherManager(
sender Sender, chkpntManager *checkpointManager, cfg batcherManagerConfig, log kv.KayveeLogger, sender Sender, chkpntManager *checkpointManager, cfg batcherManagerConfig, failedLogsFile kv.KayveeLogger,
) *batcherManager { ) *batcherManager {
bm := &batcherManager{ bm := &batcherManager{
log: log, failedLogsFile: failedLogsFile,
sender: sender, sender: sender,
chkpntManager: chkpntManager, chkpntManager: chkpntManager,
batchCount: cfg.BatchCount, batchCount: cfg.BatchCount,
batchSize: cfg.BatchSize, batchSize: cfg.BatchSize,
@ -96,16 +98,16 @@ func (b *batcherManager) sendBatch(batcher *batcher, tag string) {
switch e := err.(type) { switch e := err.(type) {
case nil: // Do nothing case nil: // Do nothing
case PartialSendBatchError: case PartialSendBatchError:
b.log.ErrorD("send-batch", kv.M{"msg": e.Error()}) lg.ErrorD("send-batch", kv.M{"msg": e.Error()})
for _, line := range e.FailedMessages { for _, line := range e.FailedMessages {
b.log.ErrorD("failed-log", kv.M{"log": line}) b.failedLogsFile.ErrorD("failed-log", kv.M{"log": line})
} }
stats.Counter("batch-log-failures", len(e.FailedMessages)) stats.Counter("batch-log-failures", len(e.FailedMessages))
case CatastrophicSendBatchError: case CatastrophicSendBatchError:
b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) lg.CriticalD("send-batch", kv.M{"msg": e.Error()})
os.Exit(1) os.Exit(1)
default: default:
b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) lg.CriticalD("send-batch", kv.M{"msg": e.Error()})
os.Exit(1) os.Exit(1)
} }
@ -184,8 +186,14 @@ func (b *batcherManager) startMessageHandler(
batcher.AddMessage(tmp.msg, tmp.pair) batcher.AddMessage(tmp.msg, tmp.pair)
} else if err != nil { } else if err != nil {
b.log.ErrorD("add-message", kv.M{ lg.ErrorD("add-message", kv.M{
"err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag, "err": err.Error(),
"tag": tmp.tag,
})
b.failedLogsFile.ErrorD("add-message", kv.M{
"err": err.Error(),
"msg": string(tmp.msg),
"tag": tmp.tag,
}) })
} }
stats.Counter("msg-batched", 1) stats.Counter("msg-batched", 1)

View file

@ -3,15 +3,11 @@ package batchconsumer
import ( import (
"time" "time"
kv "gopkg.in/Clever/kayvee-go.v6/logger"
"github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats" "github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats"
"github.com/Clever/amazon-kinesis-client-go/kcl" "github.com/Clever/amazon-kinesis-client-go/kcl"
) )
type checkpointManager struct { type checkpointManager struct {
log kv.KayveeLogger
checkpointFreq time.Duration checkpointFreq time.Duration
checkpoint chan kcl.SequencePair checkpoint chan kcl.SequencePair
@ -19,12 +15,8 @@ type checkpointManager struct {
shutdown chan chan<- struct{} shutdown chan chan<- struct{}
} }
func newCheckpointManager( func newCheckpointManager(checkpointer kcl.Checkpointer, checkpointFreq time.Duration) *checkpointManager {
checkpointer kcl.Checkpointer, checkpointFreq time.Duration, log kv.KayveeLogger,
) *checkpointManager {
cm := &checkpointManager{ cm := &checkpointManager{
log: log,
checkpointFreq: checkpointFreq, checkpointFreq: checkpointFreq,
checkpoint: make(chan kcl.SequencePair), checkpoint: make(chan kcl.SequencePair),

View file

@ -13,8 +13,8 @@ import (
// Config used for BatchConsumer constructor. Any empty fields are populated with defaults. // Config used for BatchConsumer constructor. Any empty fields are populated with defaults.
type Config struct { type Config struct {
// LogFile where consumer errors and failed log lines are saved // FailedLogsFile is where logs that failed to process are written.
LogFile string FailedLogsFile string
// BatchInterval the upper bound on how often SendBatch is called with accumulated messages // BatchInterval the upper bound on how often SendBatch is called with accumulated messages
BatchInterval time.Duration BatchInterval time.Duration
@ -34,13 +34,13 @@ type Config struct {
// BatchConsumer is responsible for marshalling // BatchConsumer is responsible for marshalling
type BatchConsumer struct { type BatchConsumer struct {
kclProcess *kcl.KCLProcess kclProcess *kcl.KCLProcess
logfile *os.File failedLogsFile *os.File
} }
func withDefaults(config Config) Config { func withDefaults(config Config) Config {
if config.LogFile == "" { if config.FailedLogsFile == "" {
config.LogFile = "/tmp/kcl-" + time.Now().Format(time.RFC3339) config.FailedLogsFile = "/tmp/kcl-" + time.Now().Format(time.RFC3339)
} }
if config.BatchInterval == 0 { if config.BatchInterval == 0 {
@ -74,20 +74,19 @@ func NewBatchConsumerFromFiles(
) *BatchConsumer { ) *BatchConsumer {
config = withDefaults(config) config = withDefaults(config)
file, err := os.OpenFile(config.LogFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) file, err := os.OpenFile(config.FailedLogsFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil { if err != nil {
log.Fatalf("Unable to create log file: %s", err.Error()) log.Fatalf("Unable to create log file: %s", err.Error())
} }
failedLogsFile := logger.New("amazon-kinesis-client-go/batchconsumer")
failedLogsFile.SetOutput(file)
kvlog := logger.New("amazon-kinesis-client-go") wrt := NewBatchedWriter(config, sender, failedLogsFile)
kvlog.SetOutput(file)
wrt := NewBatchedWriter(config, sender, kvlog)
kclProcess := kcl.New(input, output, errFile, wrt) kclProcess := kcl.New(input, output, errFile, wrt)
return &BatchConsumer{ return &BatchConsumer{
kclProcess: kclProcess, kclProcess: kclProcess,
logfile: file, failedLogsFile: file,
} }
} }
@ -100,5 +99,5 @@ func NewBatchConsumer(config Config, sender Sender) *BatchConsumer {
// Start when called, the consumer begins ingesting messages. This function blocks. // Start when called, the consumer begins ingesting messages. This function blocks.
func (b *BatchConsumer) Start() { func (b *BatchConsumer) Start() {
b.kclProcess.Run() b.kclProcess.Run()
b.logfile.Close() b.failedLogsFile.Close()
} }

View file

@ -15,9 +15,9 @@ import (
) )
type batchedWriter struct { type batchedWriter struct {
config Config config Config
sender Sender sender Sender
log kv.KayveeLogger failedLogsFile kv.KayveeLogger
shardID string shardID string
@ -30,11 +30,11 @@ type batchedWriter struct {
lastProcessedSeq kcl.SequencePair lastProcessedSeq kcl.SequencePair
} }
func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batchedWriter { func NewBatchedWriter(config Config, sender Sender, failedLogsFile kv.KayveeLogger) *batchedWriter {
return &batchedWriter{ return &batchedWriter{
config: config, config: config,
sender: sender, sender: sender,
log: log, failedLogsFile: failedLogsFile,
rateLimiter: rate.NewLimiter(rate.Limit(config.ReadRateLimit), config.ReadBurstLimit), rateLimiter: rate.NewLimiter(rate.Limit(config.ReadRateLimit), config.ReadBurstLimit),
} }
@ -49,8 +49,8 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer
BatchInterval: b.config.BatchInterval, BatchInterval: b.config.BatchInterval,
} }
b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq, b.log) b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq)
b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.log) b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.failedLogsFile)
return nil return nil
} }
@ -103,20 +103,20 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
continue // Skip message continue // Skip message
} else if err != nil { } else if err != nil {
stats.Counter("unknown-error", 1) stats.Counter("unknown-error", 1)
b.log.ErrorD("process-message", kv.M{"msg": err.Error(), "rawmsg": string(rawmsg)}) lg.ErrorD("process-message", kv.M{"msg": err.Error(), "rawmsg": string(rawmsg)})
continue // Don't stop processing messages because of one bad message continue // Don't stop processing messages because of one bad message
} }
if len(tags) == 0 { if len(tags) == 0 {
stats.Counter("no-tags", 1) stats.Counter("no-tags", 1)
b.log.ErrorD("no-tags", kv.M{"rawmsg": string(rawmsg)}) lg.ErrorD("no-tags", kv.M{"rawmsg": string(rawmsg)})
return fmt.Errorf("No tags provided by consumer for log: %s", string(rawmsg)) return fmt.Errorf("No tags provided by consumer for log: %s", string(rawmsg))
} }
for _, tag := range tags { for _, tag := range tags {
if tag == "" { if tag == "" {
stats.Counter("blank-tag", 1) stats.Counter("blank-tag", 1)
b.log.ErrorD("blank-tag", kv.M{"rawmsg": string(rawmsg)}) lg.ErrorD("blank-tag", kv.M{"rawmsg": string(rawmsg)})
return fmt.Errorf("Blank tag provided by consumer for log: %s", string(rawmsg)) return fmt.Errorf("Blank tag provided by consumer for log: %s", string(rawmsg))
} }
@ -144,9 +144,9 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
func (b *batchedWriter) Shutdown(reason string) error { func (b *batchedWriter) Shutdown(reason string) error {
if reason == "TERMINATE" { if reason == "TERMINATE" {
b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID}) lg.InfoD("terminate-signal", kv.M{"shard-id": b.shardID})
} else { } else {
b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason}) lg.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason})
} }
done := b.batcherManager.Shutdown() done := b.batcherManager.Shutdown()

View file

@ -135,14 +135,14 @@ func encode(str string) string {
func TestProcessRecordsIgnoredMessages(t *testing.T) { func TestProcessRecordsIgnoredMessages(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
mocklog := logger.New("testing") mockFailedLogsFile := logger.New("testing")
mockconfig := withDefaults(Config{ mockconfig := withDefaults(Config{
BatchInterval: 10 * time.Millisecond, BatchInterval: 10 * time.Millisecond,
CheckpointFreq: 20 * time.Millisecond, CheckpointFreq: 20 * time.Millisecond,
}) })
mockcheckpointer := NewMockCheckpointer(5 * time.Second) mockcheckpointer := NewMockCheckpointer(5 * time.Second)
wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mocklog) wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mockFailedLogsFile)
wrt.Initialize("test-shard", mockcheckpointer) wrt.Initialize("test-shard", mockcheckpointer)
err := wrt.ProcessRecords([]kcl.Record{ err := wrt.ProcessRecords([]kcl.Record{
@ -165,7 +165,7 @@ func TestProcessRecordsIgnoredMessages(t *testing.T) {
func TestProcessRecordsSingleBatchBasic(t *testing.T) { func TestProcessRecordsSingleBatchBasic(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
mocklog := logger.New("testing") mockFailedLogsFile := logger.New("testing")
mockconfig := withDefaults(Config{ mockconfig := withDefaults(Config{
BatchCount: 2, BatchCount: 2,
CheckpointFreq: 1, // Don't throttle checks points CheckpointFreq: 1, // Don't throttle checks points
@ -173,7 +173,7 @@ func TestProcessRecordsSingleBatchBasic(t *testing.T) {
mockcheckpointer := NewMockCheckpointer(5 * time.Second) mockcheckpointer := NewMockCheckpointer(5 * time.Second)
mocksender := NewMsgAsTagSender() mocksender := NewMsgAsTagSender()
wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile)
wrt.Initialize("test-shard", mockcheckpointer) wrt.Initialize("test-shard", mockcheckpointer)
err := wrt.ProcessRecords([]kcl.Record{ err := wrt.ProcessRecords([]kcl.Record{
@ -212,7 +212,7 @@ func TestProcessRecordsSingleBatchBasic(t *testing.T) {
func TestProcessRecordsMutliBatchBasic(t *testing.T) { func TestProcessRecordsMutliBatchBasic(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
mocklog := logger.New("testing") mockFailedLogsFile := logger.New("testing")
mockconfig := withDefaults(Config{ mockconfig := withDefaults(Config{
BatchInterval: 100 * time.Millisecond, BatchInterval: 100 * time.Millisecond,
CheckpointFreq: 200 * time.Millisecond, CheckpointFreq: 200 * time.Millisecond,
@ -220,7 +220,7 @@ func TestProcessRecordsMutliBatchBasic(t *testing.T) {
mockcheckpointer := NewMockCheckpointer(5 * time.Second) mockcheckpointer := NewMockCheckpointer(5 * time.Second)
mocksender := NewMsgAsTagSender() mocksender := NewMsgAsTagSender()
wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile)
wrt.Initialize("test-shard", mockcheckpointer) wrt.Initialize("test-shard", mockcheckpointer)
err := wrt.ProcessRecords([]kcl.Record{ err := wrt.ProcessRecords([]kcl.Record{
@ -270,7 +270,7 @@ func TestProcessRecordsMutliBatchBasic(t *testing.T) {
func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) { func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
mocklog := logger.New("testing") mockFailedLogsFile := logger.New("testing")
mockconfig := withDefaults(Config{ mockconfig := withDefaults(Config{
BatchInterval: 100 * time.Millisecond, BatchInterval: 100 * time.Millisecond,
CheckpointFreq: 200 * time.Millisecond, CheckpointFreq: 200 * time.Millisecond,
@ -278,7 +278,7 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) {
mockcheckpointer := NewMockCheckpointer(5 * time.Second) mockcheckpointer := NewMockCheckpointer(5 * time.Second)
mocksender := NewMsgAsTagSender() mocksender := NewMsgAsTagSender()
wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile)
wrt.Initialize("test-shard", mockcheckpointer) wrt.Initialize("test-shard", mockcheckpointer)
err := wrt.ProcessRecords([]kcl.Record{ err := wrt.ProcessRecords([]kcl.Record{
@ -346,7 +346,7 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) {
func TestStaggeredCheckpionting(t *testing.T) { func TestStaggeredCheckpionting(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
mocklog := logger.New("testing") mockFailedLogsFile := logger.New("testing")
mockconfig := withDefaults(Config{ mockconfig := withDefaults(Config{
BatchCount: 2, BatchCount: 2,
BatchInterval: 100 * time.Millisecond, BatchInterval: 100 * time.Millisecond,
@ -355,7 +355,7 @@ func TestStaggeredCheckpionting(t *testing.T) {
mockcheckpointer := NewMockCheckpointer(5 * time.Second) mockcheckpointer := NewMockCheckpointer(5 * time.Second)
mocksender := NewMsgAsTagSender() mocksender := NewMsgAsTagSender()
wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile)
wrt.Initialize("test-shard", mockcheckpointer) wrt.Initialize("test-shard", mockcheckpointer)
err := wrt.ProcessRecords([]kcl.Record{ err := wrt.ProcessRecords([]kcl.Record{

View file

@ -2,8 +2,6 @@ package main
import ( import (
"fmt" "fmt"
"log"
"os"
"time" "time"
"gopkg.in/Clever/kayvee-go.v6/logger" "gopkg.in/Clever/kayvee-go.v6/logger"
@ -11,30 +9,15 @@ import (
kbc "github.com/Clever/amazon-kinesis-client-go/batchconsumer" kbc "github.com/Clever/amazon-kinesis-client-go/batchconsumer"
) )
func createDummyOutput() (logger.KayveeLogger, *os.File) {
file, err := os.OpenFile("/tmp/example-kcl-output", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("Unable to create log file: %s", err.Error())
}
kvlog := logger.New("amazon-kinesis-client-go")
kvlog.SetOutput(file)
return kvlog, file
}
func main() { func main() {
config := kbc.Config{ config := kbc.Config{
BatchInterval: 10 * time.Second, BatchInterval: 10 * time.Second,
BatchCount: 500, BatchCount: 500,
BatchSize: 4 * 1024 * 1024, // 4Mb BatchSize: 4 * 1024 * 1024, // 4Mb
LogFile: "/tmp/example-kcl-consumer", FailedLogsFile: "/tmp/example-kcl-consumer",
} }
output, file := createDummyOutput() sender := &exampleSender{output: logger.New("fake-output")}
defer file.Close()
sender := &exampleSender{output: output}
consumer := kbc.NewBatchConsumer(config, sender) consumer := kbc.NewBatchConsumer(config, sender)
consumer.Start() consumer.Start()
} }

View file

@ -1,7 +1,7 @@
# This is the default Clever Golang Makefile. # This is the default Clever Golang Makefile.
# It is stored in the dev-handbook repo, github.com/Clever/dev-handbook # It is stored in the dev-handbook repo, github.com/Clever/dev-handbook
# Please do not alter this file directly. # Please do not alter this file directly.
GOLANG_MK_VERSION := 0.1.4 GOLANG_MK_VERSION := 0.1.5
SHELL := /bin/bash SHELL := /bin/bash
.PHONY: golang-godep-vendor golang-test-deps $(GODEP) .PHONY: golang-godep-vendor golang-test-deps $(GODEP)
@ -10,13 +10,21 @@ SHELL := /bin/bash
GOPATH=$(shell echo $$GOPATH | cut -d: -f1) GOPATH=$(shell echo $$GOPATH | cut -d: -f1)
# This block checks and confirms that the proper Go toolchain version is installed. # This block checks and confirms that the proper Go toolchain version is installed.
# It uses ^ matching in the semver sense -- you can be ahead by a minor
# version, but not a major version (patch is ignored).
# arg1: golang version # arg1: golang version
define golang-version-check define golang-version-check
GOVERSION := $(shell go version | grep $(1)) _ := $(if \
_ := $(if \ $(shell \
$(shell go version | grep $(1)), \ expr >/dev/null \
@echo "", \ `go version | cut -d" " -f3 | cut -c3- | cut -d. -f2` \
$(error "must be running Go version $(1)")) \>= `echo $(1) | cut -d. -f2` \
\& \
`go version | cut -d" " -f3 | cut -c3- | cut -d. -f1` \
= `echo $(1) | cut -d. -f1` \
&& echo 1), \
@echo "", \
$(error must be running Go version ^$(1) - you are running $(shell go version | cut -d" " -f3 | cut -c3-)))
endef endef
export GO15VENDOREXPERIMENT=1 export GO15VENDOREXPERIMENT=1