diff --git a/batchconsumer/sender.go b/batchconsumer/sender.go index 03e0421..ed90ad6 100644 --- a/batchconsumer/sender.go +++ b/batchconsumer/sender.go @@ -11,6 +11,8 @@ var ErrMessageIgnored = errors.New("Message intentionally skipped by sender") // Sender an interface needed for batch consumer implementations type Sender interface { + // Initialize called once before ProcessMessage and SendBatch + Initialize(shardID string) // ProcessMessage receives a raw message and is expected to return an appropriately formatted // message as well as a list of tags for that log line. A tag corresponds to a batch that // it'll be put into. Typically tags are series names. diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index cf5eab4..4fe8c9b 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -49,6 +49,8 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer BatchInterval: b.config.BatchInterval, } + b.sender.Initialize(shardID) + b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq) b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.failedLogsFile) diff --git a/batchconsumer/writer_test.go b/batchconsumer/writer_test.go index 7bc7e33..8de24a9 100644 --- a/batchconsumer/writer_test.go +++ b/batchconsumer/writer_test.go @@ -14,6 +14,8 @@ import ( type ignoringSender struct{} +func (i ignoringSender) Initialize(shardID string) {} + func (i ignoringSender) ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error) { return nil, nil, ErrMessageIgnored } @@ -27,6 +29,7 @@ type tagBatch struct { batch [][]byte } type msgAsTagSender struct { + shardID string batches map[string][][][]byte saveBatch chan tagBatch shutdown chan struct{} @@ -61,6 +64,10 @@ func (i *msgAsTagSender) startBatchSaver(saveBatch <-chan tagBatch, shutdown <-c }() } +func (i *msgAsTagSender) Initialize(shardID string) { + i.shardID = shardID +} + func (i *msgAsTagSender) ProcessMessage(rawmsg 
[]byte) (msg []byte, tags []string, err error) { if "ignore" == string(rawmsg) { return nil, nil, ErrMessageIgnored @@ -176,6 +183,8 @@ func TestProcessRecordsSingleBatchBasic(t *testing.T) { wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile) wrt.Initialize("test-shard", mockcheckpointer) + assert.Equal("test-shard", mocksender.shardID) + err := wrt.ProcessRecords([]kcl.Record{ kcl.Record{SequenceNumber: "1", Data: encode("tag1")}, kcl.Record{SequenceNumber: "2", Data: encode("tag1")}, diff --git a/cmd/batchconsumer/main.go b/cmd/batchconsumer/main.go index 004ec36..ff7c1d4 100644 --- a/cmd/batchconsumer/main.go +++ b/cmd/batchconsumer/main.go @@ -23,7 +23,12 @@ func main() { } type exampleSender struct { - output logger.KayveeLogger + shardID string + output logger.KayveeLogger +} + +func (e *exampleSender) Initialize(shardID string) { + e.shardID = shardID } func (e *exampleSender) ProcessMessage(rawmsg []byte) ([]byte, []string, error) { diff --git a/golang.mk b/golang.mk index 7d8ba4c..b539763 100644 --- a/golang.mk +++ b/golang.mk @@ -1,12 +1,15 @@ # This is the default Clever Golang Makefile. # It is stored in the dev-handbook repo, github.com/Clever/dev-handbook # Please do not alter this file directly. -GOLANG_MK_VERSION := 0.3.2 +GOLANG_MK_VERSION := 0.3.8 SHELL := /bin/bash SYSTEM := $(shell uname -a | cut -d" " -f1 | tr '[:upper:]' '[:lower:]') .PHONY: golang-test-deps bin/dep golang-ensure-curl-installed +# set timezone to UTC for golang to match circle and deploys +export TZ=UTC + # if the gopath includes several directories, use only the first GOPATH=$(shell echo $$GOPATH | cut -d: -f1) @@ -36,18 +39,31 @@ $(FGT): golang-ensure-curl-installed: @command -v curl >/dev/null 2>&1 || { echo >&2 "curl not installed. 
Please install curl."; exit 1; } -DEP_VERSION = v0.3.2 +DEP_VERSION = v0.4.1 DEP_INSTALLED := $(shell [[ -e "bin/dep" ]] && bin/dep version | grep version | grep -v go | cut -d: -f2 | tr -d '[:space:]') # Dep is a tool used to manage Golang dependencies. It is the offical vendoring experiment, but # not yet the official tool for Golang. +ifeq ($(DEP_VERSION),$(DEP_INSTALLED)) +bin/dep: # nothing to do, dep is already up-to-date +else +CACHED_DEP = /tmp/dep-$(DEP_VERSION) bin/dep: golang-ensure-curl-installed + @echo "Updating dep..." @mkdir -p bin - @[[ "$(DEP_VERSION)" != "$(DEP_INSTALLED)" ]] && \ - echo "Updating dep..." && \ - curl -o bin/dep -sL https://github.com/golang/dep/releases/download/$(DEP_VERSION)/dep-$(SYSTEM)-amd64 && \ - chmod +x bin/dep || true + @if [ ! -f $(CACHED_DEP) ]; then curl -o $(CACHED_DEP) -sL https://github.com/golang/dep/releases/download/$(DEP_VERSION)/dep-$(SYSTEM)-amd64; fi; + @cp $(CACHED_DEP) bin/dep + @chmod +x bin/dep || true +endif -golang-dep-vendor-deps: bin/dep +# figure out "github.com/<org>/<repo>" +# `go list` will fail if there are no .go files in the directory +# if this is the case, fall back to assuming github.com/Clever +REF = $(shell go list || echo github.com/Clever/$(notdir $(shell pwd))) +golang-verify-no-self-references: + @if grep -q -i "$(REF)" Gopkg.lock; then echo "Error: Gopkg.lock includes a self-reference ($(REF)), which is not allowed. See: https://github.com/golang/dep/issues/1690" && exit 1; fi; + @if grep -q -i "$(REF)" Gopkg.toml; then echo "Error: Gopkg.toml includes a self-reference ($(REF)), which is not allowed. See: https://github.com/golang/dep/issues/1690" && exit 1; fi; + +golang-dep-vendor-deps: bin/dep golang-verify-no-self-references # golang-godep-vendor is a target for saving dependencies with the dep tool # to the vendor/ directory. All nested vendor/ directories are deleted via