From 8d273d6a1e520f08c91bbdfa3e0d03e0c7569e32 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Tue, 18 Jul 2017 19:19:40 +0000 Subject: [PATCH] Fixed and added unit tests --- Makefile | 8 + batchconsumer/batcher/message_batcher.go | 1 + batchconsumer/batcher/message_batcher_test.go | 46 +++--- batchconsumer/consumer.go | 17 ++- batchconsumer/sender.go | 18 ++- batchconsumer/sync.go | 6 +- batchconsumer/writer.go | 22 +-- cmd/batchconsumer/main.go | 10 +- cmd/consumer/main.go | 16 +- golang.mk | 142 ++++++++++++++++++ splitter/splitter_test.go | 8 +- 11 files changed, 235 insertions(+), 59 deletions(-) create mode 100644 golang.mk diff --git a/Makefile b/Makefile index 42d8f1d..50eb390 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,10 @@ +include golang.mk +.DEFAULT_GOAL := test # override default goal set in library makefile + SHELL := /bin/bash JAR_DIR := jars PKG := github.com/Clever/amazon-kinesis-client-go +PKGS := $(shell go list ./... | grep -v /vendor ) .PHONY: download_jars run build URL_PREFIX := http://search.maven.org/remotecontent?filepath= @@ -49,3 +53,7 @@ run: build download_jars bench: go test -bench=. github.com/Clever/amazon-kinesis-client-go/decode/ + +test: $(PKGS) +$(PKGS): golang-test-all-deps + $(call golang-test-all,$@) diff --git a/batchconsumer/batcher/message_batcher.go b/batchconsumer/batcher/message_batcher.go index 4ac1d54..244dbc3 100644 --- a/batchconsumer/batcher/message_batcher.go +++ b/batchconsumer/batcher/message_batcher.go @@ -7,6 +7,7 @@ import ( "time" ) +// SequencePair a convience way to pass around a Sequence / SubSequence pair type SequencePair struct { Sequence *big.Int SubSequence int diff --git a/batchconsumer/batcher/message_batcher_test.go b/batchconsumer/batcher/message_batcher_test.go index 488310d..5f804c0 100644 --- a/batchconsumer/batcher/message_batcher_test.go +++ b/batchconsumer/batcher/message_batcher_test.go @@ -37,8 +37,7 @@ func (m *MockSync) waitForFlush(timeout time.Duration) error { } } -const mockSequenceNumber = "99999" -const mockSubSequenceNumber = 12345 +var mockSequence = SequencePair{big.NewInt(99999), 12345} func TestBatchingByCount(t *testing.T) { var err error @@ -48,9 +47,9 @@ func TestBatchingByCount(t *testing.T) { batcher := New(sync, time.Hour, 2, 1024*1024) t.Log("Batcher respect count limit") - assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequenceNumber, mockSubSequenceNumber)) - assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequenceNumber, mockSubSequenceNumber)) - assert.NoError(batcher.AddMessage([]byte("hmmhmm"), mockSequenceNumber, mockSubSequenceNumber)) + assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence)) + assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence)) + assert.NoError(batcher.AddMessage([]byte("hmmhmm"), mockSequence)) err = sync.waitForFlush(time.Millisecond * 10) assert.NoError(err) @@ -73,7 +72,7 @@ func TestBatchingByTime(t *testing.T) { batcher := New(sync, time.Millisecond, 2000000, 1024*1024) t.Log("Batcher sends partial batches when time expires") - assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequenceNumber, mockSubSequenceNumber)) + assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence)) err = sync.waitForFlush(time.Millisecond * 10) assert.NoError(err) @@ -83,8 +82,8 @@ func TestBatchingByTime(t *testing.T) { assert.Equal("hihi", string(sync.batches[0][0])) t.Log("Batcher sends all messsages in partial batches when time expires") - assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequenceNumber, mockSubSequenceNumber)) - assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequenceNumber, mockSubSequenceNumber)) + assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence)) + assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequence)) err = sync.waitForFlush(time.Millisecond * 10) assert.NoError(err) @@ -107,7 +106,7 @@ func TestBatchingBySize(t *testing.T) { batcher := New(sync, time.Hour, 2000000, 8) t.Log("Large messages are sent immediately") - assert.NoError(batcher.AddMessage([]byte("hellohello"), mockSequenceNumber, mockSubSequenceNumber)) + assert.NoError(batcher.AddMessage([]byte("hellohello"), mockSequence)) err = sync.waitForFlush(time.Millisecond * 10) assert.NoError(err) @@ -117,8 +116,8 @@ func TestBatchingBySize(t *testing.T) { assert.Equal("hellohello", string(sync.batches[0][0])) t.Log("Batcher tries not to exceed size limit") - assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequenceNumber, mockSubSequenceNumber)) - assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequenceNumber, mockSubSequenceNumber)) + assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence)) + assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence)) err = sync.waitForFlush(time.Millisecond * 10) assert.NoError(err) @@ -128,7 +127,7 @@ func TestBatchingBySize(t *testing.T) { assert.Equal("heyhey", string(sync.batches[1][0])) t.Log("Batcher sends messages that didn't fit in previous batch") - assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequenceNumber, mockSubSequenceNumber)) // At this point "hihi" is in the batch + assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequence)) // At this point "hihi" is in the batch err = sync.waitForFlush(time.Millisecond * 10) assert.NoError(err) @@ -139,7 +138,7 @@ func TestBatchingBySize(t *testing.T) { assert.Equal("yoyo", string(sync.batches[2][1])) t.Log("Batcher doesn't send partial batches") - assert.NoError(batcher.AddMessage([]byte("okok"), mockSequenceNumber, mockSubSequenceNumber)) + assert.NoError(batcher.AddMessage([]byte("okok"), mockSequence)) err = sync.waitForFlush(time.Millisecond * 10) assert.Error(err) @@ -153,7 +152,7 @@ func TestFlushing(t *testing.T) { batcher := New(sync, time.Hour, 2000000, 1024*1024) t.Log("Calling flush sends pending messages") - assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequenceNumber, mockSubSequenceNumber)) + assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence)) err = sync.waitForFlush(time.Millisecond * 10) assert.Error(err) @@ -176,7 +175,7 @@ func TestSendingEmpty(t *testing.T) { batcher := New(sync, time.Second, 10, 1024*1024) t.Log("An error is returned when an empty message is sent") - err = batcher.AddMessage([]byte{}, mockSequenceNumber, mockSubSequenceNumber) + err = batcher.AddMessage([]byte{}, mockSequence) assert.Error(err) } @@ -187,26 +186,29 @@ func TestUpdatingSequence(t *testing.T) { batcher := New(sync, time.Second, 10, 1024*1024).(*batcher) t.Log("Initally, smallestSeq is undefined") + assert.Nil(batcher.SmallestSequencePair().Sequence) + expected := new(big.Int) - assert.Nil(batcher.smallestSeq.Sequence) t.Log("After AddMessage (seq=1), smallestSeq = 1") - assert.NoError(batcher.AddMessage([]byte("abab"), "1", mockSubSequenceNumber)) - sync.waitForFlush(time.Minute) + batcher.updateSequenceNumbers(SequencePair{big.NewInt(1), 1234}) expected.SetInt64(1) seq := batcher.SmallestSequencePair() assert.True(expected.Cmp(seq.Sequence) == 0) t.Log("After AddMessage (seq=2), smallestSeq = 1 -- not updated because higher") - assert.NoError(batcher.AddMessage([]byte("cdcd"), "2", mockSubSequenceNumber)) - sync.waitForFlush(time.Minute) + batcher.updateSequenceNumbers(SequencePair{big.NewInt(2), 1234}) seq = batcher.SmallestSequencePair() assert.True(expected.Cmp(seq.Sequence) == 0) t.Log("After AddMessage (seq=1), smallestSeq = 0") - assert.NoError(batcher.AddMessage([]byte("efef"), "0", mockSubSequenceNumber)) - sync.waitForFlush(time.Minute) + batcher.updateSequenceNumbers(SequencePair{big.NewInt(0), 1234}) expected.SetInt64(0) seq = batcher.SmallestSequencePair() assert.True(expected.Cmp(seq.Sequence) == 0) + + t.Log("Flushing batch clears smallest sequence pair") + assert.NoError(batcher.AddMessage([]byte("cdcd"), SequencePair{big.NewInt(2), 1234})) + sync.waitForFlush(time.Minute) + assert.Nil(batcher.SmallestSequencePair().Sequence) } diff --git a/batchconsumer/consumer.go b/batchconsumer/consumer.go index 1218ab4..650f422 100644 --- a/batchconsumer/consumer.go +++ b/batchconsumer/consumer.go @@ -11,7 +11,11 @@ import ( "github.com/Clever/amazon-kinesis-client-go/kcl" ) +// Config used for BatchConsumer constructor. Any empty fields are populated with defaults. type Config struct { + // DeployEnv the name of the deployment environment + DeployEnv string + // LogFile where consumer errors and failed log lines are saved LogFile string // FlushInterval is how often accumulated messages should be bulk put to firehose @@ -21,15 +25,12 @@ type Config struct { // FlushSize is the size of a batch in bytes that triggers a push to firehose. Max batch size is 4Mb (4*1024*1024), see: http://docs.aws.amazon.com/firehose/latest/dev/limits.html FlushSize int - // DeployEnv the name of the deployment enviornment - DeployEnv string - // ReadRateLimit the number of records read per seconds ReadRateLimit int // ReadBurstLimit the max number of tokens allowed by rate limiter ReadBurstLimit int - // CheckpointFreq the frequence in which a checkpoint is saved + // CheckpointFreq the frequency in which a checkpoint is saved CheckpointFreq time.Duration // CheckpointRetries the number of times the consumer will try to save a checkpoint CheckpointRetries int @@ -37,6 +38,7 @@ type Config struct { CheckpointRetrySleep time.Duration } +// BatchConsumer is responsible for marshalling type BatchConsumer struct { kclProcess *kcl.KCLProcess logfile *os.File @@ -81,6 +83,8 @@ func withDefaults(config Config) Config { return config } +// NewBatchConsumerFromFiles creates a batch consumer. Readers/writers provided are used for +// interprocess communication. func NewBatchConsumerFromFiles( config Config, sender Sender, input io.Reader, output, errFile io.Writer, ) *BatchConsumer { @@ -94,7 +98,7 @@ func NewBatchConsumerFromFiles( kvlog := logger.New("amazon-kinesis-client-go") kvlog.SetOutput(file) - wrt := &BatchedWriter{ + wrt := &batchedWriter{ config: config, log: kvlog, sender: sender, @@ -107,10 +111,13 @@ func NewBatchConsumerFromFiles( } } +// NewBatchConsumer creates batch consumer. Stdin, Stdout, and Stderr are used for interprocess +// communication. func NewBatchConsumer(config Config, sender Sender) *BatchConsumer { return NewBatchConsumerFromFiles(config, sender, os.Stdin, os.Stdout, os.Stderr) } +// Start when called, the consumer begins ingesting messages. This function blocks. func (b *BatchConsumer) Start() { b.kclProcess.Run() b.logfile.Close() diff --git a/batchconsumer/sender.go b/batchconsumer/sender.go index 5ab6f53..c26be76 100644 --- a/batchconsumer/sender.go +++ b/batchconsumer/sender.go @@ -5,22 +5,38 @@ import ( "fmt" ) +// ErrLogIgnored should be returned by EncodeLog when it encounters a log line that will not be +// consumed var ErrLogIgnored = errors.New("Log intentionally skipped by sender") +// Sender an interface needed for batch consumer implementations type Sender interface { + // EncodeLog receives a raw log line. It's expected to return an appropriately formated log + // as well as a list of tags for that log line. A tag corresponds to a batch that it'll be + // put into. Typically tags are series names. + // If a log line will not be consumed, EncodeLog should return a ErrLogIgnored error. EncodeLog(rawlog []byte) (log []byte, tags []string, err error) + // SendBatch receives a batch of log lines. All log lines were given the specified tag by + // EncodeLog SendBatch(batch [][]byte, tag string) error } +// PartialOutputError should be returned by SendBatch implementations when a handful of log lines +// couldn't be sent to an output. It's expected that SendBatch implementations do a "best effort" +// before returning this error. type PartialOutputError struct { + // Message is a description of error that occurred Message string - Logs [][]byte + // Logs a list of logs that failed to be sent + Logs [][]byte } func (c PartialOutputError) Error() string { return fmt.Sprintf("%d failed logs. %s", len(c.Logs), c.Message) } +// CatastrophicOutputError should be returned by SendBatch implementations when the output is +// unreachable. Returning this error causes this container to exit without checkpointing. type CatastrophicOutputError struct { Message string } diff --git a/batchconsumer/sync.go b/batchconsumer/sync.go index cd77a59..517a943 100644 --- a/batchconsumer/sync.go +++ b/batchconsumer/sync.go @@ -1,11 +1,11 @@ package batchconsumer -type BatcherSync struct { +type batcherSync struct { tag string - writer *BatchedWriter + writer *batchedWriter } -func (b *BatcherSync) SendBatch(batch [][]byte) { +func (b *batcherSync) SendBatch(batch [][]byte) { b.writer.SendBatch(batch, b.tag) b.writer.CheckPointBatch(b.tag) } diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index 17c129f..95bb7da 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -16,7 +16,7 @@ import ( "github.com/Clever/amazon-kinesis-client-go/splitter" ) -type BatchedWriter struct { +type batchedWriter struct { config Config sender Sender log kv.KayveeLogger @@ -31,7 +31,7 @@ type BatchedWriter struct { lastProcessedSeq batcher.SequencePair } -func (b *BatchedWriter) Initialize(shardID string, checkpointer *kcl.Checkpointer) error { +func (b *batchedWriter) Initialize(shardID string, checkpointer *kcl.Checkpointer) error { b.batchers = map[string]batcher.Batcher{} b.shardID = shardID b.checkpointChan = make(chan batcher.SequencePair) @@ -43,7 +43,7 @@ func (b *BatchedWriter) Initialize(shardID string, checkpointer *kcl.Checkpointe } // handleCheckpointError returns true if checkout should be tried again. Returns false otherwise. -func (b *BatchedWriter) handleCheckpointError(err error) bool { +func (b *batchedWriter) handleCheckpointError(err error) bool { if err == nil { return false } @@ -71,7 +71,7 @@ func (b *BatchedWriter) handleCheckpointError(err error) bool { return true } -func (b *BatchedWriter) startCheckpointListener( +func (b *batchedWriter) startCheckpointListener( checkpointer *kcl.Checkpointer, checkpointChan <-chan batcher.SequencePair, ) { lastCheckpoint := time.Now() @@ -113,15 +113,15 @@ func (b *BatchedWriter) startCheckpointListener( }() } -func (b *BatchedWriter) createBatcher(tag string) batcher.Batcher { - sync := &BatcherSync{ +func (b *batchedWriter) createBatcher(tag string) batcher.Batcher { + sync := &batcherSync{ tag: tag, writer: b, } return batcher.New(sync, b.config.FlushInterval, b.config.FlushCount, b.config.FlushSize) } -func (b *BatchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) { +func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) { // We handle two types of records: // - records emitted from CWLogs Subscription // - records emiited from KPL @@ -134,7 +134,7 @@ func (b *BatchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) return splitter.GetMessagesFromGzippedInput(record, b.config.DeployEnv == "production") } -func (b *BatchedWriter) ProcessRecords(records []kcl.Record) error { +func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { curSequence := b.lastProcessedSeq for _, record := range records { @@ -194,7 +194,7 @@ func (b *BatchedWriter) ProcessRecords(records []kcl.Record) error { return nil } -func (b *BatchedWriter) CheckPointBatch(tag string) { +func (b *batchedWriter) CheckPointBatch(tag string) { smallest := b.lastProcessedSeq for name, batch := range b.batchers { @@ -218,7 +218,7 @@ func (b *BatchedWriter) CheckPointBatch(tag string) { b.checkpointChan <- smallest } -func (b *BatchedWriter) SendBatch(batch [][]byte, tag string) { +func (b *batchedWriter) SendBatch(batch [][]byte, tag string) { b.log.Info("sent-batch") err := b.sender.SendBatch(batch, tag) switch e := err.(type) { @@ -237,7 +237,7 @@ func (b *BatchedWriter) SendBatch(batch [][]byte, tag string) { } } -func (b *BatchedWriter) Shutdown(reason string) error { +func (b *batchedWriter) Shutdown(reason string) error { if reason == "TERMINATE" { b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID}) for _, batch := range b.batchers { diff --git a/cmd/batchconsumer/main.go b/cmd/batchconsumer/main.go index 0d91b1f..0285139 100644 --- a/cmd/batchconsumer/main.go +++ b/cmd/batchconsumer/main.go @@ -35,16 +35,16 @@ func main() { output, file := createDummyOutput() defer file.Close() - wrt := &ExampleWriter{output: output} - consumer := kbc.NewBatchConsumer(config, wrt) + sender := &exampleSender{output: output} + consumer := kbc.NewBatchConsumer(config, sender) consumer.Start() } -type ExampleWriter struct { +type exampleSender struct { output logger.KayveeLogger } -func (e *ExampleWriter) EncodeLog(rawlog []byte) ([]byte, []string, error) { +func (e *exampleSender) EncodeLog(rawlog []byte) ([]byte, []string, error) { if len(rawlog)%5 == 2 { return nil, nil, kbc.ErrLogIgnored } @@ -55,7 +55,7 @@ func (e *ExampleWriter) EncodeLog(rawlog []byte) ([]byte, []string, error) { return []byte(line), []string{tag1}, nil } -func (e *ExampleWriter) SendBatch(batch [][]byte, tag string) error { +func (e *exampleSender) SendBatch(batch [][]byte, tag string) error { for idx, line := range batch { e.output.InfoD(tag, logger.M{"idx": idx, "line": string(line)}) } diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index 22f8ccc..77a75c9 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -9,7 +9,7 @@ import ( "github.com/Clever/amazon-kinesis-client-go/kcl" ) -type SampleRecordProcessor struct { +type sampleRecordProcessor struct { checkpointer *kcl.Checkpointer checkpointRetries int checkpointFreq time.Duration @@ -18,25 +18,25 @@ type SampleRecordProcessor struct { lastCheckpoint time.Time } -func New() *SampleRecordProcessor { - return &SampleRecordProcessor{ +func newSampleRecordProcessor() *sampleRecordProcessor { + return &sampleRecordProcessor{ checkpointRetries: 5, checkpointFreq: 60 * time.Second, } } -func (srp *SampleRecordProcessor) Initialize(shardID string, checkpointer *kcl.Checkpointer) error { +func (srp *sampleRecordProcessor) Initialize(shardID string, checkpointer *kcl.Checkpointer) error { srp.lastCheckpoint = time.Now() srp.checkpointer = checkpointer return nil } -func (srp *SampleRecordProcessor) shouldUpdateSequence(sequenceNumber *big.Int, subSequenceNumber int) bool { +func (srp *sampleRecordProcessor) shouldUpdateSequence(sequenceNumber *big.Int, subSequenceNumber int) bool { return srp.largestSeq == nil || sequenceNumber.Cmp(srp.largestSeq) == 1 || (sequenceNumber.Cmp(srp.largestSeq) == 0 && subSequenceNumber > srp.largestSubSeq) } -func (srp *SampleRecordProcessor) ProcessRecords(records []kcl.Record) error { +func (srp *sampleRecordProcessor) ProcessRecords(records []kcl.Record) error { for _, record := range records { seqNumber := new(big.Int) if _, ok := seqNumber.SetString(record.SequenceNumber, 10); !ok { @@ -56,7 +56,7 @@ func (srp *SampleRecordProcessor) ProcessRecords(records []kcl.Record) error { return nil } -func (srp *SampleRecordProcessor) Shutdown(reason string) error { +func (srp *sampleRecordProcessor) Shutdown(reason string) error { if reason == "TERMINATE" { fmt.Fprintf(os.Stderr, "Was told to terminate, will attempt to checkpoint.\n") srp.checkpointer.Shutdown() @@ -72,6 +72,6 @@ func main() { panic(err) } defer f.Close() - kclProcess := kcl.New(os.Stdin, os.Stdout, os.Stderr, New()) + kclProcess := kcl.New(os.Stdin, os.Stdout, os.Stderr, newSampleRecordProcessor()) kclProcess.Run() } diff --git a/golang.mk b/golang.mk new file mode 100644 index 0000000..602c777 --- /dev/null +++ b/golang.mk @@ -0,0 +1,142 @@ +# This is the default Clever Golang Makefile. +# It is stored in the dev-handbook repo, github.com/Clever/dev-handbook +# Please do not alter this file directly. +GOLANG_MK_VERSION := 0.1.4 + +SHELL := /bin/bash +.PHONY: golang-godep-vendor golang-test-deps $(GODEP) + +# if the gopath includes several directories, use only the first +GOPATH=$(shell echo $$GOPATH | cut -d: -f1) + +# This block checks and confirms that the proper Go toolchain version is installed. +# arg1: golang version +define golang-version-check +GOVERSION := $(shell go version | grep $(1)) +_ := $(if \ + $(shell go version | grep $(1)), \ + @echo "", \ + $(error "must be running Go version $(1)")) +endef + +export GO15VENDOREXPERIMENT=1 + +# FGT is a utility that exits with 1 whenever any stderr/stdout output is recieved. +FGT := $(GOPATH)/bin/fgt +$(FGT): + go get github.com/GeertJohan/fgt + +# Godep is a tool used to manage Golang dependencies in the style of the Go 1.5 +# vendoring experiment. +GODEP := $(GOPATH)/bin/godep +$(GODEP): + go get -u github.com/tools/godep + +# Golint is a tool for linting Golang code for common errors. +GOLINT := $(GOPATH)/bin/golint +$(GOLINT): + go get github.com/golang/lint/golint + +# golang-vendor-deps installs all dependencies needed for different test cases. +golang-godep-vendor-deps: $(GODEP) + +# golang-godep-vendor is a target for saving dependencies with the godep tool +# to the vendor/ directory. All nested vendor/ directories are deleted as they +# are not handled well by the Go toolchain. +# arg1: pkg path +define golang-godep-vendor +$(GODEP) save $(1) +@# remove any nested vendor directories +find vendor/ -path '*/vendor' -type d | xargs -IX rm -r X +endef + +# golang-fmt-deps requires the FGT tool for checking output +golang-fmt-deps: $(FGT) + +# golang-fmt checks that all golang files in the pkg are formatted correctly. +# arg1: pkg path +define golang-fmt +@echo "FORMATTING $(1)..." +@$(FGT) gofmt -l=true $(GOPATH)/src/$(1)/*.go +endef + +# golang-lint-deps requires the golint tool for golang linting. +golang-lint-deps: $(GOLINT) + +# golang-lint calls golint on all golang files in the pkg. +# arg1: pkg path +define golang-lint +@echo "LINTING $(1)..." +@find $(GOPATH)/src/$(1)/*.go -type f | grep -v gen_ | xargs $(GOLINT) +endef + +# golang-lint-deps-strict requires the golint tool for golang linting. +golang-lint-deps-strict: $(GOLINT) $(FGT) + +# golang-lint-strict calls golint on all golang files in the pkg and fails if any lint +# errors are found. +# arg1: pkg path +define golang-lint-strict +@echo "LINTING $(1)..." +@find $(GOPATH)/src/$(1)/*.go -type f | grep -v gen_ | xargs $(FGT) $(GOLINT) +endef + +# golang-test-deps is here for consistency +golang-test-deps: + +# golang-test uses the Go toolchain to run all tests in the pkg. +# arg1: pkg path +define golang-test +@echo "TESTING $(1)..." +@go test -v $(1) +endef + +# golang-test-strict-deps is here for consistency +golang-test-strict-deps: + +# golang-test-strict uses the Go toolchain to run all tests in the pkg with the race flag +# arg1: pkg path +define golang-test-strict +@echo "TESTING $(1)..." +@go test -v -race $(1) +endef + +# golang-vet-deps is here for consistency +golang-vet-deps: + +# golang-vet uses the Go toolchain to vet all the pkg for common mistakes. +# arg1: pkg path +define golang-vet +@echo "VETTING $(1)..." +@go vet $(GOPATH)/src/$(1)/*.go +endef + +# golang-test-all-deps installs all dependencies needed for different test cases. +golang-test-all-deps: golang-fmt-deps golang-lint-deps golang-test-deps golang-vet-deps + +# golang-test-all calls fmt, lint, vet and test on the specified pkg. +# arg1: pkg path +define golang-test-all +$(call golang-fmt,$(1)) +$(call golang-lint,$(1)) +$(call golang-vet,$(1)) +$(call golang-test,$(1)) +endef + +# golang-test-all-strict-deps: installs all dependencies needed for different test cases. +golang-test-all-strict-deps: golang-fmt-deps golang-lint-deps-strict golang-test-strict-deps golang-vet-deps + +# golang-test-all-strict calls fmt, lint, vet and test on the specified pkg with strict +# requirements that no errors are thrown while linting. +# arg1: pkg path +define golang-test-all-strict +$(call golang-fmt,$(1)) +$(call golang-lint-strict,$(1)) +$(call golang-vet,$(1)) +$(call golang-test-strict,$(1)) +endef + +# golang-update-makefile downloads latest version of golang.mk +golang-update-makefile: + @wget https://raw.githubusercontent.com/Clever/dev-handbook/master/make/golang.mk -O /tmp/golang.mk 2>/dev/null + @if ! grep -q $(GOLANG_MK_VERSION) /tmp/golang.mk; then cp /tmp/golang.mk golang.mk && echo "golang.mk updated"; else echo "golang.mk is up-to-date"; fi diff --git a/splitter/splitter_test.go b/splitter/splitter_test.go index f85ebfa..d6c5cda 100644 --- a/splitter/splitter_test.go +++ b/splitter/splitter_test.go @@ -16,7 +16,7 @@ func TestUnpacking(t *testing.T) { decoded, err := b64.StdEncoding.DecodeString(input) assert.NoError(t, err) - output, err := unpack(string(decoded)) + output, err := unpack(decoded) assert.NoError(t, err) expectedOutput := LogEventBatch{ @@ -92,7 +92,7 @@ func TestFullLoop(t *testing.T) { decoded, err := b64.StdEncoding.DecodeString(packed) assert.NoError(t, err) - output, err := unpack(string(decoded)) + output, err := unpack(decoded) assert.NoError(t, err) assert.Equal(t, leb, output) @@ -121,8 +121,8 @@ func TestSplit(t *testing.T) { prodEnv := false lines := Split(input, prodEnv) expected := [][]byte{ - "2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: some log line", - "2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: another log line", + []byte("2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: some log line"), + []byte("2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: another log line"), } assert.Equal(t, expected, lines) }