// amazon-kinesis-client-go/cmd/batchconsumer/main.go

package main
import (
"fmt"
"time"
"gopkg.in/Clever/kayvee-go.v6/logger"
kbc "github.com/Clever/amazon-kinesis-client-go/batchconsumer"
)
func main() {
config := kbc.Config{
2017-11-02 21:49:13 +00:00
BatchInterval: 10 * time.Second,
BatchCount: 500,
BatchSize: 4 * 1024 * 1024, // 4Mb
FailedLogsFile: "/tmp/example-kcl-consumer",
}
2017-11-02 21:49:13 +00:00
sender := &exampleSender{output: logger.New("fake-output")}
2017-07-18 19:19:40 +00:00
consumer := kbc.NewBatchConsumer(config, sender)
consumer.Start()
}
type exampleSender struct {
2018-08-09 23:43:30 +00:00
shardID string
output logger.KayveeLogger
}
// Initialize stores the given shard ID on the sender.
func (e *exampleSender) Initialize(shardID string) {
	e.shardID = shardID
}
// ProcessMessage turns a raw message into a tagged output line.
//
// The message's byte length modulo 5 selects a bucket. Messages in bucket 2
// are rejected with kbc.ErrMessageIgnored (nil payload, nil tags). For every
// other message the tag is "tag-<bucket>", the returned payload is
// "<tag>: <rawmsg>", and the tag slice contains just that one tag.
func (e *exampleSender) ProcessMessage(rawmsg []byte) ([]byte, []string, error) {
	bucket := len(rawmsg) % 5
	if bucket == 2 {
		return nil, nil, kbc.ErrMessageIgnored
	}

	tag := fmt.Sprintf("tag-%d", bucket)
	// Prefix the raw bytes with "<tag>: " without a round-trip through string.
	payload := append([]byte(tag+": "), rawmsg...)

	return payload, []string{tag}, nil
}
// SendBatch writes every entry of batch to e.output via InfoD, passing tag as
// the first argument and a logger.M carrying the entry's position ("idx") and
// its contents as a string ("line"). It always returns nil.
func (e *exampleSender) SendBatch(batch [][]byte, tag string) error {
	for i := range batch {
		fields := logger.M{"idx": i, "line": string(batch[i])}
		e.output.InfoD(tag, fields)
	}

	return nil
}