Fixed and added unit tests
This commit is contained in:
parent
a329c40645
commit
8d273d6a1e
11 changed files with 235 additions and 59 deletions
8
Makefile
8
Makefile
|
|
@ -1,6 +1,10 @@
|
||||||
|
include golang.mk
|
||||||
|
.DEFAULT_GOAL := test # override default goal set in library makefile
|
||||||
|
|
||||||
SHELL := /bin/bash
|
SHELL := /bin/bash
|
||||||
JAR_DIR := jars
|
JAR_DIR := jars
|
||||||
PKG := github.com/Clever/amazon-kinesis-client-go
|
PKG := github.com/Clever/amazon-kinesis-client-go
|
||||||
|
PKGS := $(shell go list ./... | grep -v /vendor )
|
||||||
.PHONY: download_jars run build
|
.PHONY: download_jars run build
|
||||||
|
|
||||||
URL_PREFIX := http://search.maven.org/remotecontent?filepath=
|
URL_PREFIX := http://search.maven.org/remotecontent?filepath=
|
||||||
|
|
@ -49,3 +53,7 @@ run: build download_jars
|
||||||
|
|
||||||
bench:
|
bench:
|
||||||
go test -bench=. github.com/Clever/amazon-kinesis-client-go/decode/
|
go test -bench=. github.com/Clever/amazon-kinesis-client-go/decode/
|
||||||
|
|
||||||
|
test: $(PKGS)
|
||||||
|
$(PKGS): golang-test-all-deps
|
||||||
|
$(call golang-test-all,$@)
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// SequencePair a convience way to pass around a Sequence / SubSequence pair
|
||||||
type SequencePair struct {
|
type SequencePair struct {
|
||||||
Sequence *big.Int
|
Sequence *big.Int
|
||||||
SubSequence int
|
SubSequence int
|
||||||
|
|
|
||||||
|
|
@ -37,8 +37,7 @@ func (m *MockSync) waitForFlush(timeout time.Duration) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const mockSequenceNumber = "99999"
|
var mockSequence = SequencePair{big.NewInt(99999), 12345}
|
||||||
const mockSubSequenceNumber = 12345
|
|
||||||
|
|
||||||
func TestBatchingByCount(t *testing.T) {
|
func TestBatchingByCount(t *testing.T) {
|
||||||
var err error
|
var err error
|
||||||
|
|
@ -48,9 +47,9 @@ func TestBatchingByCount(t *testing.T) {
|
||||||
batcher := New(sync, time.Hour, 2, 1024*1024)
|
batcher := New(sync, time.Hour, 2, 1024*1024)
|
||||||
|
|
||||||
t.Log("Batcher respect count limit")
|
t.Log("Batcher respect count limit")
|
||||||
assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequenceNumber, mockSubSequenceNumber))
|
assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence))
|
||||||
assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequenceNumber, mockSubSequenceNumber))
|
assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence))
|
||||||
assert.NoError(batcher.AddMessage([]byte("hmmhmm"), mockSequenceNumber, mockSubSequenceNumber))
|
assert.NoError(batcher.AddMessage([]byte("hmmhmm"), mockSequence))
|
||||||
|
|
||||||
err = sync.waitForFlush(time.Millisecond * 10)
|
err = sync.waitForFlush(time.Millisecond * 10)
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
|
|
@ -73,7 +72,7 @@ func TestBatchingByTime(t *testing.T) {
|
||||||
batcher := New(sync, time.Millisecond, 2000000, 1024*1024)
|
batcher := New(sync, time.Millisecond, 2000000, 1024*1024)
|
||||||
|
|
||||||
t.Log("Batcher sends partial batches when time expires")
|
t.Log("Batcher sends partial batches when time expires")
|
||||||
assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequenceNumber, mockSubSequenceNumber))
|
assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence))
|
||||||
|
|
||||||
err = sync.waitForFlush(time.Millisecond * 10)
|
err = sync.waitForFlush(time.Millisecond * 10)
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
|
|
@ -83,8 +82,8 @@ func TestBatchingByTime(t *testing.T) {
|
||||||
assert.Equal("hihi", string(sync.batches[0][0]))
|
assert.Equal("hihi", string(sync.batches[0][0]))
|
||||||
|
|
||||||
t.Log("Batcher sends all messsages in partial batches when time expires")
|
t.Log("Batcher sends all messsages in partial batches when time expires")
|
||||||
assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequenceNumber, mockSubSequenceNumber))
|
assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence))
|
||||||
assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequenceNumber, mockSubSequenceNumber))
|
assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequence))
|
||||||
|
|
||||||
err = sync.waitForFlush(time.Millisecond * 10)
|
err = sync.waitForFlush(time.Millisecond * 10)
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
|
|
@ -107,7 +106,7 @@ func TestBatchingBySize(t *testing.T) {
|
||||||
batcher := New(sync, time.Hour, 2000000, 8)
|
batcher := New(sync, time.Hour, 2000000, 8)
|
||||||
|
|
||||||
t.Log("Large messages are sent immediately")
|
t.Log("Large messages are sent immediately")
|
||||||
assert.NoError(batcher.AddMessage([]byte("hellohello"), mockSequenceNumber, mockSubSequenceNumber))
|
assert.NoError(batcher.AddMessage([]byte("hellohello"), mockSequence))
|
||||||
|
|
||||||
err = sync.waitForFlush(time.Millisecond * 10)
|
err = sync.waitForFlush(time.Millisecond * 10)
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
|
|
@ -117,8 +116,8 @@ func TestBatchingBySize(t *testing.T) {
|
||||||
assert.Equal("hellohello", string(sync.batches[0][0]))
|
assert.Equal("hellohello", string(sync.batches[0][0]))
|
||||||
|
|
||||||
t.Log("Batcher tries not to exceed size limit")
|
t.Log("Batcher tries not to exceed size limit")
|
||||||
assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequenceNumber, mockSubSequenceNumber))
|
assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence))
|
||||||
assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequenceNumber, mockSubSequenceNumber))
|
assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence))
|
||||||
|
|
||||||
err = sync.waitForFlush(time.Millisecond * 10)
|
err = sync.waitForFlush(time.Millisecond * 10)
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
|
|
@ -128,7 +127,7 @@ func TestBatchingBySize(t *testing.T) {
|
||||||
assert.Equal("heyhey", string(sync.batches[1][0]))
|
assert.Equal("heyhey", string(sync.batches[1][0]))
|
||||||
|
|
||||||
t.Log("Batcher sends messages that didn't fit in previous batch")
|
t.Log("Batcher sends messages that didn't fit in previous batch")
|
||||||
assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequenceNumber, mockSubSequenceNumber)) // At this point "hihi" is in the batch
|
assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequence)) // At this point "hihi" is in the batch
|
||||||
|
|
||||||
err = sync.waitForFlush(time.Millisecond * 10)
|
err = sync.waitForFlush(time.Millisecond * 10)
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
|
|
@ -139,7 +138,7 @@ func TestBatchingBySize(t *testing.T) {
|
||||||
assert.Equal("yoyo", string(sync.batches[2][1]))
|
assert.Equal("yoyo", string(sync.batches[2][1]))
|
||||||
|
|
||||||
t.Log("Batcher doesn't send partial batches")
|
t.Log("Batcher doesn't send partial batches")
|
||||||
assert.NoError(batcher.AddMessage([]byte("okok"), mockSequenceNumber, mockSubSequenceNumber))
|
assert.NoError(batcher.AddMessage([]byte("okok"), mockSequence))
|
||||||
|
|
||||||
err = sync.waitForFlush(time.Millisecond * 10)
|
err = sync.waitForFlush(time.Millisecond * 10)
|
||||||
assert.Error(err)
|
assert.Error(err)
|
||||||
|
|
@ -153,7 +152,7 @@ func TestFlushing(t *testing.T) {
|
||||||
batcher := New(sync, time.Hour, 2000000, 1024*1024)
|
batcher := New(sync, time.Hour, 2000000, 1024*1024)
|
||||||
|
|
||||||
t.Log("Calling flush sends pending messages")
|
t.Log("Calling flush sends pending messages")
|
||||||
assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequenceNumber, mockSubSequenceNumber))
|
assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence))
|
||||||
|
|
||||||
err = sync.waitForFlush(time.Millisecond * 10)
|
err = sync.waitForFlush(time.Millisecond * 10)
|
||||||
assert.Error(err)
|
assert.Error(err)
|
||||||
|
|
@ -176,7 +175,7 @@ func TestSendingEmpty(t *testing.T) {
|
||||||
batcher := New(sync, time.Second, 10, 1024*1024)
|
batcher := New(sync, time.Second, 10, 1024*1024)
|
||||||
|
|
||||||
t.Log("An error is returned when an empty message is sent")
|
t.Log("An error is returned when an empty message is sent")
|
||||||
err = batcher.AddMessage([]byte{}, mockSequenceNumber, mockSubSequenceNumber)
|
err = batcher.AddMessage([]byte{}, mockSequence)
|
||||||
assert.Error(err)
|
assert.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -187,26 +186,29 @@ func TestUpdatingSequence(t *testing.T) {
|
||||||
batcher := New(sync, time.Second, 10, 1024*1024).(*batcher)
|
batcher := New(sync, time.Second, 10, 1024*1024).(*batcher)
|
||||||
|
|
||||||
t.Log("Initally, smallestSeq is undefined")
|
t.Log("Initally, smallestSeq is undefined")
|
||||||
|
assert.Nil(batcher.SmallestSequencePair().Sequence)
|
||||||
|
|
||||||
expected := new(big.Int)
|
expected := new(big.Int)
|
||||||
assert.Nil(batcher.smallestSeq.Sequence)
|
|
||||||
|
|
||||||
t.Log("After AddMessage (seq=1), smallestSeq = 1")
|
t.Log("After AddMessage (seq=1), smallestSeq = 1")
|
||||||
assert.NoError(batcher.AddMessage([]byte("abab"), "1", mockSubSequenceNumber))
|
batcher.updateSequenceNumbers(SequencePair{big.NewInt(1), 1234})
|
||||||
sync.waitForFlush(time.Minute)
|
|
||||||
expected.SetInt64(1)
|
expected.SetInt64(1)
|
||||||
seq := batcher.SmallestSequencePair()
|
seq := batcher.SmallestSequencePair()
|
||||||
assert.True(expected.Cmp(seq.Sequence) == 0)
|
assert.True(expected.Cmp(seq.Sequence) == 0)
|
||||||
|
|
||||||
t.Log("After AddMessage (seq=2), smallestSeq = 1 -- not updated because higher")
|
t.Log("After AddMessage (seq=2), smallestSeq = 1 -- not updated because higher")
|
||||||
assert.NoError(batcher.AddMessage([]byte("cdcd"), "2", mockSubSequenceNumber))
|
batcher.updateSequenceNumbers(SequencePair{big.NewInt(2), 1234})
|
||||||
sync.waitForFlush(time.Minute)
|
|
||||||
seq = batcher.SmallestSequencePair()
|
seq = batcher.SmallestSequencePair()
|
||||||
assert.True(expected.Cmp(seq.Sequence) == 0)
|
assert.True(expected.Cmp(seq.Sequence) == 0)
|
||||||
|
|
||||||
t.Log("After AddMessage (seq=1), smallestSeq = 0")
|
t.Log("After AddMessage (seq=1), smallestSeq = 0")
|
||||||
assert.NoError(batcher.AddMessage([]byte("efef"), "0", mockSubSequenceNumber))
|
batcher.updateSequenceNumbers(SequencePair{big.NewInt(0), 1234})
|
||||||
sync.waitForFlush(time.Minute)
|
|
||||||
expected.SetInt64(0)
|
expected.SetInt64(0)
|
||||||
seq = batcher.SmallestSequencePair()
|
seq = batcher.SmallestSequencePair()
|
||||||
assert.True(expected.Cmp(seq.Sequence) == 0)
|
assert.True(expected.Cmp(seq.Sequence) == 0)
|
||||||
|
|
||||||
|
t.Log("Flushing batch clears smallest sequence pair")
|
||||||
|
assert.NoError(batcher.AddMessage([]byte("cdcd"), SequencePair{big.NewInt(2), 1234}))
|
||||||
|
sync.waitForFlush(time.Minute)
|
||||||
|
assert.Nil(batcher.SmallestSequencePair().Sequence)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,11 @@ import (
|
||||||
"github.com/Clever/amazon-kinesis-client-go/kcl"
|
"github.com/Clever/amazon-kinesis-client-go/kcl"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Config used for BatchConsumer constructor. Any empty fields are populated with defaults.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
|
// DeployEnv the name of the deployment environment
|
||||||
|
DeployEnv string
|
||||||
|
|
||||||
// LogFile where consumer errors and failed log lines are saved
|
// LogFile where consumer errors and failed log lines are saved
|
||||||
LogFile string
|
LogFile string
|
||||||
// FlushInterval is how often accumulated messages should be bulk put to firehose
|
// FlushInterval is how often accumulated messages should be bulk put to firehose
|
||||||
|
|
@ -21,15 +25,12 @@ type Config struct {
|
||||||
// FlushSize is the size of a batch in bytes that triggers a push to firehose. Max batch size is 4Mb (4*1024*1024), see: http://docs.aws.amazon.com/firehose/latest/dev/limits.html
|
// FlushSize is the size of a batch in bytes that triggers a push to firehose. Max batch size is 4Mb (4*1024*1024), see: http://docs.aws.amazon.com/firehose/latest/dev/limits.html
|
||||||
FlushSize int
|
FlushSize int
|
||||||
|
|
||||||
// DeployEnv the name of the deployment enviornment
|
|
||||||
DeployEnv string
|
|
||||||
|
|
||||||
// ReadRateLimit the number of records read per seconds
|
// ReadRateLimit the number of records read per seconds
|
||||||
ReadRateLimit int
|
ReadRateLimit int
|
||||||
// ReadBurstLimit the max number of tokens allowed by rate limiter
|
// ReadBurstLimit the max number of tokens allowed by rate limiter
|
||||||
ReadBurstLimit int
|
ReadBurstLimit int
|
||||||
|
|
||||||
// CheckpointFreq the frequence in which a checkpoint is saved
|
// CheckpointFreq the frequency in which a checkpoint is saved
|
||||||
CheckpointFreq time.Duration
|
CheckpointFreq time.Duration
|
||||||
// CheckpointRetries the number of times the consumer will try to save a checkpoint
|
// CheckpointRetries the number of times the consumer will try to save a checkpoint
|
||||||
CheckpointRetries int
|
CheckpointRetries int
|
||||||
|
|
@ -37,6 +38,7 @@ type Config struct {
|
||||||
CheckpointRetrySleep time.Duration
|
CheckpointRetrySleep time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BatchConsumer is responsible for marshalling
|
||||||
type BatchConsumer struct {
|
type BatchConsumer struct {
|
||||||
kclProcess *kcl.KCLProcess
|
kclProcess *kcl.KCLProcess
|
||||||
logfile *os.File
|
logfile *os.File
|
||||||
|
|
@ -81,6 +83,8 @@ func withDefaults(config Config) Config {
|
||||||
return config
|
return config
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewBatchConsumerFromFiles creates a batch consumer. Readers/writers provided are used for
|
||||||
|
// interprocess communication.
|
||||||
func NewBatchConsumerFromFiles(
|
func NewBatchConsumerFromFiles(
|
||||||
config Config, sender Sender, input io.Reader, output, errFile io.Writer,
|
config Config, sender Sender, input io.Reader, output, errFile io.Writer,
|
||||||
) *BatchConsumer {
|
) *BatchConsumer {
|
||||||
|
|
@ -94,7 +98,7 @@ func NewBatchConsumerFromFiles(
|
||||||
kvlog := logger.New("amazon-kinesis-client-go")
|
kvlog := logger.New("amazon-kinesis-client-go")
|
||||||
kvlog.SetOutput(file)
|
kvlog.SetOutput(file)
|
||||||
|
|
||||||
wrt := &BatchedWriter{
|
wrt := &batchedWriter{
|
||||||
config: config,
|
config: config,
|
||||||
log: kvlog,
|
log: kvlog,
|
||||||
sender: sender,
|
sender: sender,
|
||||||
|
|
@ -107,10 +111,13 @@ func NewBatchConsumerFromFiles(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewBatchConsumer creates batch consumer. Stdin, Stdout, and Stderr are used for interprocess
|
||||||
|
// communication.
|
||||||
func NewBatchConsumer(config Config, sender Sender) *BatchConsumer {
|
func NewBatchConsumer(config Config, sender Sender) *BatchConsumer {
|
||||||
return NewBatchConsumerFromFiles(config, sender, os.Stdin, os.Stdout, os.Stderr)
|
return NewBatchConsumerFromFiles(config, sender, os.Stdin, os.Stdout, os.Stderr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start when called, the consumer begins ingesting messages. This function blocks.
|
||||||
func (b *BatchConsumer) Start() {
|
func (b *BatchConsumer) Start() {
|
||||||
b.kclProcess.Run()
|
b.kclProcess.Run()
|
||||||
b.logfile.Close()
|
b.logfile.Close()
|
||||||
|
|
|
||||||
|
|
@ -5,22 +5,38 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ErrLogIgnored should be returned by EncodeLog when it encounters a log line that will not be
|
||||||
|
// consumed
|
||||||
var ErrLogIgnored = errors.New("Log intentionally skipped by sender")
|
var ErrLogIgnored = errors.New("Log intentionally skipped by sender")
|
||||||
|
|
||||||
|
// Sender an interface needed for batch consumer implementations
|
||||||
type Sender interface {
|
type Sender interface {
|
||||||
|
// EncodeLog receives a raw log line. It's expected to return an appropriately formated log
|
||||||
|
// as well as a list of tags for that log line. A tag corresponds to a batch that it'll be
|
||||||
|
// put into. Typically tags are series names.
|
||||||
|
// If a log line will not be consumed, EncodeLog should return a ErrLogIgnored error.
|
||||||
EncodeLog(rawlog []byte) (log []byte, tags []string, err error)
|
EncodeLog(rawlog []byte) (log []byte, tags []string, err error)
|
||||||
|
// SendBatch receives a batch of log lines. All log lines were given the specified tag by
|
||||||
|
// EncodeLog
|
||||||
SendBatch(batch [][]byte, tag string) error
|
SendBatch(batch [][]byte, tag string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PartialOutputError should be returned by SendBatch implementations when a handful of log lines
|
||||||
|
// couldn't be sent to an output. It's expected that SendBatch implementations do a "best effort"
|
||||||
|
// before returning this error.
|
||||||
type PartialOutputError struct {
|
type PartialOutputError struct {
|
||||||
|
// Message is a description of error that occurred
|
||||||
Message string
|
Message string
|
||||||
Logs [][]byte
|
// Logs a list of logs that failed to be sent
|
||||||
|
Logs [][]byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c PartialOutputError) Error() string {
|
func (c PartialOutputError) Error() string {
|
||||||
return fmt.Sprintf("%d failed logs. %s", len(c.Logs), c.Message)
|
return fmt.Sprintf("%d failed logs. %s", len(c.Logs), c.Message)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CatastrophicOutputError should be returned by SendBatch implementations when the output is
|
||||||
|
// unreachable. Returning this error causes this container to exit without checkpointing.
|
||||||
type CatastrophicOutputError struct {
|
type CatastrophicOutputError struct {
|
||||||
Message string
|
Message string
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,11 @@
|
||||||
package batchconsumer
|
package batchconsumer
|
||||||
|
|
||||||
type BatcherSync struct {
|
type batcherSync struct {
|
||||||
tag string
|
tag string
|
||||||
writer *BatchedWriter
|
writer *batchedWriter
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BatcherSync) SendBatch(batch [][]byte) {
|
func (b *batcherSync) SendBatch(batch [][]byte) {
|
||||||
b.writer.SendBatch(batch, b.tag)
|
b.writer.SendBatch(batch, b.tag)
|
||||||
b.writer.CheckPointBatch(b.tag)
|
b.writer.CheckPointBatch(b.tag)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ import (
|
||||||
"github.com/Clever/amazon-kinesis-client-go/splitter"
|
"github.com/Clever/amazon-kinesis-client-go/splitter"
|
||||||
)
|
)
|
||||||
|
|
||||||
type BatchedWriter struct {
|
type batchedWriter struct {
|
||||||
config Config
|
config Config
|
||||||
sender Sender
|
sender Sender
|
||||||
log kv.KayveeLogger
|
log kv.KayveeLogger
|
||||||
|
|
@ -31,7 +31,7 @@ type BatchedWriter struct {
|
||||||
lastProcessedSeq batcher.SequencePair
|
lastProcessedSeq batcher.SequencePair
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BatchedWriter) Initialize(shardID string, checkpointer *kcl.Checkpointer) error {
|
func (b *batchedWriter) Initialize(shardID string, checkpointer *kcl.Checkpointer) error {
|
||||||
b.batchers = map[string]batcher.Batcher{}
|
b.batchers = map[string]batcher.Batcher{}
|
||||||
b.shardID = shardID
|
b.shardID = shardID
|
||||||
b.checkpointChan = make(chan batcher.SequencePair)
|
b.checkpointChan = make(chan batcher.SequencePair)
|
||||||
|
|
@ -43,7 +43,7 @@ func (b *BatchedWriter) Initialize(shardID string, checkpointer *kcl.Checkpointe
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleCheckpointError returns true if checkout should be tried again. Returns false otherwise.
|
// handleCheckpointError returns true if checkout should be tried again. Returns false otherwise.
|
||||||
func (b *BatchedWriter) handleCheckpointError(err error) bool {
|
func (b *batchedWriter) handleCheckpointError(err error) bool {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
@ -71,7 +71,7 @@ func (b *BatchedWriter) handleCheckpointError(err error) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BatchedWriter) startCheckpointListener(
|
func (b *batchedWriter) startCheckpointListener(
|
||||||
checkpointer *kcl.Checkpointer, checkpointChan <-chan batcher.SequencePair,
|
checkpointer *kcl.Checkpointer, checkpointChan <-chan batcher.SequencePair,
|
||||||
) {
|
) {
|
||||||
lastCheckpoint := time.Now()
|
lastCheckpoint := time.Now()
|
||||||
|
|
@ -113,15 +113,15 @@ func (b *BatchedWriter) startCheckpointListener(
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BatchedWriter) createBatcher(tag string) batcher.Batcher {
|
func (b *batchedWriter) createBatcher(tag string) batcher.Batcher {
|
||||||
sync := &BatcherSync{
|
sync := &batcherSync{
|
||||||
tag: tag,
|
tag: tag,
|
||||||
writer: b,
|
writer: b,
|
||||||
}
|
}
|
||||||
return batcher.New(sync, b.config.FlushInterval, b.config.FlushCount, b.config.FlushSize)
|
return batcher.New(sync, b.config.FlushInterval, b.config.FlushCount, b.config.FlushSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BatchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) {
|
func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) {
|
||||||
// We handle two types of records:
|
// We handle two types of records:
|
||||||
// - records emitted from CWLogs Subscription
|
// - records emitted from CWLogs Subscription
|
||||||
// - records emiited from KPL
|
// - records emiited from KPL
|
||||||
|
|
@ -134,7 +134,7 @@ func (b *BatchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error)
|
||||||
return splitter.GetMessagesFromGzippedInput(record, b.config.DeployEnv == "production")
|
return splitter.GetMessagesFromGzippedInput(record, b.config.DeployEnv == "production")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BatchedWriter) ProcessRecords(records []kcl.Record) error {
|
func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
|
||||||
curSequence := b.lastProcessedSeq
|
curSequence := b.lastProcessedSeq
|
||||||
|
|
||||||
for _, record := range records {
|
for _, record := range records {
|
||||||
|
|
@ -194,7 +194,7 @@ func (b *BatchedWriter) ProcessRecords(records []kcl.Record) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BatchedWriter) CheckPointBatch(tag string) {
|
func (b *batchedWriter) CheckPointBatch(tag string) {
|
||||||
smallest := b.lastProcessedSeq
|
smallest := b.lastProcessedSeq
|
||||||
|
|
||||||
for name, batch := range b.batchers {
|
for name, batch := range b.batchers {
|
||||||
|
|
@ -218,7 +218,7 @@ func (b *BatchedWriter) CheckPointBatch(tag string) {
|
||||||
b.checkpointChan <- smallest
|
b.checkpointChan <- smallest
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BatchedWriter) SendBatch(batch [][]byte, tag string) {
|
func (b *batchedWriter) SendBatch(batch [][]byte, tag string) {
|
||||||
b.log.Info("sent-batch")
|
b.log.Info("sent-batch")
|
||||||
err := b.sender.SendBatch(batch, tag)
|
err := b.sender.SendBatch(batch, tag)
|
||||||
switch e := err.(type) {
|
switch e := err.(type) {
|
||||||
|
|
@ -237,7 +237,7 @@ func (b *BatchedWriter) SendBatch(batch [][]byte, tag string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BatchedWriter) Shutdown(reason string) error {
|
func (b *batchedWriter) Shutdown(reason string) error {
|
||||||
if reason == "TERMINATE" {
|
if reason == "TERMINATE" {
|
||||||
b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID})
|
b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID})
|
||||||
for _, batch := range b.batchers {
|
for _, batch := range b.batchers {
|
||||||
|
|
|
||||||
|
|
@ -35,16 +35,16 @@ func main() {
|
||||||
output, file := createDummyOutput()
|
output, file := createDummyOutput()
|
||||||
defer file.Close()
|
defer file.Close()
|
||||||
|
|
||||||
wrt := &ExampleWriter{output: output}
|
sender := &exampleSender{output: output}
|
||||||
consumer := kbc.NewBatchConsumer(config, wrt)
|
consumer := kbc.NewBatchConsumer(config, sender)
|
||||||
consumer.Start()
|
consumer.Start()
|
||||||
}
|
}
|
||||||
|
|
||||||
type ExampleWriter struct {
|
type exampleSender struct {
|
||||||
output logger.KayveeLogger
|
output logger.KayveeLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ExampleWriter) EncodeLog(rawlog []byte) ([]byte, []string, error) {
|
func (e *exampleSender) EncodeLog(rawlog []byte) ([]byte, []string, error) {
|
||||||
if len(rawlog)%5 == 2 {
|
if len(rawlog)%5 == 2 {
|
||||||
return nil, nil, kbc.ErrLogIgnored
|
return nil, nil, kbc.ErrLogIgnored
|
||||||
}
|
}
|
||||||
|
|
@ -55,7 +55,7 @@ func (e *ExampleWriter) EncodeLog(rawlog []byte) ([]byte, []string, error) {
|
||||||
return []byte(line), []string{tag1}, nil
|
return []byte(line), []string{tag1}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ExampleWriter) SendBatch(batch [][]byte, tag string) error {
|
func (e *exampleSender) SendBatch(batch [][]byte, tag string) error {
|
||||||
for idx, line := range batch {
|
for idx, line := range batch {
|
||||||
e.output.InfoD(tag, logger.M{"idx": idx, "line": string(line)})
|
e.output.InfoD(tag, logger.M{"idx": idx, "line": string(line)})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ import (
|
||||||
"github.com/Clever/amazon-kinesis-client-go/kcl"
|
"github.com/Clever/amazon-kinesis-client-go/kcl"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SampleRecordProcessor struct {
|
type sampleRecordProcessor struct {
|
||||||
checkpointer *kcl.Checkpointer
|
checkpointer *kcl.Checkpointer
|
||||||
checkpointRetries int
|
checkpointRetries int
|
||||||
checkpointFreq time.Duration
|
checkpointFreq time.Duration
|
||||||
|
|
@ -18,25 +18,25 @@ type SampleRecordProcessor struct {
|
||||||
lastCheckpoint time.Time
|
lastCheckpoint time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func New() *SampleRecordProcessor {
|
func newSampleRecordProcessor() *sampleRecordProcessor {
|
||||||
return &SampleRecordProcessor{
|
return &sampleRecordProcessor{
|
||||||
checkpointRetries: 5,
|
checkpointRetries: 5,
|
||||||
checkpointFreq: 60 * time.Second,
|
checkpointFreq: 60 * time.Second,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (srp *SampleRecordProcessor) Initialize(shardID string, checkpointer *kcl.Checkpointer) error {
|
func (srp *sampleRecordProcessor) Initialize(shardID string, checkpointer *kcl.Checkpointer) error {
|
||||||
srp.lastCheckpoint = time.Now()
|
srp.lastCheckpoint = time.Now()
|
||||||
srp.checkpointer = checkpointer
|
srp.checkpointer = checkpointer
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (srp *SampleRecordProcessor) shouldUpdateSequence(sequenceNumber *big.Int, subSequenceNumber int) bool {
|
func (srp *sampleRecordProcessor) shouldUpdateSequence(sequenceNumber *big.Int, subSequenceNumber int) bool {
|
||||||
return srp.largestSeq == nil || sequenceNumber.Cmp(srp.largestSeq) == 1 ||
|
return srp.largestSeq == nil || sequenceNumber.Cmp(srp.largestSeq) == 1 ||
|
||||||
(sequenceNumber.Cmp(srp.largestSeq) == 0 && subSequenceNumber > srp.largestSubSeq)
|
(sequenceNumber.Cmp(srp.largestSeq) == 0 && subSequenceNumber > srp.largestSubSeq)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (srp *SampleRecordProcessor) ProcessRecords(records []kcl.Record) error {
|
func (srp *sampleRecordProcessor) ProcessRecords(records []kcl.Record) error {
|
||||||
for _, record := range records {
|
for _, record := range records {
|
||||||
seqNumber := new(big.Int)
|
seqNumber := new(big.Int)
|
||||||
if _, ok := seqNumber.SetString(record.SequenceNumber, 10); !ok {
|
if _, ok := seqNumber.SetString(record.SequenceNumber, 10); !ok {
|
||||||
|
|
@ -56,7 +56,7 @@ func (srp *SampleRecordProcessor) ProcessRecords(records []kcl.Record) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (srp *SampleRecordProcessor) Shutdown(reason string) error {
|
func (srp *sampleRecordProcessor) Shutdown(reason string) error {
|
||||||
if reason == "TERMINATE" {
|
if reason == "TERMINATE" {
|
||||||
fmt.Fprintf(os.Stderr, "Was told to terminate, will attempt to checkpoint.\n")
|
fmt.Fprintf(os.Stderr, "Was told to terminate, will attempt to checkpoint.\n")
|
||||||
srp.checkpointer.Shutdown()
|
srp.checkpointer.Shutdown()
|
||||||
|
|
@ -72,6 +72,6 @@ func main() {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
kclProcess := kcl.New(os.Stdin, os.Stdout, os.Stderr, New())
|
kclProcess := kcl.New(os.Stdin, os.Stdout, os.Stderr, newSampleRecordProcessor())
|
||||||
kclProcess.Run()
|
kclProcess.Run()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
142
golang.mk
Normal file
142
golang.mk
Normal file
|
|
@ -0,0 +1,142 @@
|
||||||
|
# This is the default Clever Golang Makefile.
|
||||||
|
# It is stored in the dev-handbook repo, github.com/Clever/dev-handbook
|
||||||
|
# Please do not alter this file directly.
|
||||||
|
GOLANG_MK_VERSION := 0.1.4
|
||||||
|
|
||||||
|
SHELL := /bin/bash
|
||||||
|
.PHONY: golang-godep-vendor golang-test-deps $(GODEP)
|
||||||
|
|
||||||
|
# if the gopath includes several directories, use only the first
|
||||||
|
GOPATH=$(shell echo $$GOPATH | cut -d: -f1)
|
||||||
|
|
||||||
|
# This block checks and confirms that the proper Go toolchain version is installed.
|
||||||
|
# arg1: golang version
|
||||||
|
define golang-version-check
|
||||||
|
GOVERSION := $(shell go version | grep $(1))
|
||||||
|
_ := $(if \
|
||||||
|
$(shell go version | grep $(1)), \
|
||||||
|
@echo "", \
|
||||||
|
$(error "must be running Go version $(1)"))
|
||||||
|
endef
|
||||||
|
|
||||||
|
export GO15VENDOREXPERIMENT=1
|
||||||
|
|
||||||
|
# FGT is a utility that exits with 1 whenever any stderr/stdout output is recieved.
|
||||||
|
FGT := $(GOPATH)/bin/fgt
|
||||||
|
$(FGT):
|
||||||
|
go get github.com/GeertJohan/fgt
|
||||||
|
|
||||||
|
# Godep is a tool used to manage Golang dependencies in the style of the Go 1.5
|
||||||
|
# vendoring experiment.
|
||||||
|
GODEP := $(GOPATH)/bin/godep
|
||||||
|
$(GODEP):
|
||||||
|
go get -u github.com/tools/godep
|
||||||
|
|
||||||
|
# Golint is a tool for linting Golang code for common errors.
|
||||||
|
GOLINT := $(GOPATH)/bin/golint
|
||||||
|
$(GOLINT):
|
||||||
|
go get github.com/golang/lint/golint
|
||||||
|
|
||||||
|
# golang-vendor-deps installs all dependencies needed for different test cases.
|
||||||
|
golang-godep-vendor-deps: $(GODEP)
|
||||||
|
|
||||||
|
# golang-godep-vendor is a target for saving dependencies with the godep tool
|
||||||
|
# to the vendor/ directory. All nested vendor/ directories are deleted as they
|
||||||
|
# are not handled well by the Go toolchain.
|
||||||
|
# arg1: pkg path
|
||||||
|
define golang-godep-vendor
|
||||||
|
$(GODEP) save $(1)
|
||||||
|
@# remove any nested vendor directories
|
||||||
|
find vendor/ -path '*/vendor' -type d | xargs -IX rm -r X
|
||||||
|
endef
|
||||||
|
|
||||||
|
# golang-fmt-deps requires the FGT tool for checking output
|
||||||
|
golang-fmt-deps: $(FGT)
|
||||||
|
|
||||||
|
# golang-fmt checks that all golang files in the pkg are formatted correctly.
|
||||||
|
# arg1: pkg path
|
||||||
|
define golang-fmt
|
||||||
|
@echo "FORMATTING $(1)..."
|
||||||
|
@$(FGT) gofmt -l=true $(GOPATH)/src/$(1)/*.go
|
||||||
|
endef
|
||||||
|
|
||||||
|
# golang-lint-deps requires the golint tool for golang linting.
|
||||||
|
golang-lint-deps: $(GOLINT)
|
||||||
|
|
||||||
|
# golang-lint calls golint on all golang files in the pkg.
|
||||||
|
# arg1: pkg path
|
||||||
|
define golang-lint
|
||||||
|
@echo "LINTING $(1)..."
|
||||||
|
@find $(GOPATH)/src/$(1)/*.go -type f | grep -v gen_ | xargs $(GOLINT)
|
||||||
|
endef
|
||||||
|
|
||||||
|
# golang-lint-deps-strict requires the golint tool for golang linting.
|
||||||
|
golang-lint-deps-strict: $(GOLINT) $(FGT)
|
||||||
|
|
||||||
|
# golang-lint-strict calls golint on all golang files in the pkg and fails if any lint
|
||||||
|
# errors are found.
|
||||||
|
# arg1: pkg path
|
||||||
|
define golang-lint-strict
|
||||||
|
@echo "LINTING $(1)..."
|
||||||
|
@find $(GOPATH)/src/$(1)/*.go -type f | grep -v gen_ | xargs $(FGT) $(GOLINT)
|
||||||
|
endef
|
||||||
|
|
||||||
|
# golang-test-deps is here for consistency
|
||||||
|
golang-test-deps:
|
||||||
|
|
||||||
|
# golang-test uses the Go toolchain to run all tests in the pkg.
|
||||||
|
# arg1: pkg path
|
||||||
|
define golang-test
|
||||||
|
@echo "TESTING $(1)..."
|
||||||
|
@go test -v $(1)
|
||||||
|
endef
|
||||||
|
|
||||||
|
# golang-test-strict-deps is here for consistency
|
||||||
|
golang-test-strict-deps:
|
||||||
|
|
||||||
|
# golang-test-strict uses the Go toolchain to run all tests in the pkg with the race flag
|
||||||
|
# arg1: pkg path
|
||||||
|
define golang-test-strict
|
||||||
|
@echo "TESTING $(1)..."
|
||||||
|
@go test -v -race $(1)
|
||||||
|
endef
|
||||||
|
|
||||||
|
# golang-vet-deps is here for consistency
|
||||||
|
golang-vet-deps:
|
||||||
|
|
||||||
|
# golang-vet uses the Go toolchain to vet all the pkg for common mistakes.
|
||||||
|
# arg1: pkg path
|
||||||
|
define golang-vet
|
||||||
|
@echo "VETTING $(1)..."
|
||||||
|
@go vet $(GOPATH)/src/$(1)/*.go
|
||||||
|
endef
|
||||||
|
|
||||||
|
# golang-test-all-deps installs all dependencies needed for different test cases.
|
||||||
|
golang-test-all-deps: golang-fmt-deps golang-lint-deps golang-test-deps golang-vet-deps
|
||||||
|
|
||||||
|
# golang-test-all calls fmt, lint, vet and test on the specified pkg.
|
||||||
|
# arg1: pkg path
|
||||||
|
define golang-test-all
|
||||||
|
$(call golang-fmt,$(1))
|
||||||
|
$(call golang-lint,$(1))
|
||||||
|
$(call golang-vet,$(1))
|
||||||
|
$(call golang-test,$(1))
|
||||||
|
endef
|
||||||
|
|
||||||
|
# golang-test-all-strict-deps: installs all dependencies needed for different test cases.
|
||||||
|
golang-test-all-strict-deps: golang-fmt-deps golang-lint-deps-strict golang-test-strict-deps golang-vet-deps
|
||||||
|
|
||||||
|
# golang-test-all-strict calls fmt, lint, vet and test on the specified pkg with strict
|
||||||
|
# requirements that no errors are thrown while linting.
|
||||||
|
# arg1: pkg path
|
||||||
|
define golang-test-all-strict
|
||||||
|
$(call golang-fmt,$(1))
|
||||||
|
$(call golang-lint-strict,$(1))
|
||||||
|
$(call golang-vet,$(1))
|
||||||
|
$(call golang-test-strict,$(1))
|
||||||
|
endef
|
||||||
|
|
||||||
|
# golang-update-makefile downloads latest version of golang.mk
|
||||||
|
golang-update-makefile:
|
||||||
|
@wget https://raw.githubusercontent.com/Clever/dev-handbook/master/make/golang.mk -O /tmp/golang.mk 2>/dev/null
|
||||||
|
@if ! grep -q $(GOLANG_MK_VERSION) /tmp/golang.mk; then cp /tmp/golang.mk golang.mk && echo "golang.mk updated"; else echo "golang.mk is up-to-date"; fi
|
||||||
|
|
@ -16,7 +16,7 @@ func TestUnpacking(t *testing.T) {
|
||||||
decoded, err := b64.StdEncoding.DecodeString(input)
|
decoded, err := b64.StdEncoding.DecodeString(input)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
output, err := unpack(string(decoded))
|
output, err := unpack(decoded)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
expectedOutput := LogEventBatch{
|
expectedOutput := LogEventBatch{
|
||||||
|
|
@ -92,7 +92,7 @@ func TestFullLoop(t *testing.T) {
|
||||||
decoded, err := b64.StdEncoding.DecodeString(packed)
|
decoded, err := b64.StdEncoding.DecodeString(packed)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
output, err := unpack(string(decoded))
|
output, err := unpack(decoded)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
assert.Equal(t, leb, output)
|
assert.Equal(t, leb, output)
|
||||||
|
|
@ -121,8 +121,8 @@ func TestSplit(t *testing.T) {
|
||||||
prodEnv := false
|
prodEnv := false
|
||||||
lines := Split(input, prodEnv)
|
lines := Split(input, prodEnv)
|
||||||
expected := [][]byte{
|
expected := [][]byte{
|
||||||
"2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: some log line",
|
[]byte("2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: some log line"),
|
||||||
"2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: another log line",
|
[]byte("2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: another log line"),
|
||||||
}
|
}
|
||||||
assert.Equal(t, expected, lines)
|
assert.Equal(t, expected, lines)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue