write errors to stderr
This commit is contained in:
parent
94aacdf833
commit
221743b3e2
5 changed files with 52 additions and 57 deletions
|
|
@ -24,6 +24,7 @@ type batcherManagerConfig struct {
|
||||||
|
|
||||||
type batcherManager struct {
|
type batcherManager struct {
|
||||||
log kv.KayveeLogger
|
log kv.KayveeLogger
|
||||||
|
failedLogsFile kv.KayveeLogger
|
||||||
sender Sender
|
sender Sender
|
||||||
chkpntManager *checkpointManager
|
chkpntManager *checkpointManager
|
||||||
|
|
||||||
|
|
@ -38,10 +39,12 @@ type batcherManager struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBatcherManager(
|
func newBatcherManager(
|
||||||
sender Sender, chkpntManager *checkpointManager, cfg batcherManagerConfig, log kv.KayveeLogger,
|
sender Sender, chkpntManager *checkpointManager, cfg batcherManagerConfig,
|
||||||
|
log kv.KayveeLogger, failedLogsFile kv.KayveeLogger,
|
||||||
) *batcherManager {
|
) *batcherManager {
|
||||||
bm := &batcherManager{
|
bm := &batcherManager{
|
||||||
log: log,
|
log: log,
|
||||||
|
failedLogsFile: failedLogsFile,
|
||||||
sender: sender,
|
sender: sender,
|
||||||
chkpntManager: chkpntManager,
|
chkpntManager: chkpntManager,
|
||||||
|
|
||||||
|
|
@ -98,7 +101,7 @@ func (b *batcherManager) sendBatch(batcher *batcher, tag string) {
|
||||||
case PartialSendBatchError:
|
case PartialSendBatchError:
|
||||||
b.log.ErrorD("send-batch", kv.M{"msg": e.Error()})
|
b.log.ErrorD("send-batch", kv.M{"msg": e.Error()})
|
||||||
for _, line := range e.FailedMessages {
|
for _, line := range e.FailedMessages {
|
||||||
b.log.ErrorD("failed-log", kv.M{"log": line})
|
b.failedLogsFile.ErrorD("failed-log", kv.M{"log": line})
|
||||||
}
|
}
|
||||||
stats.Counter("batch-log-failures", len(e.FailedMessages))
|
stats.Counter("batch-log-failures", len(e.FailedMessages))
|
||||||
case CatastrophicSendBatchError:
|
case CatastrophicSendBatchError:
|
||||||
|
|
|
||||||
|
|
@ -13,8 +13,11 @@ import (
|
||||||
|
|
||||||
// Config used for BatchConsumer constructor. Any empty fields are populated with defaults.
|
// Config used for BatchConsumer constructor. Any empty fields are populated with defaults.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
// LogFile where consumer errors and failed log lines are saved
|
// Logger for logging info / error logs.
|
||||||
LogFile string
|
Logger logger.KayveeLogger
|
||||||
|
|
||||||
|
// FailedLogsFile is where logs that failed to process are written.
|
||||||
|
FailedLogsFile string
|
||||||
|
|
||||||
// BatchInterval the upper bound on how often SendBatch is called with accumulated messages
|
// BatchInterval the upper bound on how often SendBatch is called with accumulated messages
|
||||||
BatchInterval time.Duration
|
BatchInterval time.Duration
|
||||||
|
|
@ -35,12 +38,16 @@ type Config struct {
|
||||||
// BatchConsumer is responsible for marshalling
|
// BatchConsumer is responsible for marshalling
|
||||||
type BatchConsumer struct {
|
type BatchConsumer struct {
|
||||||
kclProcess *kcl.KCLProcess
|
kclProcess *kcl.KCLProcess
|
||||||
logfile *os.File
|
failedLogsFile *os.File
|
||||||
}
|
}
|
||||||
|
|
||||||
func withDefaults(config Config) Config {
|
func withDefaults(config Config) Config {
|
||||||
if config.LogFile == "" {
|
if config.Logger == nil {
|
||||||
config.LogFile = "/tmp/kcl-" + time.Now().Format(time.RFC3339)
|
config.Logger = logger.New("amazon-kinesis-client-go")
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.FailedLogsFile == "" {
|
||||||
|
config.FailedLogsFile = "/tmp/kcl-" + time.Now().Format(time.RFC3339)
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.BatchInterval == 0 {
|
if config.BatchInterval == 0 {
|
||||||
|
|
@ -74,20 +81,19 @@ func NewBatchConsumerFromFiles(
|
||||||
) *BatchConsumer {
|
) *BatchConsumer {
|
||||||
config = withDefaults(config)
|
config = withDefaults(config)
|
||||||
|
|
||||||
file, err := os.OpenFile(config.LogFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
|
file, err := os.OpenFile(config.FailedLogsFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Unable to create log file: %s", err.Error())
|
log.Fatalf("Unable to create log file: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
failedLogsFile := logger.New("amazon-kinesis-client-go")
|
||||||
|
failedLogsFile.SetOutput(file)
|
||||||
|
|
||||||
kvlog := logger.New("amazon-kinesis-client-go")
|
wrt := NewBatchedWriter(config, sender, config.Logger, failedLogsFile)
|
||||||
kvlog.SetOutput(file)
|
|
||||||
|
|
||||||
wrt := NewBatchedWriter(config, sender, kvlog)
|
|
||||||
kclProcess := kcl.New(input, output, errFile, wrt)
|
kclProcess := kcl.New(input, output, errFile, wrt)
|
||||||
|
|
||||||
return &BatchConsumer{
|
return &BatchConsumer{
|
||||||
kclProcess: kclProcess,
|
kclProcess: kclProcess,
|
||||||
logfile: file,
|
failedLogsFile: file,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -100,5 +106,5 @@ func NewBatchConsumer(config Config, sender Sender) *BatchConsumer {
|
||||||
// Start when called, the consumer begins ingesting messages. This function blocks.
|
// Start when called, the consumer begins ingesting messages. This function blocks.
|
||||||
func (b *BatchConsumer) Start() {
|
func (b *BatchConsumer) Start() {
|
||||||
b.kclProcess.Run()
|
b.kclProcess.Run()
|
||||||
b.logfile.Close()
|
b.failedLogsFile.Close()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,7 @@ type batchedWriter struct {
|
||||||
config Config
|
config Config
|
||||||
sender Sender
|
sender Sender
|
||||||
log kv.KayveeLogger
|
log kv.KayveeLogger
|
||||||
|
failedLogsFile kv.KayveeLogger
|
||||||
|
|
||||||
shardID string
|
shardID string
|
||||||
|
|
||||||
|
|
@ -30,11 +31,12 @@ type batchedWriter struct {
|
||||||
lastProcessedSeq kcl.SequencePair
|
lastProcessedSeq kcl.SequencePair
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batchedWriter {
|
func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger, failedLogsFile kv.KayveeLogger) *batchedWriter {
|
||||||
return &batchedWriter{
|
return &batchedWriter{
|
||||||
config: config,
|
config: config,
|
||||||
sender: sender,
|
sender: sender,
|
||||||
log: log,
|
log: log,
|
||||||
|
failedLogsFile: failedLogsFile,
|
||||||
|
|
||||||
rateLimiter: rate.NewLimiter(rate.Limit(config.ReadRateLimit), config.ReadBurstLimit),
|
rateLimiter: rate.NewLimiter(rate.Limit(config.ReadRateLimit), config.ReadBurstLimit),
|
||||||
}
|
}
|
||||||
|
|
@ -50,7 +52,7 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer
|
||||||
}
|
}
|
||||||
|
|
||||||
b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq, b.log)
|
b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq, b.log)
|
||||||
b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.log)
|
b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.log, b.failedLogsFile)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -142,7 +142,7 @@ func TestProcessRecordsIgnoredMessages(t *testing.T) {
|
||||||
})
|
})
|
||||||
mockcheckpointer := NewMockCheckpointer(5 * time.Second)
|
mockcheckpointer := NewMockCheckpointer(5 * time.Second)
|
||||||
|
|
||||||
wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mocklog)
|
wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mocklog, mocklog)
|
||||||
wrt.Initialize("test-shard", mockcheckpointer)
|
wrt.Initialize("test-shard", mockcheckpointer)
|
||||||
|
|
||||||
err := wrt.ProcessRecords([]kcl.Record{
|
err := wrt.ProcessRecords([]kcl.Record{
|
||||||
|
|
@ -173,7 +173,7 @@ func TestProcessRecordsSingleBatchBasic(t *testing.T) {
|
||||||
mockcheckpointer := NewMockCheckpointer(5 * time.Second)
|
mockcheckpointer := NewMockCheckpointer(5 * time.Second)
|
||||||
mocksender := NewMsgAsTagSender()
|
mocksender := NewMsgAsTagSender()
|
||||||
|
|
||||||
wrt := NewBatchedWriter(mockconfig, mocksender, mocklog)
|
wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog)
|
||||||
wrt.Initialize("test-shard", mockcheckpointer)
|
wrt.Initialize("test-shard", mockcheckpointer)
|
||||||
|
|
||||||
err := wrt.ProcessRecords([]kcl.Record{
|
err := wrt.ProcessRecords([]kcl.Record{
|
||||||
|
|
@ -220,7 +220,7 @@ func TestProcessRecordsMutliBatchBasic(t *testing.T) {
|
||||||
mockcheckpointer := NewMockCheckpointer(5 * time.Second)
|
mockcheckpointer := NewMockCheckpointer(5 * time.Second)
|
||||||
mocksender := NewMsgAsTagSender()
|
mocksender := NewMsgAsTagSender()
|
||||||
|
|
||||||
wrt := NewBatchedWriter(mockconfig, mocksender, mocklog)
|
wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog)
|
||||||
wrt.Initialize("test-shard", mockcheckpointer)
|
wrt.Initialize("test-shard", mockcheckpointer)
|
||||||
|
|
||||||
err := wrt.ProcessRecords([]kcl.Record{
|
err := wrt.ProcessRecords([]kcl.Record{
|
||||||
|
|
@ -278,7 +278,7 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) {
|
||||||
mockcheckpointer := NewMockCheckpointer(5 * time.Second)
|
mockcheckpointer := NewMockCheckpointer(5 * time.Second)
|
||||||
mocksender := NewMsgAsTagSender()
|
mocksender := NewMsgAsTagSender()
|
||||||
|
|
||||||
wrt := NewBatchedWriter(mockconfig, mocksender, mocklog)
|
wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog)
|
||||||
wrt.Initialize("test-shard", mockcheckpointer)
|
wrt.Initialize("test-shard", mockcheckpointer)
|
||||||
|
|
||||||
err := wrt.ProcessRecords([]kcl.Record{
|
err := wrt.ProcessRecords([]kcl.Record{
|
||||||
|
|
@ -355,7 +355,7 @@ func TestStaggeredCheckpionting(t *testing.T) {
|
||||||
mockcheckpointer := NewMockCheckpointer(5 * time.Second)
|
mockcheckpointer := NewMockCheckpointer(5 * time.Second)
|
||||||
mocksender := NewMsgAsTagSender()
|
mocksender := NewMsgAsTagSender()
|
||||||
|
|
||||||
wrt := NewBatchedWriter(mockconfig, mocksender, mocklog)
|
wrt := NewBatchedWriter(mockconfig, mocksender, mocklog, mocklog)
|
||||||
wrt.Initialize("test-shard", mockcheckpointer)
|
wrt.Initialize("test-shard", mockcheckpointer)
|
||||||
|
|
||||||
err := wrt.ProcessRecords([]kcl.Record{
|
err := wrt.ProcessRecords([]kcl.Record{
|
||||||
|
|
|
||||||
|
|
@ -2,8 +2,6 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gopkg.in/Clever/kayvee-go.v6/logger"
|
"gopkg.in/Clever/kayvee-go.v6/logger"
|
||||||
|
|
@ -11,30 +9,16 @@ import (
|
||||||
kbc "github.com/Clever/amazon-kinesis-client-go/batchconsumer"
|
kbc "github.com/Clever/amazon-kinesis-client-go/batchconsumer"
|
||||||
)
|
)
|
||||||
|
|
||||||
func createDummyOutput() (logger.KayveeLogger, *os.File) {
|
|
||||||
file, err := os.OpenFile("/tmp/example-kcl-output", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Unable to create log file: %s", err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
kvlog := logger.New("amazon-kinesis-client-go")
|
|
||||||
kvlog.SetOutput(file)
|
|
||||||
|
|
||||||
return kvlog, file
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
config := kbc.Config{
|
config := kbc.Config{
|
||||||
BatchInterval: 10 * time.Second,
|
BatchInterval: 10 * time.Second,
|
||||||
BatchCount: 500,
|
BatchCount: 500,
|
||||||
BatchSize: 4 * 1024 * 1024, // 4Mb
|
BatchSize: 4 * 1024 * 1024, // 4Mb
|
||||||
LogFile: "/tmp/example-kcl-consumer",
|
Logger: logger.New("amazon-kinesis-client-go"),
|
||||||
|
FailedLogsFile: "/tmp/example-kcl-consumer",
|
||||||
}
|
}
|
||||||
|
|
||||||
output, file := createDummyOutput()
|
sender := &exampleSender{output: logger.New("fake-output")}
|
||||||
defer file.Close()
|
|
||||||
|
|
||||||
sender := &exampleSender{output: output}
|
|
||||||
consumer := kbc.NewBatchConsumer(config, sender)
|
consumer := kbc.NewBatchConsumer(config, sender)
|
||||||
consumer.Start()
|
consumer.Start()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue