Renamed methods to be more generic and to remove reference to logs

This commit is contained in:
Xavi Ramirez 2017-07-19 00:21:31 +00:00
parent f116c752f5
commit 1b2afcafc0
3 changed files with 33 additions and 33 deletions

View file

@ -5,42 +5,42 @@ import (
"fmt" "fmt"
) )
// ErrLogIgnored should be returned by EncodeLog when it encounters a log line that will not be // ErrMessageIgnored should be returned by ProcessMessage when it encounters a message that will
// consumed // not be consumed
var ErrLogIgnored = errors.New("Log intentionally skipped by sender") var ErrMessageIgnored = errors.New("Message intentionally skipped by sender")
// Sender an interface needed for batch consumer implementations // Sender an interface needed for batch consumer implementations
type Sender interface { type Sender interface {
// EncodeLog receives a raw log line. It's expected to return an appropriately formated log // ProcessMessage receives a raw message and is expected to return an appropriately formatted
// as well as a list of tags for that log line. A tag corresponds to a batch that it'll be // message as well as a list of tags for that log line. A tag corresponds to a batch that
// put into. Typically tags are series names. // it'll be put into. Typically tags are series names.
// If a log line will not be consumed, EncodeLog should return a ErrLogIgnored error. // If a message will not be consumed, ProcessMessage should return a ErrMessageIgnored error.
EncodeLog(rawlog []byte) (log []byte, tags []string, err error) ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error)
// SendBatch receives a batch of log lines. All log lines were given the specified tag by // SendBatch receives a batch of messages. All messages were given the specified tag by
// EncodeLog // ProcessMessage
SendBatch(batch [][]byte, tag string) error SendBatch(batch [][]byte, tag string) error
} }
// PartialOutputError should be returned by SendBatch implementations when a handful of log lines // PartialSendBatchError should be returned by SendBatch implementations when some messages
// couldn't be sent to an output. It's expected that SendBatch implementations do a "best effort" // couldn't be sent to an output. It's expected that SendBatch implementations do a "best effort"
// before returning this error. // before returning this error.
type PartialOutputError struct { type PartialSendBatchError struct {
// Message is a description of error that occurred // ErrMessage is a description of error that occurred
Message string ErrMessage string
// Logs a list of logs that failed to be sent // FailedMessages a list of messages that failed to be sent
Logs [][]byte FailedMessages [][]byte
} }
func (c PartialOutputError) Error() string { func (c PartialSendBatchError) Error() string {
return fmt.Sprintf("%d failed logs. %s", len(c.Logs), c.Message) return fmt.Sprintf("%d failed logs. %s", len(c.FailedMessages), c.ErrMessage)
} }
// CatastrophicOutputError should be returned by SendBatch implementations when the output is // CatastrophicSendBatchError should be returned by SendBatch implementations when the output is
// unreachable. Returning this error causes this container to exit without checkpointing. // unreachable. Returning this error causes this container to exit without checkpointing.
type CatastrophicOutputError struct { type CatastrophicSendBatchError struct {
Message string ErrMessage string
} }
func (c CatastrophicOutputError) Error() string { func (c CatastrophicSendBatchError) Error() string {
return c.Message return fmt.Sprintf("catastrophic SendBatch error: %s", c.ErrMessage)
} }

View file

@ -159,8 +159,8 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
return err return err
} }
for _, rawlog := range rawlogs { for _, rawlog := range rawlogs {
log, tags, err := b.sender.EncodeLog(rawlog) log, tags, err := b.sender.ProcessMessage(rawlog)
if err == ErrLogIgnored { if err == ErrMessageIgnored {
continue // Skip message continue // Skip message
} else if err != nil { } else if err != nil {
return err return err
@ -220,12 +220,12 @@ func (b *batchedWriter) SendBatch(batch [][]byte, tag string) {
err := b.sender.SendBatch(batch, tag) err := b.sender.SendBatch(batch, tag)
switch e := err.(type) { switch e := err.(type) {
case nil: // Do nothing case nil: // Do nothing
case PartialOutputError: case PartialSendBatchError:
b.log.ErrorD("send-batch", kv.M{"msg": e.Error()}) b.log.ErrorD("send-batch", kv.M{"msg": e.Error()})
for _, line := range e.Logs { for _, line := range e.FailedMessages {
b.log.ErrorD("failed-log", kv.M{"log": line}) b.log.ErrorD("failed-log", kv.M{"log": line})
} }
case CatastrophicOutputError: case CatastrophicSendBatchError:
b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) b.log.CriticalD("send-batch", kv.M{"msg": e.Error()})
os.Exit(1) os.Exit(1)
default: default:

View file

@ -44,13 +44,13 @@ type exampleSender struct {
output logger.KayveeLogger output logger.KayveeLogger
} }
func (e *exampleSender) EncodeLog(rawlog []byte) ([]byte, []string, error) { func (e *exampleSender) ProcessMessage(rawmsg []byte) ([]byte, []string, error) {
if len(rawlog)%5 == 2 { if len(rawmsg)%5 == 2 {
return nil, nil, kbc.ErrLogIgnored return nil, nil, kbc.ErrMessageIgnored
} }
tag1 := fmt.Sprintf("tag-%d", len(rawlog)%5) tag1 := fmt.Sprintf("tag-%d", len(rawmsg)%5)
line := tag1 + ": " + string(rawlog) line := tag1 + ": " + string(rawmsg)
return []byte(line), []string{tag1}, nil return []byte(line), []string{tag1}, nil
} }