Merge pull request #29 from Clever/initialize-method

Added initialize method to sender
This commit is contained in:
Xavi 2018-08-09 16:48:43 -07:00 committed by GitHub
commit 92ce9be607
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 42 additions and 8 deletions

View file

@ -11,6 +11,8 @@ var ErrMessageIgnored = errors.New("Message intentionally skipped by sender")
// Sender an interface needed for batch consumer implementations // Sender an interface needed for batch consumer implementations
type Sender interface { type Sender interface {
// Initialize called once before ProcessMessage and SendBatch
Initialize(shardID string)
// ProcessMessage receives a raw message and is expected to return an appropriately formatted // ProcessMessage receives a raw message and is expected to return an appropriately formatted
// message as well as a list of tags for that log line. A tag corresponds to a batch that // message as well as a list of tags for that log line. A tag corresponds to a batch that
// it'll be put into. Typically tags are series names. // it'll be put into. Typically tags are series names.

View file

@ -49,6 +49,8 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer
BatchInterval: b.config.BatchInterval, BatchInterval: b.config.BatchInterval,
} }
b.sender.Initialize(shardID)
b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq) b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq)
b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.failedLogsFile) b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.failedLogsFile)

View file

@ -14,6 +14,8 @@ import (
type ignoringSender struct{} type ignoringSender struct{}
func (i ignoringSender) Initialize(shardID string) {}
func (i ignoringSender) ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error) { func (i ignoringSender) ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error) {
return nil, nil, ErrMessageIgnored return nil, nil, ErrMessageIgnored
} }
@ -27,6 +29,7 @@ type tagBatch struct {
batch [][]byte batch [][]byte
} }
type msgAsTagSender struct { type msgAsTagSender struct {
shardID string
batches map[string][][][]byte batches map[string][][][]byte
saveBatch chan tagBatch saveBatch chan tagBatch
shutdown chan struct{} shutdown chan struct{}
@ -61,6 +64,10 @@ func (i *msgAsTagSender) startBatchSaver(saveBatch <-chan tagBatch, shutdown <-c
}() }()
} }
func (i *msgAsTagSender) Initialize(shardID string) {
i.shardID = shardID
}
func (i *msgAsTagSender) ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error) { func (i *msgAsTagSender) ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error) {
if "ignore" == string(rawmsg) { if "ignore" == string(rawmsg) {
return nil, nil, ErrMessageIgnored return nil, nil, ErrMessageIgnored
@ -176,6 +183,8 @@ func TestProcessRecordsSingleBatchBasic(t *testing.T) {
wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile) wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile)
wrt.Initialize("test-shard", mockcheckpointer) wrt.Initialize("test-shard", mockcheckpointer)
assert.Equal("test-shard", mocksender.shardID)
err := wrt.ProcessRecords([]kcl.Record{ err := wrt.ProcessRecords([]kcl.Record{
kcl.Record{SequenceNumber: "1", Data: encode("tag1")}, kcl.Record{SequenceNumber: "1", Data: encode("tag1")},
kcl.Record{SequenceNumber: "2", Data: encode("tag1")}, kcl.Record{SequenceNumber: "2", Data: encode("tag1")},

View file

@ -23,9 +23,14 @@ func main() {
} }
type exampleSender struct { type exampleSender struct {
shardID string
output logger.KayveeLogger output logger.KayveeLogger
} }
func (e *exampleSender) Initialize(shardID string) {
e.shardID = shardID
}
func (e *exampleSender) ProcessMessage(rawmsg []byte) ([]byte, []string, error) { func (e *exampleSender) ProcessMessage(rawmsg []byte) ([]byte, []string, error) {
if len(rawmsg)%5 == 2 { if len(rawmsg)%5 == 2 {
return nil, nil, kbc.ErrMessageIgnored return nil, nil, kbc.ErrMessageIgnored

View file

@ -1,12 +1,15 @@
# This is the default Clever Golang Makefile. # This is the default Clever Golang Makefile.
# It is stored in the dev-handbook repo, github.com/Clever/dev-handbook # It is stored in the dev-handbook repo, github.com/Clever/dev-handbook
# Please do not alter this file directly. # Please do not alter this file directly.
GOLANG_MK_VERSION := 0.3.2 GOLANG_MK_VERSION := 0.3.8
SHELL := /bin/bash SHELL := /bin/bash
SYSTEM := $(shell uname -a | cut -d" " -f1 | tr '[:upper:]' '[:lower:]') SYSTEM := $(shell uname -a | cut -d" " -f1 | tr '[:upper:]' '[:lower:]')
.PHONY: golang-test-deps bin/dep golang-ensure-curl-installed .PHONY: golang-test-deps bin/dep golang-ensure-curl-installed
# set timezone to UTC for golang to match circle and deploys
export TZ=UTC
# if the gopath includes several directories, use only the first # if the gopath includes several directories, use only the first
GOPATH=$(shell echo $$GOPATH | cut -d: -f1) GOPATH=$(shell echo $$GOPATH | cut -d: -f1)
@ -36,18 +39,31 @@ $(FGT):
golang-ensure-curl-installed: golang-ensure-curl-installed:
@command -v curl >/dev/null 2>&1 || { echo >&2 "curl not installed. Please install curl."; exit 1; } @command -v curl >/dev/null 2>&1 || { echo >&2 "curl not installed. Please install curl."; exit 1; }
DEP_VERSION = v0.3.2 DEP_VERSION = v0.4.1
DEP_INSTALLED := $(shell [[ -e "bin/dep" ]] && bin/dep version | grep version | grep -v go | cut -d: -f2 | tr -d '[:space:]') DEP_INSTALLED := $(shell [[ -e "bin/dep" ]] && bin/dep version | grep version | grep -v go | cut -d: -f2 | tr -d '[:space:]')
# Dep is a tool used to manage Golang dependencies. It is the offical vendoring experiment, but # Dep is a tool used to manage Golang dependencies. It is the offical vendoring experiment, but
# not yet the official tool for Golang. # not yet the official tool for Golang.
ifeq ($(DEP_VERSION),$(DEP_INSTALLED))
bin/dep: # nothing to do, dep is already up-to-date
else
CACHED_DEP = /tmp/dep-$(DEP_VERSION)
bin/dep: golang-ensure-curl-installed bin/dep: golang-ensure-curl-installed
@echo "Updating dep..."
@mkdir -p bin @mkdir -p bin
@[[ "$(DEP_VERSION)" != "$(DEP_INSTALLED)" ]] && \ @if [ ! -f $(CACHED_DEP) ]; then curl -o $(CACHED_DEP) -sL https://github.com/golang/dep/releases/download/$(DEP_VERSION)/dep-$(SYSTEM)-amd64; fi;
echo "Updating dep..." && \ @cp $(CACHED_DEP) bin/dep
curl -o bin/dep -sL https://github.com/golang/dep/releases/download/$(DEP_VERSION)/dep-$(SYSTEM)-amd64 && \ @chmod +x bin/dep || true
chmod +x bin/dep || true endif
golang-dep-vendor-deps: bin/dep # figure out "github.com/<org>/<repo>"
# `go list` will fail if there are no .go files in the directory
# if this is the case, fall back to assuming github.com/Clever
REF = $(shell go list || echo github.com/Clever/$(notdir $(shell pwd)))
golang-verify-no-self-references:
@if grep -q -i "$(REF)" Gopkg.lock; then echo "Error: Gopkg.lock includes a self-reference ($(REF)), which is not allowed. See: https://github.com/golang/dep/issues/1690" && exit 1; fi;
@if grep -q -i "$(REF)" Gopkg.toml; then echo "Error: Gopkg.toml includes a self-reference ($(REF)), which is not allowed. See: https://github.com/golang/dep/issues/1690" && exit 1; fi;
golang-dep-vendor-deps: bin/dep golang-verify-no-self-references
# golang-godep-vendor is a target for saving dependencies with the dep tool # golang-godep-vendor is a target for saving dependencies with the dep tool
# to the vendor/ directory. All nested vendor/ directories are deleted via # to the vendor/ directory. All nested vendor/ directories are deleted via