Merge pull request #6 from Clever/INFRA-2405-new-consumer-interface
Made the batchconsumer interface production ready
Commit 16814bd597
18 changed files with 748 additions and 894 deletions
Makefile (66 changes)

@@ -2,52 +2,52 @@ include golang.mk
 .DEFAULT_GOAL := test # override default goal set in library makefile
 
 SHELL := /bin/bash
-JAR_DIR := jars
 PKG := github.com/Clever/amazon-kinesis-client-go
 PKGS := $(shell go list ./... | grep -v /vendor )
 .PHONY: download_jars run build
+$(eval $(call golang-version-check,1.8))
-URL_PREFIX := http://search.maven.org/remotecontent?filepath=
 
-# this list lifted from https://github.com/awslabs/amazon-kinesis-client-python/blob/fb49c6390c0593fbcf81d6c34c5245726c15b2f3/setup.py#L60
-JARS_TO_DOWNLOAD := $(addprefix $(JAR_DIR)/,com/amazonaws/amazon-kinesis-client/1.7.2/amazon-kinesis-client-1.7.2.jar \
-    com/amazonaws/aws-java-sdk-dynamodb/1.11.14/aws-java-sdk-dynamodb-1.11.14.jar \
-    com/amazonaws/aws-java-sdk-s3/1.11.14/aws-java-sdk-s3-1.11.14.jar \
-    com/amazonaws/aws-java-sdk-kms/1.11.14/aws-java-sdk-kms-1.11.14.jar \
-    com/amazonaws/aws-java-sdk-core/1.11.14/aws-java-sdk-core-1.11.14.jar \
-    commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar \
-    org/apache/httpcomponents/httpclient/4.5.2/httpclient-4.5.2.jar \
-    org/apache/httpcomponents/httpcore/4.4.4/httpcore-4.4.4.jar \
-    commons-codec/commons-codec/1.9/commons-codec-1.9.jar \
-    com/fasterxml/jackson/core/jackson-databind/2.6.6/jackson-databind-2.6.6.jar \
-    com/fasterxml/jackson/core/jackson-annotations/2.6.0/jackson-annotations-2.6.0.jar \
-    com/fasterxml/jackson/core/jackson-core/2.6.6/jackson-core-2.6.6.jar \
-    com/fasterxml/jackson/dataformat/jackson-dataformat-cbor/2.6.6/jackson-dataformat-cbor-2.6.6.jar \
-    joda-time/joda-time/2.8.1/joda-time-2.8.1.jar \
-    com/amazonaws/aws-java-sdk-kinesis/1.11.14/aws-java-sdk-kinesis-1.11.14.jar \
-    com/amazonaws/aws-java-sdk-cloudwatch/1.11.14/aws-java-sdk-cloudwatch-1.11.14.jar \
-    com/google/guava/guava/18.0/guava-18.0.jar \
-    com/google/protobuf/protobuf-java/2.6.1/protobuf-java-2.6.1.jar \
-    commons-lang/commons-lang/2.6/commons-lang-2.6.jar)
 
-EMPTY :=
-SPACE := $(EMPTY) $(EMPTY)
-JAVA_CLASS_PATH := $(subst $(SPACE),:,$(JARS_TO_DOWNLOAD))
 
 CONSUMER ?= consumer
+TMP_DIR := ./tmp-jars
+JAR_DIR := ./jars/
+KCL_VERSION := 1.7.6
 
-$(JARS_TO_DOWNLOAD):
-    mkdir -p `dirname $@`
-    curl -s -L -o $@ -O $(URL_PREFIX)`echo $@ | sed 's/$(JAR_DIR)\///g'`
+define POM_XML_FOR_GETTING_DEPENDENT_JARS
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.clever.kinesisconsumers</groupId>
+  <artifactId>$(CONSUMER)</artifactId>
+  <version>1.0-SNAPSHOT</version>
+  <dependencies>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>amazon-kinesis-client</artifactId>
+      <version>$(KCL_VERSION)</version>
+    </dependency>
+  </dependencies>
+</project>
+endef
+export POM_XML_FOR_GETTING_DEPENDENT_JARS
+
+download_jars:
+    command -v mvn >/dev/null 2>&1 || { echo >&2 "Maven not installed. Install maven!"; exit 1; }
+    mkdir -p $(JAR_DIR) $(TMP_DIR)
+    echo $$POM_XML_FOR_GETTING_DEPENDENT_JARS > $(TMP_DIR)/pom.xml
+    cd $(TMP_DIR) && mvn dependency:copy-dependencies
+    mv $(TMP_DIR)/target/dependency/* $(JAR_DIR)/
+    # Download the STS jar file for supporting IAM Roles
+    ls $(JAR_DIR)/aws-java-sdk-core-*.jar | sed -e "s/.*-sdk-core-//g" | sed -e "s/\.jar//g" > /tmp/version.txt
+    curl -o $(JAR_DIR)/aws-java-sdk-sts-`cat /tmp/version.txt`.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-sts/`cat /tmp/version.txt`/aws-java-sdk-sts-`cat /tmp/version.txt`.jar
+    rm -r $(TMP_DIR)
 
-download_jars: $(JARS_TO_DOWNLOAD)
+all: test build
 
 build:
     CGO_ENABLED=0 go build -installsuffix cgo -o build/$(CONSUMER) $(PKG)/cmd/$(CONSUMER)
 
 run: build download_jars
     command -v java >/dev/null 2>&1 || { echo >&2 "Java not installed. Install java!"; exit 1; }
-    java -cp $(JAVA_CLASS_PATH) \
+    java -cp "$(JAR_DIR)/*" \
        com.amazonaws.services.kinesis.multilang.MultiLangDaemon \
        $(CONSUMER).properties
batchconsumer/batcher.go (new file, 53 lines)

@@ -0,0 +1,53 @@
package batchconsumer

import (
    "errors"
    "time"

    "github.com/Clever/amazon-kinesis-client-go/kcl"
)

var ErrBatchFull = errors.New("The batch is full")

type batcher struct {
    flushCount int
    flushSize  int

    Batch       [][]byte
    LastUpdated time.Time
    SmallestSeq kcl.SequencePair
}

func (b *batcher) batchSize(batch [][]byte) int {
    total := 0
    for _, msg := range batch {
        total += len(msg)
    }

    return total
}

func (b *batcher) AddMessage(msg []byte, pair kcl.SequencePair) error {
    if b.flushCount <= len(b.Batch) {
        return ErrBatchFull
    }

    size := b.batchSize(b.Batch)
    if b.flushSize < size+len(msg) {
        return ErrBatchFull
    }

    b.Batch = append(b.Batch, msg)
    if b.SmallestSeq.IsNil() || pair.IsLessThan(b.SmallestSeq) {
        b.SmallestSeq = pair
    }
    b.LastUpdated = time.Now()

    return nil
}

func (b *batcher) Clear() {
    b.Batch = [][]byte{}
    b.LastUpdated = time.Time{}
    b.SmallestSeq = kcl.SequencePair{}
}
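Editor's note (not part of the diff): the new batcher is deliberately passive -- AddMessage only reports ErrBatchFull, so the caller (batchermanager.go, added later in this PR) decides when to send and Clear. A minimal sketch of that calling pattern, with illustrative limits and a hypothetical send callback:

package batchconsumer

import "github.com/Clever/amazon-kinesis-client-go/kcl"

// fillAndFlush is a hypothetical helper showing how a caller drives a batcher.
func fillAndFlush(msgs [][]byte, pair kcl.SequencePair, send func([][]byte)) {
    b := &batcher{flushCount: 3, flushSize: 1024}
    for _, msg := range msgs {
        if err := b.AddMessage(msg, pair); err == ErrBatchFull {
            send(b.Batch)           // hand the full batch off
            b.Clear()               // resets Batch, LastUpdated and SmallestSeq
            b.AddMessage(msg, pair) // re-add the message that did not fit
        }
    }
}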
Deleted file (the old batchconsumer/batcher package):

@@ -1,194 +0,0 @@
package batcher

import (
    "fmt"
    "math/big"
    "sync"
    "time"
)

// SequencePair a convience way to pass around a Sequence / SubSequence pair
type SequencePair struct {
    Sequence    *big.Int
    SubSequence int
}

func (s SequencePair) IsEmpty() bool {
    return s.Sequence == nil
}

func (s SequencePair) IsLessThan(pair SequencePair) bool {
    if s.IsEmpty() || pair.IsEmpty() { // empty pairs are incomparable
        return false
    }

    cmp := s.Sequence.Cmp(pair.Sequence)
    if cmp == -1 {
        return true
    }
    if cmp == 1 {
        return false
    }

    return s.SubSequence < pair.SubSequence
}

// Sync is used to allow a writer to syncronize with the batcher.
// The writer declares how to write messages (via its `SendBatch` method), while the batcher
// keeps track of messages written
type Sync interface {
    SendBatch(batch [][]byte)
}

// Batcher interface
type Batcher interface {
    // AddMesage to the batch
    AddMessage(msg []byte, sequencePair SequencePair) error
    // Flush all messages from the batch
    Flush()
    // SmallestSeqPair returns the smallest SequenceNumber and SubSequence number in
    // the current batch
    SmallestSequencePair() SequencePair
}

type msgPack struct {
    msg          []byte
    sequencePair SequencePair
}

type batcher struct {
    mux sync.Mutex

    flushInterval time.Duration
    flushCount    int
    flushSize     int

    // smallestSeq are used for checkpointing
    smallestSeq SequencePair

    sync      Sync
    msgChan   chan<- msgPack
    flushChan chan<- struct{}
}

// New creates a new Batcher
// - sync - synchronizes batcher with writer
// - flushInterval - how often accumulated messages should be flushed (default 1 second).
// - flushCount - number of messages that trigger a flush (default 10).
// - flushSize - size of batch that triggers a flush (default 1024 * 1024 = 1 mb)
func New(sync Sync, flushInterval time.Duration, flushCount int, flushSize int) (Batcher, error) {
    if flushSize == 0 {
        return nil, fmt.Errorf("flush size must be non-zero")
    }
    if flushCount == 0 {
        return nil, fmt.Errorf("flush count must be non-zero")
    }
    if flushInterval == 0 {
        return nil, fmt.Errorf("flush interval must be non-zero")
    }

    msgChan := make(chan msgPack)
    flushChan := make(chan struct{})

    b := &batcher{
        flushCount:    flushCount,
        flushInterval: flushInterval,
        flushSize:     flushSize,
        sync:          sync,
        msgChan:       msgChan,
        flushChan:     flushChan,
    }

    go b.startBatcher(msgChan, flushChan)

    return b, nil
}

func (b *batcher) SmallestSequencePair() SequencePair {
    b.mux.Lock()
    defer b.mux.Unlock()

    return b.smallestSeq
}

func (b *batcher) SetFlushInterval(dur time.Duration) {
    b.flushInterval = dur
}

func (b *batcher) SetFlushCount(count int) {
    b.flushCount = count
}

func (b *batcher) SetFlushSize(size int) {
    b.flushSize = size
}

func (b *batcher) AddMessage(msg []byte, pair SequencePair) error {
    if len(msg) <= 0 {
        return fmt.Errorf("Empty messages can't be sent")
    }

    b.msgChan <- msgPack{msg, pair}
    return nil
}

// updateSequenceNumbers is used to track the smallest sequenceNumber of any record in the batch.
// When flush() is called, the batcher sends the sequence number to the writer. When the writer
// checkpoints, it does so up to the latest message that was flushed successfully.
func (b *batcher) updateSequenceNumbers(pair SequencePair) {
    b.mux.Lock()
    defer b.mux.Unlock()

    if b.smallestSeq.IsEmpty() || pair.IsLessThan(b.smallestSeq) {
        b.smallestSeq = pair
    }
}

func (b *batcher) Flush() {
    b.flushChan <- struct{}{}
}

func (b *batcher) batchSize(batch [][]byte) int {
    total := 0
    for _, msg := range batch {
        total += len(msg)
    }

    return total
}

func (b *batcher) flush(batch [][]byte) [][]byte {
    if len(batch) > 0 {
        b.sync.SendBatch(batch)

        b.mux.Lock()
        b.smallestSeq = SequencePair{nil, 0}
        b.mux.Unlock()
    }
    return [][]byte{}
}

func (b *batcher) startBatcher(msgChan <-chan msgPack, flushChan <-chan struct{}) {
    batch := [][]byte{}

    for {
        select {
        case <-time.After(b.flushInterval):
            batch = b.flush(batch)
        case <-flushChan:
            batch = b.flush(batch)
        case pack := <-msgChan:
            size := b.batchSize(batch)
            if b.flushSize < size+len(pack.msg) {
                batch = b.flush(batch)
            }

            batch = append(batch, pack.msg)
            b.updateSequenceNumbers(pack.sequencePair)

            if b.flushCount <= len(batch) || b.flushSize <= b.batchSize(batch) {
                batch = b.flush(batch)
            }
        }
    }
}
Deleted file (tests for the old batcher package):

@@ -1,264 +0,0 @@
package batcher

import (
    "fmt"
    "math/big"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
)

type batch [][]byte

type MockSync struct {
    flushChan chan struct{}
    batches   []batch
}

func NewMockSync() *MockSync {
    return &MockSync{
        flushChan: make(chan struct{}, 1),
        batches:   []batch{},
    }
}

func (m *MockSync) SendBatch(b [][]byte) {
    m.batches = append(m.batches, batch(b))
    m.flushChan <- struct{}{}
}

func (m *MockSync) waitForFlush(timeout time.Duration) error {
    select {
    case <-m.flushChan:
        return nil
    case <-time.After(timeout):
        return fmt.Errorf("timed out before flush (waited %s)", timeout.String())
    }
}

var mockSequence = SequencePair{big.NewInt(99999), 12345}

func TestBatchingByCount(t *testing.T) {
    assert := assert.New(t)

    sync := NewMockSync()
    batcher, err := New(sync, time.Hour, 2, 1024*1024)
    assert.NoError(err)

    t.Log("Batcher respect count limit")
    assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence))
    assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence))
    assert.NoError(batcher.AddMessage([]byte("hmmhmm"), mockSequence))

    err = sync.waitForFlush(time.Millisecond * 10)
    assert.NoError(err)

    assert.Equal(1, len(sync.batches))
    assert.Equal(2, len(sync.batches[0]))
    assert.Equal("hihi", string(sync.batches[0][0]))
    assert.Equal("heyhey", string(sync.batches[0][1]))

    t.Log("Batcher doesn't send partial batches")
    err = sync.waitForFlush(time.Millisecond * 10)
    assert.Error(err)
}

func TestBatchingByTime(t *testing.T) {
    assert := assert.New(t)

    sync := NewMockSync()
    batcher, err := New(sync, time.Millisecond, 2000000, 1024*1024)
    assert.NoError(err)

    t.Log("Batcher sends partial batches when time expires")
    assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence))

    err = sync.waitForFlush(time.Millisecond * 10)
    assert.NoError(err)

    assert.Equal(1, len(sync.batches))
    assert.Equal(1, len(sync.batches[0]))
    assert.Equal("hihi", string(sync.batches[0][0]))

    t.Log("Batcher sends all messsages in partial batches when time expires")
    assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence))
    assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequence))

    err = sync.waitForFlush(time.Millisecond * 10)
    assert.NoError(err)

    assert.Equal(2, len(sync.batches))
    assert.Equal(2, len(sync.batches[1]))
    assert.Equal("heyhey", string(sync.batches[1][0]))
    assert.Equal("yoyo", string(sync.batches[1][1]))

    t.Log("Batcher doesn't send empty batches")
    err = sync.waitForFlush(time.Millisecond * 10)
    assert.Error(err)
}

func TestBatchingBySize(t *testing.T) {
    assert := assert.New(t)

    sync := NewMockSync()
    batcher, err := New(sync, time.Hour, 2000000, 8)
    assert.NoError(err)

    t.Log("Large messages are sent immediately")
    assert.NoError(batcher.AddMessage([]byte("hellohello"), mockSequence))

    err = sync.waitForFlush(time.Millisecond * 10)
    assert.NoError(err)

    assert.Equal(1, len(sync.batches))
    assert.Equal(1, len(sync.batches[0]))
    assert.Equal("hellohello", string(sync.batches[0][0]))

    t.Log("Batcher tries not to exceed size limit")
    assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence))
    assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence))

    err = sync.waitForFlush(time.Millisecond * 10)
    assert.NoError(err)

    assert.Equal(2, len(sync.batches))
    assert.Equal(1, len(sync.batches[1]))
    assert.Equal("heyhey", string(sync.batches[1][0]))

    t.Log("Batcher sends messages that didn't fit in previous batch")
    assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequence)) // At this point "hihi" is in the batch

    err = sync.waitForFlush(time.Millisecond * 10)
    assert.NoError(err)

    assert.Equal(3, len(sync.batches))
    assert.Equal(2, len(sync.batches[2]))
    assert.Equal("hihi", string(sync.batches[2][0]))
    assert.Equal("yoyo", string(sync.batches[2][1]))

    t.Log("Batcher doesn't send partial batches")
    assert.NoError(batcher.AddMessage([]byte("okok"), mockSequence))

    err = sync.waitForFlush(time.Millisecond * 10)
    assert.Error(err)
}

func TestFlushing(t *testing.T) {
    assert := assert.New(t)

    sync := NewMockSync()
    batcher, err := New(sync, time.Hour, 2000000, 1024*1024)
    assert.NoError(err)

    t.Log("Calling flush sends pending messages")
    assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence))

    err = sync.waitForFlush(time.Millisecond * 10)
    assert.Error(err)

    batcher.Flush()

    err = sync.waitForFlush(time.Millisecond * 10)
    assert.NoError(err)

    assert.Equal(1, len(sync.batches))
    assert.Equal(1, len(sync.batches[0]))
    assert.Equal("hihi", string(sync.batches[0][0]))
}

func TestSendingEmpty(t *testing.T) {
    assert := assert.New(t)

    sync := NewMockSync()
    batcher, err := New(sync, time.Second, 10, 1024*1024)
    assert.NoError(err)

    t.Log("An error is returned when an empty message is sent")
    err = batcher.AddMessage([]byte{}, mockSequence)
    assert.Error(err)
    assert.Equal(err.Error(), "Empty messages can't be sent")
}

func TestUpdatingSequence(t *testing.T) {
    assert := assert.New(t)

    sync := NewMockSync()
    b, err := New(sync, time.Second, 10, 1024*1024)
    assert.NoError(err)

    batcher := b.(*batcher)

    t.Log("Initally, smallestSeq is undefined")
    assert.Nil(batcher.SmallestSequencePair().Sequence)

    expected := new(big.Int)

    t.Log("After AddMessage (seq=1), smallestSeq = 1")
    batcher.updateSequenceNumbers(SequencePair{big.NewInt(1), 1234})
    expected.SetInt64(1)
    seq := batcher.SmallestSequencePair()
    assert.True(expected.Cmp(seq.Sequence) == 0)

    t.Log("After AddMessage (seq=2), smallestSeq = 1 -- not updated because higher")
    batcher.updateSequenceNumbers(SequencePair{big.NewInt(2), 1234})
    seq = batcher.SmallestSequencePair()
    assert.True(expected.Cmp(seq.Sequence) == 0)

    t.Log("After AddMessage (seq=1), smallestSeq = 0")
    batcher.updateSequenceNumbers(SequencePair{big.NewInt(0), 1234})
    expected.SetInt64(0)
    seq = batcher.SmallestSequencePair()
    assert.True(expected.Cmp(seq.Sequence) == 0)

    t.Log("Flushing batch clears smallest sequence pair")
    assert.NoError(batcher.AddMessage([]byte("cdcd"), SequencePair{big.NewInt(2), 1234}))
    sync.waitForFlush(time.Minute)
    assert.Nil(batcher.SmallestSequencePair().Sequence)
}

func TestSequencePairIsLessThan(t *testing.T) {
    assert := assert.New(t)

    big10 := big.NewInt(10)
    big5 := big.NewInt(5)

    tests := []struct {
        left   SequencePair
        right  SequencePair
        isLess bool
    }{
        {left: SequencePair{nil, 0}, right: SequencePair{nil, 0}, isLess: false},
        {left: SequencePair{nil, 0}, right: SequencePair{big10, 0}, isLess: false},
        {left: SequencePair{big10, 0}, right: SequencePair{nil, 0}, isLess: false},

        {left: SequencePair{big5, 0}, right: SequencePair{big10, 0}, isLess: true},
        {left: SequencePair{big5, 0}, right: SequencePair{big5, 10}, isLess: true},

        {left: SequencePair{big10, 0}, right: SequencePair{big5, 0}, isLess: false},
        {left: SequencePair{big5, 10}, right: SequencePair{big5, 0}, isLess: false},
    }

    for _, test := range tests {
        left := test.left
        right := test.right
        t.Logf(
            "Is <%s, %d> less than <%s, %d>? %t",
            left.Sequence.String(), left.SubSequence,
            right.Sequence.String(), right.SubSequence,
            test.isLess,
        )

        assert.Equal(test.isLess, left.IsLessThan(right))
    }
}

func TestSequencePairEmpty(t *testing.T) {
    assert := assert.New(t)

    assert.True(SequencePair{nil, 0}.IsEmpty())
    assert.True(SequencePair{nil, 10000}.IsEmpty())

    assert.False(SequencePair{big.NewInt(10), 0}.IsEmpty())
    assert.False(SequencePair{big.NewInt(0), 1000}.IsEmpty())
}
batchconsumer/batchermanager.go (new file, 220 lines)

@@ -0,0 +1,220 @@
package batchconsumer

import (
    "os"
    "time"

    kv "gopkg.in/Clever/kayvee-go.v6/logger"

    "github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats"
    "github.com/Clever/amazon-kinesis-client-go/kcl"
)

type tagMsgPair struct {
    tag  string
    msg  []byte
    pair kcl.SequencePair
}

type batcherManagerConfig struct {
    BatchCount    int
    BatchSize     int
    BatchInterval time.Duration
}

type batcherManager struct {
    log           kv.KayveeLogger
    sender        Sender
    chkpntManager *checkpointManager

    batchCount    int
    batchSize     int
    batchInterval time.Duration

    batchMsg      chan tagMsgPair
    lastIgnored   chan kcl.SequencePair
    lastProcessed chan kcl.SequencePair
    shutdown      chan chan<- struct{}
}

func newBatcherManager(
    sender Sender, chkpntManager *checkpointManager, cfg batcherManagerConfig, log kv.KayveeLogger,
) *batcherManager {
    bm := &batcherManager{
        log:           log,
        sender:        sender,
        chkpntManager: chkpntManager,

        batchCount:    cfg.BatchCount,
        batchSize:     cfg.BatchSize,
        batchInterval: cfg.BatchInterval,

        batchMsg:      make(chan tagMsgPair),
        lastIgnored:   make(chan kcl.SequencePair),
        lastProcessed: make(chan kcl.SequencePair),
        // shutdown chan takes "done" channel to signal when batchermanager is done shutting down
        shutdown: make(chan chan<- struct{}),
    }

    bm.startMessageHandler(bm.batchMsg, bm.lastIgnored, bm.lastProcessed, bm.shutdown)

    return bm
}

func (b *batcherManager) BatchMessage(tag string, msg []byte, pair kcl.SequencePair) {
    b.batchMsg <- tagMsgPair{tag, msg, pair}
}

func (b *batcherManager) LatestIgnored(pair kcl.SequencePair) {
    b.lastIgnored <- pair
}

func (b *batcherManager) LatestProcessed(pair kcl.SequencePair) {
    b.lastProcessed <- pair
}

func (b *batcherManager) Shutdown() <-chan struct{} {
    done := make(chan struct{})
    b.shutdown <- done

    return done
}

func (b *batcherManager) createBatcher() *batcher {
    return &batcher{
        flushCount: b.batchCount,
        flushSize:  b.batchSize,
    }
}

func (b *batcherManager) sendBatch(batcher *batcher, tag string) {
    if len(batcher.Batch) <= 0 {
        return
    }

    err := b.sender.SendBatch(batcher.Batch, tag)
    switch e := err.(type) {
    case nil: // Do nothing
    case PartialSendBatchError:
        b.log.ErrorD("send-batch", kv.M{"msg": e.Error()})
        for _, line := range e.FailedMessages {
            b.log.ErrorD("failed-log", kv.M{"log": line})
        }
        stats.Counter("batch-log-failures", len(e.FailedMessages))
    case CatastrophicSendBatchError:
        b.log.CriticalD("send-batch", kv.M{"msg": e.Error()})
        os.Exit(1)
    default:
        b.log.CriticalD("send-batch", kv.M{"msg": e.Error()})
        os.Exit(1)
    }

    batcher.Clear()
    stats.Counter("batches-sent", 1)
}

func (b *batcherManager) sendCheckpoint(
    tag string, lastIgnoredPair kcl.SequencePair, batchers map[string]*batcher,
) {
    smallest := lastIgnoredPair

    for name, batcher := range batchers {
        if tag == name {
            continue
        }

        if len(batcher.Batch) <= 0 {
            continue
        }

        // Check for empty because it's possible that no messages have been ignored
        if smallest.IsNil() || batcher.SmallestSeq.IsLessThan(smallest) {
            smallest = batcher.SmallestSeq
        }
    }

    if !smallest.IsNil() {
        b.chkpntManager.Checkpoint(smallest)
    }
}

// startMessageDistributer starts a go-routine that routes messages to batches. It's in uses a
// go routine to avoid racey conditions.
func (b *batcherManager) startMessageHandler(
    batchMsg <-chan tagMsgPair, lastIgnored, lastProcessed <-chan kcl.SequencePair,
    shutdown <-chan chan<- struct{},
) {
    flushStaleBatches := make(chan struct{})

    go func() {
        for { // Flush batches that haven't been updated recently
            <-time.NewTimer(time.Second).C
            flushStaleBatches <- struct{}{}
        }
    }()

    go func() {
        var lastProcessedPair kcl.SequencePair
        var lastIgnoredPair kcl.SequencePair
        batchers := map[string]*batcher{}

        for {
            select {
            case <-flushStaleBatches:
                for tag, batcher := range batchers {
                    if b.batchInterval <= time.Now().Sub(batcher.LastUpdated) {
                        b.sendBatch(batcher, tag)
                        b.sendCheckpoint(tag, lastIgnoredPair, batchers)
                        batcher.Clear()
                    }
                }
            case tmp := <-batchMsg:
                batcher, ok := batchers[tmp.tag]
                if !ok {
                    batcher = b.createBatcher()
                    batchers[tmp.tag] = batcher
                    stats.Gauge("tag-count", len(batchers))
                }

                err := batcher.AddMessage(tmp.msg, tmp.pair)
                if err == ErrBatchFull {
                    b.sendBatch(batcher, tmp.tag)
                    b.sendCheckpoint(tmp.tag, lastIgnoredPair, batchers)

                    batcher.AddMessage(tmp.msg, tmp.pair)
                } else if err != nil {
                    b.log.ErrorD("add-message", kv.M{
                        "err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag,
                    })
                }
                stats.Counter("msg-batched", 1)
            case pair := <-lastIgnored:
                lastIgnoredPair = pair

                isPendingMessages := false
                for _, batcher := range batchers {
                    if len(batcher.Batch) > 0 {
                        isPendingMessages = true
                        break
                    }
                }

                if !isPendingMessages {
                    b.chkpntManager.Checkpoint(lastIgnoredPair)
                }
            case pair := <-lastProcessed:
                lastProcessedPair = pair
            case done := <-shutdown:
                for tag, batcher := range batchers {
                    b.sendBatch(batcher, tag)
                }
                b.chkpntManager.Checkpoint(lastProcessedPair)
                chkDone := b.chkpntManager.Shutdown()
                <-chkDone

                done <- struct{}{}
                return
            }
        }
    }()
}
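Editor's note (not part of the diff): batcherManager and checkpointManager both shut down through a channel of done-channels -- the caller hands the worker goroutine a channel, and the goroutine signals it once its remaining work is flushed. A minimal sketch of that handshake, with the worker's other select cases elided (the worker type here is illustrative):

package batchconsumer

// worker is a hypothetical stand-in for batcherManager / checkpointManager.
type worker struct {
    shutdown chan chan<- struct{}
}

// Shutdown asks the worker's goroutine to stop and returns a channel that is
// signalled once the worker has finished shutting down.
func (w *worker) Shutdown() <-chan struct{} {
    done := make(chan struct{})
    w.shutdown <- done
    return done
}

func (w *worker) loop() {
    for {
        select {
        // ... message and timer cases elided ...
        case done := <-w.shutdown:
            // flush any pending work here, then acknowledge and exit
            done <- struct{}{}
            return
        }
    }
}

The caller side is simply done := w.Shutdown(); <-done, which is how batchedWriter.Shutdown waits later in this PR.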
batchconsumer/checkpointmanager.go (new file, 89 lines)

@@ -0,0 +1,89 @@
package batchconsumer

import (
    "time"

    kv "gopkg.in/Clever/kayvee-go.v6/logger"

    "github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats"
    "github.com/Clever/amazon-kinesis-client-go/kcl"
)

type checkpointManager struct {
    log kv.KayveeLogger

    checkpointFreq time.Duration

    checkpoint chan kcl.SequencePair
    // shutdown chan takes "done" channel to signal when checkpointManager is done shutting down
    shutdown chan chan<- struct{}
}

func newCheckpointManager(
    checkpointer kcl.Checkpointer, checkpointFreq time.Duration, log kv.KayveeLogger,
) *checkpointManager {
    cm := &checkpointManager{
        log: log,

        checkpointFreq: checkpointFreq,

        checkpoint: make(chan kcl.SequencePair),
        shutdown:   make(chan chan<- struct{}),
    }

    cm.startCheckpointHandler(checkpointer, cm.checkpoint, cm.shutdown)

    return cm
}

func (cm *checkpointManager) Checkpoint(pair kcl.SequencePair) {
    cm.checkpoint <- pair
}

func (cm *checkpointManager) Shutdown() <-chan struct{} {
    done := make(chan struct{})
    cm.shutdown <- done

    return done
}

func (cm *checkpointManager) startCheckpointHandler(
    checkpointer kcl.Checkpointer, checkpoint <-chan kcl.SequencePair,
    shutdown <-chan chan<- struct{},
) {
    go func() {
        lastCheckpoint := time.Now()

        for {
            var doneShutdown chan<- struct{}
            pair := kcl.SequencePair{}

            select {
            case pair = <-checkpoint:
            case doneShutdown = <-shutdown:
            }

            // This is a write throttle to ensure we don't checkpoint faster than cm.checkpointFreq.
            // The latest pair number is always used.
            for doneShutdown == nil && time.Now().Sub(lastCheckpoint) < cm.checkpointFreq {
                select {
                case pair = <-checkpoint: // Keep updating checkpoint pair while waiting
                case doneShutdown = <-shutdown:
                case <-time.NewTimer(cm.checkpointFreq - time.Now().Sub(lastCheckpoint)).C:
                }
            }

            if !pair.IsNil() {
                checkpointer.Checkpoint(pair)
                lastCheckpoint = time.Now()
                stats.Counter("checkpoints-sent", 1)
            }

            if doneShutdown != nil {
                checkpointer.Shutdown()
                doneShutdown <- struct{}{}
                return
            }
        }
    }()
}
@@ -18,6 +18,7 @@ type Config struct {
 
 	// LogFile where consumer errors and failed log lines are saved
 	LogFile string
 
 	// BatchInterval the upper bound on how often SendBatch is called with accumulated messages
 	BatchInterval time.Duration
 	// BatchCount is the number of messages that triggers a SendBatch call

@@ -32,10 +33,6 @@ type Config struct {
 
 	// CheckpointFreq the frequency in which a checkpoint is saved
 	CheckpointFreq time.Duration
-	// CheckpointRetries the number of times the consumer will try to save a checkpoint
-	CheckpointRetries int
-	// CheckpointRetrySleep the amount of time between checkpoint save attempts
-	CheckpointRetrySleep time.Duration
 }
 
 // BatchConsumer is responsible for marshalling

@@ -63,8 +60,9 @@ func withDefaults(config Config) Config {
 		config.DeployEnv = "unknown-env"
 	}
 
+	// Not totally clear we need this rate limit. The KCL may do rate limiting for us.
 	if config.ReadRateLimit == 0 {
-		config.ReadRateLimit = 300
+		config.ReadRateLimit = 1000
 	}
 	if config.ReadBurstLimit == 0 {
 		config.ReadBurstLimit = int(float64(config.ReadRateLimit)*1.2 + 0.5)

@@ -73,12 +71,6 @@ func withDefaults(config Config) Config {
 	if config.CheckpointFreq == 0 {
 		config.CheckpointFreq = 60 * time.Second
 	}
-	if config.CheckpointRetries == 0 {
-		config.CheckpointRetries = 5
-	}
-	if config.CheckpointRetrySleep == 0 {
-		config.CheckpointRetrySleep = 5 * time.Second
-	}
 
 	return config
 }
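Editor's note (not part of the diff): after this change a consumer no longer configures checkpoint retries -- CheckpointRetries and CheckpointRetrySleep are gone -- and any zero-valued field is filled in by withDefaults (for example ReadRateLimit now defaults to 1000 instead of 300). A caller-side sketch with illustrative values:

cfg := Config{
    LogFile:        "/tmp/consumer.log", // illustrative path
    BatchCount:     500,
    BatchInterval:  10 * time.Second,
    CheckpointFreq: 30 * time.Second,
    // ReadRateLimit deliberately left at 0: withDefaults sets it to 1000
}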
batchconsumer/stats/stats.go (new file, 50 lines)

@@ -0,0 +1,50 @@
package stats

import (
    "time"

    "gopkg.in/Clever/kayvee-go.v6/logger"
)

var log = logger.New("amazon-kinesis-client-go")

type datum struct {
    name     string
    value    int
    category string
}

var queue = make(chan datum, 1000)

func init() {
    data := map[string]int{}
    tick := time.Tick(time.Minute)
    go func() {
        for {
            select {
            case d := <-queue:
                if d.category == "counter" {
                    data[d.name] = data[d.name] + d.value
                } else if d.category == "gauge" {
                    data[d.name] = d.value
                } else {
                    log.ErrorD("unknow-stat-category", logger.M{"category": d.category})
                }
            case <-tick:
                tmp := logger.M{}
                for k, v := range data {
                    tmp[k] = v
                }
                log.InfoD("stats", tmp)
            }
        }
    }()
}

func Counter(name string, val int) {
    queue <- datum{name, val, "counter"}
}

func Gauge(name string, val int) {
    queue <- datum{name, val, "gauge"}
}
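Editor's note (not part of the diff): the stats package is fire-and-forget -- callers push counters and gauges onto a buffered channel and the goroutine started in init() flushes the aggregate to the kayvee logger roughly once a minute. Usage as seen elsewhere in this PR (values illustrative):

stats.Counter("msg-batched", 1) // counters accumulate between flushes
stats.Gauge("tag-count", 7)     // gauges keep only the latest value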
Deleted file (the batcherSync adapter):

@@ -1,10 +0,0 @@

package batchconsumer

type batcherSync struct {
    tag    string
    writer *batchedWriter
}

func (b *batcherSync) SendBatch(batch [][]byte) {
    b.writer.SendBatch(batch, b.tag)
}
@@ -5,23 +5,15 @@ import (
 	"encoding/base64"
 	"fmt"
 	"math/big"
-	"os"
-	"time"
 
 	"golang.org/x/time/rate"
 	kv "gopkg.in/Clever/kayvee-go.v6/logger"
 
-	"github.com/Clever/amazon-kinesis-client-go/batchconsumer/batcher"
+	"github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats"
 	"github.com/Clever/amazon-kinesis-client-go/kcl"
 	"github.com/Clever/amazon-kinesis-client-go/splitter"
 )
 
-type tagMsgPair struct {
-	tag  string
-	msg  []byte
-	pair batcher.SequencePair
-}
-
 type batchedWriter struct {
 	config Config
 	sender Sender

@@ -29,16 +21,13 @@ type batchedWriter struct {
 
 	shardID string
 
-	checkpointMsg     chan batcher.SequencePair
-	checkpointTag     chan string
-	lastProcessedPair chan batcher.SequencePair
-	batchMsg          chan tagMsgPair
-	flushBatches      chan struct{}
+	chkpntManager  *checkpointManager
+	batcherManager *batcherManager
 
 	// Limits the number of records read from the stream
 	rateLimiter *rate.Limiter
 
-	lastProcessedSeq batcher.SequencePair
+	lastProcessedSeq kcl.SequencePair
 }
 
 func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batchedWriter {

@@ -53,170 +42,19 @@ func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batche
 
 func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error {
 	b.shardID = shardID
-	b.checkpointMsg = make(chan batcher.SequencePair)
-	b.startCheckpointListener(checkpointer, b.checkpointMsg)
 
-	b.checkpointTag = make(chan string)
-	b.batchMsg = make(chan tagMsgPair)
-	b.flushBatches = make(chan struct{})
-	b.lastProcessedPair = make(chan batcher.SequencePair)
-	b.startMessageHandler(b.batchMsg, b.checkpointTag, b.lastProcessedPair, b.flushBatches)
+	bmConfig := batcherManagerConfig{
+		BatchCount:    b.config.BatchCount,
+		BatchSize:     b.config.BatchSize,
+		BatchInterval: b.config.BatchInterval,
+	}
 
+	b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq, b.log)
+	b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.log)
 
 	return nil
 }
 
-// handleCheckpointError returns true if checkout should be tried again. Returns false otherwise.
-func (b *batchedWriter) handleCheckpointError(err error) bool {
-	if err == nil {
-		return false
-	}
-
-	cperr, ok := err.(kcl.CheckpointError)
-	if !ok {
-		b.log.ErrorD("unknown-checkpoint-error", kv.M{"msg": err.Error(), "shard-id": b.shardID})
-		return true
-	}
-
-	switch cperr.Error() {
-	case "ShutdownException": // Skips checkpointing
-		b.log.ErrorD("shutdown-checkpoint-exception", kv.M{
-			"msg": err.Error(), "shard-id": b.shardID,
-		})
-		return false
-	case "ThrottlingException":
-		b.log.ErrorD("checkpoint-throttle", kv.M{"shard-id": b.shardID})
-	case "InvalidStateException":
-		b.log.ErrorD("invalid-checkpoint-state", kv.M{"shard-id": b.shardID})
-	default:
-		b.log.ErrorD("checkpoint-error", kv.M{"shard-id": b.shardID, "msg": err})
-	}
-
-	return true
-}
-
-func (b *batchedWriter) startCheckpointListener(
-	checkpointer kcl.Checkpointer, checkpointMsg <-chan batcher.SequencePair,
-) {
-	go func() {
-		lastCheckpoint := time.Now()
-
-		for {
-			seq := <-checkpointMsg
-
-			// This is a write throttle to ensure we don't checkpoint faster than
-			// b.config.CheckpointFreq. The latest seq number is always used.
-			for time.Now().Sub(lastCheckpoint) < b.config.CheckpointFreq {
-				select {
-				case seq = <-checkpointMsg: // Keep updating checkpoint seq while waiting
-				case <-time.NewTimer(b.config.CheckpointFreq - time.Now().Sub(lastCheckpoint)).C:
-				}
-			}
-
-			retry := true
-			for n := 0; retry && n < b.config.CheckpointRetries+1; n++ {
-				str := seq.Sequence.String()
-				err := checkpointer.Checkpoint(&str, &seq.SubSequence)
-				if err == nil { // Successfully checkpointed!
-					lastCheckpoint = time.Now()
-					break
-				}
-
-				retry = b.handleCheckpointError(err)
-
-				if n == b.config.CheckpointRetries {
-					b.log.ErrorD("checkpoint-retries", kv.M{"attempts": b.config.CheckpointRetries})
-					retry = false
-				}
-
-				if retry {
-					time.Sleep(b.config.CheckpointRetrySleep)
-				}
-			}
-		}
-	}()
-}
-
-func (b *batchedWriter) createBatcher(tag string) batcher.Batcher {
-	sync := &batcherSync{
-		tag:    tag,
-		writer: b,
-	}
-	batch, err := batcher.New(sync, b.config.BatchInterval, b.config.BatchCount, b.config.BatchSize)
-	if err != nil {
-		b.log.ErrorD("create-batcher", kv.M{"msg": err.Error(), "tag": tag})
-	}
-
-	return batch
-}
-
-// startMessageDistributer starts a go-routine that routes messages to batches. It's in uses a
-// go routine to avoid racey conditions.
-func (b *batchedWriter) startMessageHandler(
-	batchMsg <-chan tagMsgPair, checkpointTag <-chan string, lastPair <-chan batcher.SequencePair,
-	flushBatches <-chan struct{},
-) {
-	go func() {
-		var lastProcessedPair batcher.SequencePair
-		batchers := map[string]batcher.Batcher{}
-		areBatchersEmpty := true
-
-		for {
-			select {
-			case tmp := <-batchMsg:
-				batcher, ok := batchers[tmp.tag]
-				if !ok {
-					batcher = b.createBatcher(tmp.tag)
-					batchers[tmp.tag] = batcher
-				}
-
-				err := batcher.AddMessage(tmp.msg, tmp.pair)
-				if err != nil {
-					b.log.ErrorD("add-message", kv.M{
-						"err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag,
-					})
-				}
-				areBatchersEmpty = false
-			case tag := <-checkpointTag:
-				smallest := lastProcessedPair
-				isAllEmpty := true
-
-				for name, batch := range batchers {
-					if tag == name {
-						continue
-					}
-
-					pair := batch.SmallestSequencePair()
-					if pair.IsEmpty() { // Occurs when batch has no items
-						continue
-					}
-
-					if pair.IsLessThan(smallest) {
-						smallest = pair
-					}
-
-					isAllEmpty = false
-				}
-
-				if !smallest.IsEmpty() {
-					b.checkpointMsg <- smallest
-				}
-				areBatchersEmpty = isAllEmpty
-			case pair := <-lastPair:
-				if areBatchersEmpty {
-					b.checkpointMsg <- pair
-				}
-				lastProcessedPair = pair
-			case <-flushBatches:
-				for _, batch := range batchers {
-					batch.Flush()
-				}
-				b.checkpointMsg <- lastProcessedPair
-				areBatchersEmpty = true
-			}
-		}
-	}()
-}
-
 func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) {
 	// We handle two types of records:
 	// - records emitted from CWLogs Subscription

@@ -231,7 +69,7 @@ func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error)
 }
 
 func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
-	var pair batcher.SequencePair
+	var pair kcl.SequencePair
 	prevPair := b.lastProcessedSeq
 
 	for _, record := range records {

@@ -243,8 +81,8 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
 			return fmt.Errorf("could not parse sequence number '%s'", record.SequenceNumber)
 		}
 
-		pair = batcher.SequencePair{seq, record.SubSequenceNumber}
-		if prevPair.IsEmpty() { // Handles on-start edge case where b.lastProcessSeq is empty
+		pair = kcl.SequencePair{seq, record.SubSequenceNumber}
+		if prevPair.IsNil() { // Handles on-start edge case where b.lastProcessSeq is empty
 			prevPair = pair
 		}
 

@@ -257,23 +95,27 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
 		if err != nil {
 			return err
 		}
+		wasPairIgnored := true
 		for _, rawmsg := range messages {
 			msg, tags, err := b.sender.ProcessMessage(rawmsg)
 
 			if err == ErrMessageIgnored {
 				continue // Skip message
 			} else if err != nil {
+				stats.Counter("unknown-error", 1)
 				b.log.ErrorD("process-message", kv.M{"msg": err.Error(), "rawmsg": string(rawmsg)})
 				continue // Don't stop processing messages because of one bad message
 			}
 
 			if len(tags) == 0 {
+				stats.Counter("no-tags", 1)
 				b.log.ErrorD("no-tags", kv.M{"rawmsg": string(rawmsg)})
 				return fmt.Errorf("No tags provided by consumer for log: %s", string(rawmsg))
 			}
 
 			for _, tag := range tags {
 				if tag == "" {
+					stats.Counter("blank-tag", 1)
 					b.log.ErrorD("blank-tag", kv.M{"rawmsg": string(rawmsg)})
 					return fmt.Errorf("Blank tag provided by consumer for log: %s", string(rawmsg))
 				}

@@ -282,44 +124,33 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
 				// it's been sent. When batches are sent, conceptually we first find the smallest
 				// sequence number amount all the batch (let's call it A). We then checkpoint at
 				// the A-1 sequence number.
-				b.batchMsg <- tagMsgPair{tag, msg, prevPair}
+				b.batcherManager.BatchMessage(tag, msg, prevPair)
+				wasPairIgnored = false
 			}
 		}
 
 		prevPair = pair
-		b.lastProcessedPair <- pair
+		if wasPairIgnored {
+			b.batcherManager.LatestIgnored(pair)
+		}
+		b.batcherManager.LatestProcessed(pair)
 
+		stats.Counter("processed-messages", len(messages))
 	}
 	b.lastProcessedSeq = pair
 
 	return nil
 }
 
-func (b *batchedWriter) SendBatch(batch [][]byte, tag string) {
-	err := b.sender.SendBatch(batch, tag)
-	switch e := err.(type) {
-	case nil: // Do nothing
-	case PartialSendBatchError:
-		b.log.ErrorD("send-batch", kv.M{"msg": e.Error()})
-		for _, line := range e.FailedMessages {
-			b.log.ErrorD("failed-log", kv.M{"log": line})
-		}
-	case CatastrophicSendBatchError:
-		b.log.CriticalD("send-batch", kv.M{"msg": e.Error()})
-		os.Exit(1)
-	default:
-		b.log.CriticalD("send-batch", kv.M{"msg": e.Error()})
-		os.Exit(1)
-	}
-
-	b.checkpointTag <- tag
-}
-
 func (b *batchedWriter) Shutdown(reason string) error {
 	if reason == "TERMINATE" {
 		b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID})
-		b.flushBatches <- struct{}{}
 	} else {
 		b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason})
 	}
 
+	done := b.batcherManager.Shutdown()
+	<-done
 
 	return nil
 }
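Editor's note (not part of the diff): this extract never shows the Sender interface definition, but the call sites above imply a ProcessMessage method that decodes and tags each raw record and a SendBatch method that writes one tag's batch. A hypothetical implementation, with the signatures inferred from those call sites rather than taken from the repository:

package main

import "fmt"

// stdoutSender is a made-up Sender that tags every message "default" and
// prints batches instead of forwarding them anywhere.
type stdoutSender struct{}

func (s stdoutSender) ProcessMessage(rawmsg []byte) ([]byte, []string, error) {
    return rawmsg, []string{"default"}, nil
}

func (s stdoutSender) SendBatch(batch [][]byte, tag string) error {
    for _, msg := range batch {
        fmt.Printf("%s: %s\n", tag, msg)
    }
    return nil
}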
@ -86,30 +86,28 @@ type mockCheckpointer struct {
|
||||||
shutdown chan struct{}
|
shutdown chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMockCheckpointer(maxSeq string, timeout time.Duration) *mockCheckpointer {
|
func NewMockCheckpointer(timeout time.Duration) *mockCheckpointer {
|
||||||
mcp := &mockCheckpointer{
|
mcp := &mockCheckpointer{
|
||||||
checkpoint: make(chan string),
|
checkpoint: make(chan string),
|
||||||
done: make(chan struct{}, 1),
|
done: make(chan struct{}, 1),
|
||||||
timeout: make(chan struct{}, 1),
|
timeout: make(chan struct{}, 1),
|
||||||
shutdown: make(chan struct{}),
|
shutdown: make(chan struct{}),
|
||||||
}
|
}
|
||||||
mcp.startWaiter(maxSeq, timeout)
|
mcp.startWaiter(timeout)
|
||||||
|
|
||||||
return mcp
|
return mcp
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockCheckpointer) startWaiter(maxSeq string, timeout time.Duration) {
|
func (m *mockCheckpointer) startWaiter(timeout time.Duration) {
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case seq := <-m.checkpoint:
|
case seq := <-m.checkpoint:
|
||||||
m.recievedSequences = append(m.recievedSequences, seq)
|
m.recievedSequences = append(m.recievedSequences, seq)
|
||||||
if seq == maxSeq {
|
|
||||||
m.done <- struct{}{}
|
|
||||||
}
|
|
||||||
case <-time.NewTimer(timeout).C:
|
case <-time.NewTimer(timeout).C:
|
||||||
m.timeout <- struct{}{}
|
m.timeout <- struct{}{}
|
||||||
case <-m.shutdown:
|
case <-m.shutdown:
|
||||||
|
m.done <- struct{}{}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -126,14 +124,8 @@ func (m *mockCheckpointer) wait() error {
|
||||||
func (m *mockCheckpointer) Shutdown() {
|
func (m *mockCheckpointer) Shutdown() {
|
||||||
m.shutdown <- struct{}{}
|
m.shutdown <- struct{}{}
|
||||||
}
|
}
|
||||||
func (m *mockCheckpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error {
|
func (m *mockCheckpointer) Checkpoint(pair kcl.SequencePair) {
|
||||||
m.checkpoint <- *sequenceNumber
|
m.checkpoint <- pair.Sequence.String()
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (m *mockCheckpointer) CheckpointWithRetry(
|
|
||||||
sequenceNumber *string, subSequenceNumber *int, retryCount int,
|
|
||||||
) error {
|
|
||||||
return m.Checkpoint(sequenceNumber, subSequenceNumber)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func encode(str string) string {
|
func encode(str string) string {
|
||||||
|
|
@ -148,7 +140,7 @@ func TestProcessRecordsIgnoredMessages(t *testing.T) {
|
||||||
BatchInterval: 10 * time.Millisecond,
|
BatchInterval: 10 * time.Millisecond,
|
||||||
CheckpointFreq: 20 * time.Millisecond,
|
CheckpointFreq: 20 * time.Millisecond,
|
||||||
})
|
})
|
||||||
mockcheckpointer := NewMockCheckpointer("4", 5*time.Second)
|
mockcheckpointer := NewMockCheckpointer(5 * time.Second)
|
||||||
|
|
||||||
wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mocklog)
|
wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mocklog)
|
||||||
wrt.Initialize("test-shard", mockcheckpointer)
|
wrt.Initialize("test-shard", mockcheckpointer)
|
||||||
|
|
@ -161,8 +153,13 @@ func TestProcessRecordsIgnoredMessages(t *testing.T) {
|
||||||
})
|
})
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
|
|
||||||
|
err = wrt.Shutdown("TERMINATE")
|
||||||
|
assert.NoError(err)
|
||||||
|
|
||||||
err = mockcheckpointer.wait()
|
err = mockcheckpointer.wait()
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
|
|
||||||
|
assert.Contains(mockcheckpointer.recievedSequences, "4")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestProcessRecordsMutliBatchBasic(t *testing.T) {
|
func TestProcessRecordsMutliBatchBasic(t *testing.T) {
|
||||||
|
|
@@ -173,7 +170,7 @@ func TestProcessRecordsMutliBatchBasic(t *testing.T) {
 		BatchInterval: 100 * time.Millisecond,
 		CheckpointFreq: 200 * time.Millisecond,
 	})
-	mockcheckpointer := NewMockCheckpointer("8", 5*time.Second)
+	mockcheckpointer := NewMockCheckpointer(5 * time.Second)
 	mocksender := NewMsgAsTagSender()
 
 	wrt := NewBatchedWriter(mockconfig, mocksender, mocklog)
 
@@ -194,8 +191,6 @@ func TestProcessRecordsMutliBatchBasic(t *testing.T) {
 	})
 	assert.NoError(err)
 
-	time.Sleep(200 * time.Millisecond) // Sleep to ensure checkpoint get flushed at least once
-
 	err = wrt.Shutdown("TERMINATE")
 	assert.NoError(err)
 
@@ -233,7 +228,7 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) {
 		BatchInterval: 100 * time.Millisecond,
 		CheckpointFreq: 200 * time.Millisecond,
 	})
-	mockcheckpointer := NewMockCheckpointer("26", 5*time.Second)
+	mockcheckpointer := NewMockCheckpointer(5 * time.Second)
 	mocksender := NewMsgAsTagSender()
 
 	wrt := NewBatchedWriter(mockconfig, mocksender, mocklog)
 
@@ -272,8 +267,6 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) {
 	})
 	assert.NoError(err)
 
-	time.Sleep(200 * time.Millisecond) // Sleep to ensure checkpoint get flushed at least once
-
 	err = wrt.Shutdown("TERMINATE")
 	assert.NoError(err)
 
@@ -312,7 +305,7 @@ func TestStaggeredCheckpionting(t *testing.T) {
 		BatchInterval: 100 * time.Millisecond,
 		CheckpointFreq: 200 * time.Nanosecond,
 	})
-	mockcheckpointer := NewMockCheckpointer("9", 5*time.Second)
+	mockcheckpointer := NewMockCheckpointer(5 * time.Second)
 	mocksender := NewMsgAsTagSender()
 
 	wrt := NewBatchedWriter(mockconfig, mocksender, mocklog)
 
@@ -343,7 +336,6 @@ func TestStaggeredCheckpionting(t *testing.T) {
 	assert.NoError(err)
 
 	mocksender.Shutdown()
-	mockcheckpointer.Shutdown()
 
 	// Test to make sure writer doesn't prematurely checkpoint messages
 	// Checkpoints 5,6,7,8 will never be submitted because the 3rd "tag1" is in a batch
 
@@ -352,6 +344,7 @@ func TestStaggeredCheckpionting(t *testing.T) {
 	assert.NotContains(mockcheckpointer.recievedSequences, "6")
 	assert.NotContains(mockcheckpointer.recievedSequences, "7")
 	assert.NotContains(mockcheckpointer.recievedSequences, "8")
+	assert.Contains(mockcheckpointer.recievedSequences, "9")
 
 	assert.Contains(mocksender.batches, "tag1")
 	assert.Equal(2, len(mocksender.batches["tag1"])) // One batch
 
@@ -365,8 +358,10 @@ func TestStaggeredCheckpionting(t *testing.T) {
 	assert.Equal(2, len(mocksender.batches["tag3"][0])) // with three items
 	assert.Equal("tag3", string(mocksender.batches["tag3"][0][0]))
 	assert.Equal("tag3", string(mocksender.batches["tag3"][0][1]))
+	assert.Equal(2, len(mocksender.batches["tag3"][1]))
 	assert.Equal("tag3", string(mocksender.batches["tag3"][1][0]))
 	assert.Equal("tag3", string(mocksender.batches["tag3"][1][1]))
+	assert.Equal(2, len(mocksender.batches["tag3"][2]))
 	assert.Equal("tag3", string(mocksender.batches["tag3"][2][0]))
 	assert.Equal("tag3", string(mocksender.batches["tag3"][2][1]))
 }
@@ -11,6 +11,7 @@ compile:
   override:
     - make install_deps
     - make build
+    - make bench
 test:
   override:
     - make test
@@ -10,18 +10,15 @@ import (
 )
 
 type sampleRecordProcessor struct {
 	checkpointer kcl.Checkpointer
-	checkpointRetries int
-	checkpointFreq time.Duration
-	largestSeq *big.Int
-	largestSubSeq int
-	lastCheckpoint time.Time
+	checkpointFreq time.Duration
+	largestPair kcl.SequencePair
+	lastCheckpoint time.Time
 }
 
 func newSampleRecordProcessor() *sampleRecordProcessor {
 	return &sampleRecordProcessor{
-		checkpointRetries: 5,
-		checkpointFreq: 60 * time.Second,
+		checkpointFreq: 60 * time.Second,
 	}
 }
 
@@ -31,9 +28,8 @@ func (srp *sampleRecordProcessor) Initialize(shardID string, checkpointer kcl.Ch
 	return nil
 }
 
-func (srp *sampleRecordProcessor) shouldUpdateSequence(sequenceNumber *big.Int, subSequenceNumber int) bool {
-	return srp.largestSeq == nil || sequenceNumber.Cmp(srp.largestSeq) == 1 ||
-		(sequenceNumber.Cmp(srp.largestSeq) == 0 && subSequenceNumber > srp.largestSubSeq)
+func (srp *sampleRecordProcessor) shouldUpdateSequence(pair kcl.SequencePair) bool {
+	return srp.largestPair.IsLessThan(pair)
 }
 
 func (srp *sampleRecordProcessor) ProcessRecords(records []kcl.Record) error {
 
@@ -43,14 +39,13 @@ func (srp *sampleRecordProcessor) ProcessRecords(records []kcl.Record) error {
 			fmt.Fprintf(os.Stderr, "could not parse sequence number '%s'\n", record.SequenceNumber)
 			continue
 		}
-		if srp.shouldUpdateSequence(seqNumber, record.SubSequenceNumber) {
-			srp.largestSeq = seqNumber
-			srp.largestSubSeq = record.SubSequenceNumber
+		pair := kcl.SequencePair{seqNumber, record.SubSequenceNumber}
+		if srp.shouldUpdateSequence(pair) {
+			srp.largestPair = pair
 		}
 	}
 	if time.Now().Sub(srp.lastCheckpoint) > srp.checkpointFreq {
-		largestSeq := srp.largestSeq.String()
-		srp.checkpointer.CheckpointWithRetry(&largestSeq, &srp.largestSubSeq, srp.checkpointRetries)
+		srp.checkpointer.Checkpoint(srp.largestPair)
 		srp.lastCheckpoint = time.Now()
 	}
 	return nil
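The ProcessRecords hunk above uses seqNumber without showing how it is parsed. Kinesis sequence numbers exceed int64, which is why the sample keeps them as *big.Int; a hedged sketch of the presumed parse (the helper name and the example sequence string are illustrative, not repository code):

package main

import (
	"fmt"
	"math/big"
	"os"

	"github.com/Clever/amazon-kinesis-client-go/kcl"
)

// pairFromRecord converts a record's decimal sequence string into the
// SequencePair consumed by shouldUpdateSequence and Checkpoint above.
func pairFromRecord(record kcl.Record) (kcl.SequencePair, bool) {
	seqNumber, ok := new(big.Int).SetString(record.SequenceNumber, 10)
	if !ok {
		fmt.Fprintf(os.Stderr, "could not parse sequence number '%s'\n", record.SequenceNumber)
		return kcl.SequencePair{}, false
	}
	return kcl.SequencePair{Sequence: seqNumber, SubSequence: record.SubSequenceNumber}, true
}

func main() {
	rec := kcl.Record{SequenceNumber: "49545115243490985018280067714973144582180062593244200961"}
	if pair, ok := pairFromRecord(rec); ok {
		fmt.Println(pair.Sequence.String(), pair.SubSequence)
	}
}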
@@ -83,8 +83,14 @@ func FieldsFromKayvee(line string) (map[string]interface{}, error) {
 	possibleJSON := line[firstIdx : lastIdx+1]
 	var fields map[string]interface{}
 	if err := json.Unmarshal([]byte(possibleJSON), &fields); err != nil {
-		return map[string]interface{}{}, err
+		return map[string]interface{}{}, &NonKayveeError{}
 	}
+
+	if len(fields) == 0 { // Some logs superfluous "{}" in them. They're not kayvee.
+		return map[string]interface{}{}, &NonKayveeError{}
+	}
+	// TODO: consider also filter if they have source and title
+
 	for k, v := range fields {
 		if !stringInSlice(k, reservedFields) {
 			m[k] = v
 
@@ -319,9 +325,14 @@ func ExtractKVMeta(kvlog map[string]interface{}) KVMeta {
 
 	tmp, ok = kvmeta["routes"]
 	if ok {
-		routes, ok := tmp.([]map[string]interface{})
+		routes, ok := tmp.([]interface{})
 		if ok {
-			kvRoutes = routes
+			for _, route := range routes {
+				rule, ok := route.(map[string]interface{})
+				if ok { // TODO: log error
+					kvRoutes = append(kvRoutes, rule)
+				}
+			}
 		}
 	}
 
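For readers wondering why the routes assertion changed: json.Unmarshal into a map[string]interface{} always decodes a JSON array as []interface{}, so the old []map[string]interface{} assertion could never succeed on decoded log lines. A small standalone check (illustrative, not repository code):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var fields map[string]interface{}
	line := `{"routes": [{"type": "metrics", "rule": "cool"}]}`
	if err := json.Unmarshal([]byte(line), &fields); err != nil {
		panic(err)
	}

	// The old assertion never matches decoded JSON; the new one does.
	_, oldAssertion := fields["routes"].([]map[string]interface{})
	_, newAssertion := fields["routes"].([]interface{})
	fmt.Println(oldAssertion, newAssertion) // prints: false true
}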
@@ -1,7 +1,6 @@
 package decode
 
 import (
-	"encoding/json"
 	"fmt"
 	"testing"
 	"time"
 
@@ -75,7 +74,13 @@ func TestKayveeDecoding(t *testing.T) {
 			Title: "errors on invalid JSON (missing a quote)",
 			Input: `prefix {"a:"b"} postfix`,
 			ExpectedOutput: map[string]interface{}{},
-			ExpectedError: &json.SyntaxError{},
+			ExpectedError: &NonKayveeError{},
+		},
+		Spec{
+			Title: "errors on empty JSON: {}",
+			Input: `prefix {} postfix`,
+			ExpectedOutput: map[string]interface{}{},
+			ExpectedError: &NonKayveeError{},
 		},
 	}
 
@@ -551,7 +556,7 @@ func TestExtractKVMeta(t *testing.T) {
 				"team": "green",
 				"kv_version": "three",
 				"kv_language": "tree",
-				"routes": []map[string]interface{}{
+				"routes": []interface{}{
 					map[string]interface{}{
 						"type": "metrics",
 						"rule": "cool",
 
@@ -590,7 +595,7 @@ func TestExtractKVMeta(t *testing.T) {
 				"team": "green",
 				"kv_version": "christmas",
 				"kv_language": "tree",
-				"routes": []map[string]interface{}{
+				"routes": []interface{}{
 					map[string]interface{}{
 						"type": "analytics",
 						"rule": "what's-this?",
 
@@ -632,7 +637,7 @@ func TestExtractKVMeta(t *testing.T) {
 				"team": "slack",
 				"kv_version": "evergreen",
 				"kv_language": "markdown-ish",
-				"routes": []map[string]interface{}{
+				"routes": []interface{}{
 					map[string]interface{}{
 						"type": "notifications",
 						"rule": "did-you-know",
 
@@ -678,7 +683,7 @@ func TestExtractKVMeta(t *testing.T) {
 				"team": "a-team",
 				"kv_version": "old",
 				"kv_language": "jive",
-				"routes": []map[string]interface{}{
+				"routes": []interface{}{
 					map[string]interface{}{
 						"type": "alerts",
 						"rule": "last-call",
 
@@ -740,7 +745,7 @@ func TestExtractKVMeta(t *testing.T) {
 				"team": "diversity",
 				"kv_version": "kv-routes",
 				"kv_language": "understanding",
-				"routes": []map[string]interface{}{
+				"routes": []interface{}{
 					map[string]interface{}{
 						"type": "metrics",
 						"rule": "all-combos",
261 kcl/kcl.go
@@ -2,7 +2,6 @@ package kcl
 
 import (
 	"bufio"
-	"bytes"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -14,118 +13,21 @@ import (
 type RecordProcessor interface {
 	Initialize(shardID string, checkpointer Checkpointer) error
 	ProcessRecords(records []Record) error
+	// Shutdown this call should block until it's safe to shutdown the process
 	Shutdown(reason string) error
 }
 
 type Checkpointer interface {
-	Checkpoint(sequenceNumber *string, subSequenceNumber *int) error
-	CheckpointWithRetry(sequenceNumber *string, subSequenceNumber *int, retryCount int) error
+	Checkpoint(pair SequencePair)
 	Shutdown()
 }
 
-type CheckpointError struct {
-	e string
-}
-
-func (ce CheckpointError) Error() string {
-	return ce.e
-}
-
-type checkpointer struct {
-	mux sync.Mutex
-
-	ioHandler ioHandler
-}
-
-func (c *checkpointer) getAction() (interface{}, error) {
-	line, err := c.ioHandler.readLine()
-	if err != nil {
-		return nil, err
-	}
-	action, err := c.ioHandler.loadAction(line.String())
-	if err != nil {
-		return nil, err
-	}
-	return action, nil
-}
-
-func (c *checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error {
-	c.mux.Lock()
-	defer c.mux.Unlock()
-
-	c.ioHandler.writeAction(ActionCheckpoint{
-		Action: "checkpoint",
-		SequenceNumber: sequenceNumber,
-		SubSequenceNumber: subSequenceNumber,
-	})
-	line, err := c.ioHandler.readLine()
-	if err != nil {
-		return err
-	}
-	actionI, err := c.ioHandler.loadAction(line.String())
-	if err != nil {
-		return err
-	}
-	action, ok := actionI.(ActionCheckpoint)
-	if !ok {
-		return fmt.Errorf("expected checkpoint response, got '%s'", line.String())
-	}
-	if action.Error != nil && *action.Error != "" {
-		return CheckpointError{
-			e: *action.Error,
-		}
-	}
-	return nil
-}
-
-// CheckpointWithRetry tries to save a checkPoint up to `retryCount` + 1 times.
-// `retryCount` should be >= 0
-func (c *checkpointer) CheckpointWithRetry(
-	sequenceNumber *string, subSequenceNumber *int, retryCount int,
-) error {
-	sleepDuration := 5 * time.Second
-
-	for n := 0; n <= retryCount; n++ {
-		err := c.Checkpoint(sequenceNumber, subSequenceNumber)
-		if err == nil {
-			return nil
-		}
-
-		if cperr, ok := err.(CheckpointError); ok {
-			switch cperr.Error() {
-			case "ShutdownException":
-				return fmt.Errorf("Encountered shutdown exception, skipping checkpoint")
-			case "ThrottlingException":
-				fmt.Fprintf(os.Stderr, "Was throttled while checkpointing, will attempt again in %s\n", sleepDuration)
-			case "InvalidStateException":
-				fmt.Fprintf(os.Stderr, "MultiLangDaemon reported an invalid state while checkpointing\n")
-			default:
-				fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err)
-			}
-		}
-
-		if n == retryCount {
-			return fmt.Errorf("Failed to checkpoint after %d attempts, giving up.", retryCount)
-		}
-
-		time.Sleep(sleepDuration)
-	}
-
-	return nil
-}
-
-func (c *checkpointer) Shutdown() {
-	c.CheckpointWithRetry(nil, nil, 5)
-}
-
 type ioHandler struct {
 	inputFile io.Reader
 	outputFile io.Writer
 	errorFile io.Writer
 }
 
-//func newIOHandler(inputFile io.Reader, outputFile io.Writer, errorFile io.)
-
 func (i ioHandler) writeLine(line string) {
 	fmt.Fprintf(i.outputFile, "\n%s\n", line)
 }
@@ -134,13 +36,13 @@ func (i ioHandler) writeError(message string) {
 	fmt.Fprintf(i.errorFile, "%s\n", message)
 }
 
-func (i ioHandler) readLine() (*bytes.Buffer, error) {
+func (i ioHandler) readLine() (string, error) {
 	bio := bufio.NewReader(i.inputFile)
 	line, err := bio.ReadString('\n')
 	if err != nil {
-		return nil, err
+		return "", err
 	}
-	return bytes.NewBufferString(line), nil
+	return line, nil
 }
 
 type ActionInitialize struct {
@@ -197,6 +99,8 @@ func (i ioHandler) loadAction(line string) (interface{}, error) {
 			return nil, err
 		}
 		return actionProcessRecords, nil
+	case "shutdownRequested":
+		fallthrough
 	case "shutdown":
 		var actionShutdown ActionShutdown
 		if err := json.Unmarshal(lineBytes, &actionShutdown); err != nil {
@@ -223,25 +127,88 @@ func (i ioHandler) writeAction(action interface{}) error {
 	return nil
 }
 
-func New(inputFile io.Reader, outputFile, errorFile io.Writer, recordProcessor RecordProcessor) *KCLProcess {
+func New(
+	inputFile io.Reader, outputFile, errorFile io.Writer, recordProcessor RecordProcessor,
+) *KCLProcess {
 	i := ioHandler{
 		inputFile: inputFile,
 		outputFile: outputFile,
 		errorFile: errorFile,
 	}
 	return &KCLProcess{
 		ioHandler: i,
-		checkpointer: &checkpointer{
-			ioHandler: i,
-		},
 		recordProcessor: recordProcessor,
+
+		nextCheckpointPair: SequencePair{},
 	}
 }
 
 type KCLProcess struct {
+	ckpmux sync.Mutex
+
 	ioHandler ioHandler
-	checkpointer Checkpointer
 	recordProcessor RecordProcessor
+
+	nextCheckpointPair SequencePair
+}
+
+func (kclp *KCLProcess) Checkpoint(pair SequencePair) {
+	kclp.ckpmux.Lock()
+	defer kclp.ckpmux.Unlock()
+
+	if kclp.nextCheckpointPair.IsNil() || kclp.nextCheckpointPair.IsLessThan(pair) {
+		kclp.nextCheckpointPair = pair
+	}
+}
+
+func (kclp *KCLProcess) Shutdown() {
+	kclp.ioHandler.writeError("Checkpoint shutdown")
+	kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown
+}
+
+func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error {
+	if action.Error == nil { // Successful checkpoint
+		return nil
+	}
+
+	msg := *action.Error
+	switch msg {
+	case "ShutdownException":
+		return fmt.Errorf("Encountered shutdown exception, skipping checkpoint")
+	case "ThrottlingException":
+		sleep := 5 * time.Second
+		fmt.Fprintf(os.Stderr, "Checkpointing throttling, pause for %s", sleep)
+		time.Sleep(sleep)
+	case "InvalidStateException":
+		fmt.Fprintf(os.Stderr, "MultiLangDaemon invalid state while checkpointing")
+	default:
+		fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", msg)
+	}
+
+	seq := action.SequenceNumber
+	subSeq := action.SubSequenceNumber
+
+	kclp.ckpmux.Lock()
+	if !kclp.nextCheckpointPair.IsNil() {
+		tmp := kclp.nextCheckpointPair.Sequence.String()
+		seq = &tmp
+		subSeq = &kclp.nextCheckpointPair.SubSequence
+	}
+	kclp.ckpmux.Unlock()
+
+	if seq != nil && subSeq != nil {
+		return kclp.sendCheckpoint(seq, subSeq)
+	}
+
+	return nil
+}
+
+func (kclp *KCLProcess) sendCheckpoint(seq *string, subSeq *int) error {
+	return kclp.ioHandler.writeAction(ActionCheckpoint{
+		Action: "checkpoint",
+		SequenceNumber: seq,
+		SubSequenceNumber: subSeq,
+	})
 }
 
 func (kclp *KCLProcess) reportDone(responseFor string) error {
@@ -254,47 +221,75 @@ func (kclp *KCLProcess) reportDone(responseFor string) error {
 	})
 }
 
-func (kclp *KCLProcess) performAction(a interface{}) (string, error) {
-	switch action := a.(type) {
-	case ActionInitialize:
-		return action.Action, kclp.recordProcessor.Initialize(action.ShardID, kclp.checkpointer)
-	case ActionProcessRecords:
-		return action.Action, kclp.recordProcessor.ProcessRecords(action.Records)
-	case ActionShutdown:
-		return action.Action, kclp.recordProcessor.Shutdown(action.Reason)
-	default:
-		return "", fmt.Errorf("unknown action to dispatch: %s", action)
-	}
-}
-
 func (kclp *KCLProcess) handleLine(line string) error {
 	action, err := kclp.ioHandler.loadAction(line)
 	if err != nil {
 		return err
 	}
 
-	responseFor, err := kclp.performAction(action)
-	if err != nil {
-		return err
+	switch action := action.(type) {
+	case ActionCheckpoint:
+		err = kclp.handleCheckpointAction(action)
+	case ActionShutdown:
+		kclp.ioHandler.writeError("Received shutdown action...")
+
+		// Shutdown should block until it's safe to shutdown the process
+		err = kclp.recordProcessor.Shutdown(action.Reason)
+		if err != nil { // Log error and continue shutting down
+			kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown: %+#v", err))
+		}
+
+		kclp.ioHandler.writeError("Reporting shutdown done")
+		return kclp.reportDone("shutdown")
+	case ActionInitialize:
+		err = kclp.recordProcessor.Initialize(action.ShardID, kclp)
+		if err == nil {
+			err = kclp.reportDone(action.Action)
+		}
+	case ActionProcessRecords:
+		err = kclp.recordProcessor.ProcessRecords(action.Records)
+		if err == nil {
+			err = kclp.reportDone(action.Action)
+		}
+	default:
+		err = fmt.Errorf("unknown action to dispatch: %+#v", action)
 	}
-	return kclp.reportDone(responseFor)
+
+	return err
 }
 
 func (kclp *KCLProcess) Run() {
 	for {
 		line, err := kclp.ioHandler.readLine()
-		if err != nil {
-			kclp.ioHandler.writeError("Read line error: " + err.Error())
+		if err == io.EOF {
+			kclp.ioHandler.writeError("IO stream closed")
 			return
-		} else if line == nil {
+		} else if err != nil {
+			kclp.ioHandler.writeError(fmt.Sprintf("ERR Read line: %+#v", err))
+			return
+		} else if line == "" {
 			kclp.ioHandler.writeError("Empty read line recieved")
-			return
+			continue
 		}
 
-		err = kclp.handleLine(line.String())
+		err = kclp.handleLine(line)
 		if err != nil {
-			kclp.ioHandler.writeError("Handle line error: " + err.Error())
+			kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err))
 			return
 		}
+
+		kclp.ckpmux.Lock()
+		if !kclp.nextCheckpointPair.IsNil() {
+			seq := kclp.nextCheckpointPair.Sequence.String()
+			subSeq := kclp.nextCheckpointPair.SubSequence
+
+			err := kclp.sendCheckpoint(&seq, &subSeq)
+			if err != nil {
+				kclp.ioHandler.writeError(fmt.Sprintf("ERR checkpoint: %+#v", err))
+			} else {
+				kclp.nextCheckpointPair = SequencePair{}
+			}
+		}
+		kclp.ckpmux.Unlock()
 	}
 }
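With the standalone checkpointer removed, the KCLProcess itself is what Initialize now receives as its Checkpointer: Checkpoint(pair) only records the highest pending pair, and Run flushes it to the MultiLangDaemon after each handled line. A hedged wiring sketch under that reading (the processor type and its no-op behaviour are illustrative, not part of this change):

package main

import (
	"os"

	"github.com/Clever/amazon-kinesis-client-go/kcl"
)

type noopProcessor struct{ ckp kcl.Checkpointer }

func (p *noopProcessor) Initialize(shardID string, checkpointer kcl.Checkpointer) error {
	// checkpointer here is the *KCLProcess itself in the new API.
	p.ckp = checkpointer
	return nil
}

func (p *noopProcessor) ProcessRecords(records []kcl.Record) error {
	// A real processor would parse record.SequenceNumber into a SequencePair
	// and call p.ckp.Checkpoint(pair) once a batch is durably handled.
	return nil
}

func (p *noopProcessor) Shutdown(reason string) error { return nil }

func main() {
	// The MultiLangDaemon drives the processor over stdin/stdout.
	kcl.New(os.Stdin, os.Stdout, os.Stderr, &noopProcessor{}).Run()
}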
31 kcl/sequencepair.go (new file)
@@ -0,0 +1,31 @@
+package kcl
+
+import (
+	"math/big"
+)
+
+// SequencePair a convience way to pass around a Sequence / SubSequence pair
+type SequencePair struct {
+	Sequence *big.Int
+	SubSequence int
+}
+
+func (s SequencePair) IsNil() bool {
+	return s.Sequence == nil
+}
+
+func (s SequencePair) IsLessThan(pair SequencePair) bool {
+	if s.IsNil() || pair.IsNil() { // empty pairs are incomparable
+		return false
+	}
+
+	cmp := s.Sequence.Cmp(pair.Sequence)
+	if cmp == -1 {
+		return true
+	}
+	if cmp == 1 {
+		return false
+	}
+
+	return s.SubSequence < pair.SubSequence
+}
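A short usage sketch of the new type (example values are assumed): nil-sequence pairs report IsNil and never compare as less-than, and IsLessThan orders first by Sequence, then by SubSequence, which is the "largest pair" bookkeeping the sample consumer and KCLProcess rely on.

package main

import (
	"fmt"
	"math/big"

	"github.com/Clever/amazon-kinesis-client-go/kcl"
)

func main() {
	largest := kcl.SequencePair{} // IsNil() == true until something is seen

	seen := []kcl.SequencePair{
		{Sequence: big.NewInt(5), SubSequence: 2},
		{Sequence: big.NewInt(5), SubSequence: 7},  // same sequence, later sub-sequence
		{Sequence: big.NewInt(12), SubSequence: 0}, // larger sequence wins outright
	}
	for _, pair := range seen {
		if largest.IsNil() || largest.IsLessThan(pair) {
			largest = pair
		}
	}
	fmt.Println(largest.Sequence, largest.SubSequence) // prints: 12 0
}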
54 kcl/sequencepair_test.go (new file)
@@ -0,0 +1,54 @@
+package kcl
+
+import (
+	"math/big"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestSequencePairIsLessThan(t *testing.T) {
+	assert := assert.New(t)
+
+	big10 := big.NewInt(10)
+	big5 := big.NewInt(5)
+
+	tests := []struct {
+		left SequencePair
+		right SequencePair
+		isLess bool
+	}{
+		{left: SequencePair{nil, 0}, right: SequencePair{nil, 0}, isLess: false},
+		{left: SequencePair{nil, 0}, right: SequencePair{big10, 0}, isLess: false},
+		{left: SequencePair{big10, 0}, right: SequencePair{nil, 0}, isLess: false},
+
+		{left: SequencePair{big5, 0}, right: SequencePair{big10, 0}, isLess: true},
+		{left: SequencePair{big5, 0}, right: SequencePair{big5, 10}, isLess: true},
+
+		{left: SequencePair{big10, 0}, right: SequencePair{big5, 0}, isLess: false},
+		{left: SequencePair{big5, 10}, right: SequencePair{big5, 0}, isLess: false},
+	}
+
+	for _, test := range tests {
+		left := test.left
+		right := test.right
+		t.Logf(
+			"Is <%s, %d> less than <%s, %d>? %t",
+			left.Sequence.String(), left.SubSequence,
+			right.Sequence.String(), right.SubSequence,
+			test.isLess,
+		)
+
+		assert.Equal(test.isLess, left.IsLessThan(right))
+	}
+}
+
+func TestSequencePairEmpty(t *testing.T) {
+	assert := assert.New(t)
+
+	assert.True(SequencePair{nil, 0}.IsNil())
+	assert.True(SequencePair{nil, 10000}.IsNil())
+
+	assert.False(SequencePair{big.NewInt(10), 0}.IsNil())
+	assert.False(SequencePair{big.NewInt(0), 1000}.IsNil())
+}