diff --git a/batchconsumer/sender.go b/batchconsumer/sender.go index c26be76..03e0421 100644 --- a/batchconsumer/sender.go +++ b/batchconsumer/sender.go @@ -5,42 +5,42 @@ import ( "fmt" ) -// ErrLogIgnored should be returned by EncodeLog when it encounters a log line that will not be -// consumed -var ErrLogIgnored = errors.New("Log intentionally skipped by sender") +// ErrMessageIgnored should be returned by ProcessMessage when it encounters a message that will +// not be consumed +var ErrMessageIgnored = errors.New("Message intentionally skipped by sender") // Sender an interface needed for batch consumer implementations type Sender interface { - // EncodeLog receives a raw log line. It's expected to return an appropriately formated log - // as well as a list of tags for that log line. A tag corresponds to a batch that it'll be - // put into. Typically tags are series names. - // If a log line will not be consumed, EncodeLog should return a ErrLogIgnored error. - EncodeLog(rawlog []byte) (log []byte, tags []string, err error) - // SendBatch receives a batch of log lines. All log lines were given the specified tag by - // EncodeLog + // ProcessMessage receives a raw message and is expected to return an appropriately formatted + // message as well as a list of tags for that log line. A tag corresponds to a batch that + // it'll be put into. Typically tags are series names. + // If a message will not be consumed, ProcessMessage should return a ErrMessageIgnored error. + ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error) + // SendBatch receives a batch of messages. All messages were given the specified tag by + // ProcessMessage SendBatch(batch [][]byte, tag string) error } -// PartialOutputError should be returned by SendBatch implementations when a handful of log lines +// PartialSendBatchError should be returned by SendBatch implementations when some messages // couldn't be sent to an output. It's expected that SendBatch implementations do a "best effort" // before returning this error. -type PartialOutputError struct { - // Message is a description of error that occurred - Message string - // Logs a list of logs that failed to be sent - Logs [][]byte +type PartialSendBatchError struct { + // ErrMessage is a description of error that occurred + ErrMessage string + // FailedMessages a list of messages that failed to be sent + FailedMessages [][]byte } -func (c PartialOutputError) Error() string { - return fmt.Sprintf("%d failed logs. %s", len(c.Logs), c.Message) +func (c PartialSendBatchError) Error() string { + return fmt.Sprintf("%d failed logs. %s", len(c.FailedMessages), c.ErrMessage) } -// CatastrophicOutputError should be returned by SendBatch implementations when the output is +// CatastrophicSendBatchError should be returned by SendBatch implementations when the output is // unreachable. Returning this error causes this container to exit without checkpointing. -type CatastrophicOutputError struct { - Message string +type CatastrophicSendBatchError struct { + ErrMessage string } -func (c CatastrophicOutputError) Error() string { - return c.Message +func (c CatastrophicSendBatchError) Error() string { + return fmt.Sprintf("catastrophic SendBatch error: %s", c.ErrMessage) } diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index a703880..6e3a3a7 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -159,8 +159,8 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { return err } for _, rawlog := range rawlogs { - log, tags, err := b.sender.EncodeLog(rawlog) - if err == ErrLogIgnored { + log, tags, err := b.sender.ProcessMessage(rawlog) + if err == ErrMessageIgnored { continue // Skip message } else if err != nil { return err @@ -220,12 +220,12 @@ func (b *batchedWriter) SendBatch(batch [][]byte, tag string) { err := b.sender.SendBatch(batch, tag) switch e := err.(type) { case nil: // Do nothing - case PartialOutputError: + case PartialSendBatchError: b.log.ErrorD("send-batch", kv.M{"msg": e.Error()}) - for _, line := range e.Logs { + for _, line := range e.FailedMessages { b.log.ErrorD("failed-log", kv.M{"log": line}) } - case CatastrophicOutputError: + case CatastrophicSendBatchError: b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) os.Exit(1) default: diff --git a/cmd/batchconsumer/main.go b/cmd/batchconsumer/main.go index 0285139..5d35924 100644 --- a/cmd/batchconsumer/main.go +++ b/cmd/batchconsumer/main.go @@ -44,13 +44,13 @@ type exampleSender struct { output logger.KayveeLogger } -func (e *exampleSender) EncodeLog(rawlog []byte) ([]byte, []string, error) { - if len(rawlog)%5 == 2 { - return nil, nil, kbc.ErrLogIgnored +func (e *exampleSender) ProcessMessage(rawmsg []byte) ([]byte, []string, error) { + if len(rawmsg)%5 == 2 { + return nil, nil, kbc.ErrMessageIgnored } - tag1 := fmt.Sprintf("tag-%d", len(rawlog)%5) - line := tag1 + ": " + string(rawlog) + tag1 := fmt.Sprintf("tag-%d", len(rawmsg)%5) + line := tag1 + ": " + string(rawmsg) return []byte(line), []string{tag1}, nil }