From 04042290f5d684e5c4a5b23a3bf4d488ab00cf79 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Wed, 2 Aug 2017 19:45:23 +0000 Subject: [PATCH 01/28] Moved SequencePair to kcl package --- batchconsumer/batcher/message_batcher.go | 45 ++++---------- batchconsumer/batcher/message_batcher_test.go | 58 +++---------------- batchconsumer/writer.go | 22 +++---- kcl/sequencepair.go | 31 ++++++++++ kcl/sequencepair_test.go | 54 +++++++++++++++++ 5 files changed, 113 insertions(+), 97 deletions(-) create mode 100644 kcl/sequencepair.go create mode 100644 kcl/sequencepair_test.go diff --git a/batchconsumer/batcher/message_batcher.go b/batchconsumer/batcher/message_batcher.go index 82245e8..0257fde 100644 --- a/batchconsumer/batcher/message_batcher.go +++ b/batchconsumer/batcher/message_batcher.go @@ -2,37 +2,12 @@ package batcher import ( "fmt" - "math/big" "sync" "time" + + "github.com/Clever/amazon-kinesis-client-go/kcl" ) -// SequencePair a convience way to pass around a Sequence / SubSequence pair -type SequencePair struct { - Sequence *big.Int - SubSequence int -} - -func (s SequencePair) IsEmpty() bool { - return s.Sequence == nil -} - -func (s SequencePair) IsLessThan(pair SequencePair) bool { - if s.IsEmpty() || pair.IsEmpty() { // empty pairs are incomparable - return false - } - - cmp := s.Sequence.Cmp(pair.Sequence) - if cmp == -1 { - return true - } - if cmp == 1 { - return false - } - - return s.SubSequence < pair.SubSequence -} - // Sync is used to allow a writer to syncronize with the batcher. // The writer declares how to write messages (via its `SendBatch` method), while the batcher // keeps track of messages written @@ -43,17 +18,17 @@ type Sync interface { // Batcher interface type Batcher interface { // AddMesage to the batch - AddMessage(msg []byte, sequencePair SequencePair) error + AddMessage(msg []byte, sequencePair kcl.SequencePair) error // Flush all messages from the batch Flush() // SmallestSeqPair returns the smallest SequenceNumber and SubSequence number in // the current batch - SmallestSequencePair() SequencePair + SmallestSequencePair() kcl.SequencePair } type msgPack struct { msg []byte - sequencePair SequencePair + sequencePair kcl.SequencePair } type batcher struct { @@ -64,7 +39,7 @@ type batcher struct { flushSize int // smallestSeq are used for checkpointing - smallestSeq SequencePair + smallestSeq kcl.SequencePair sync Sync msgChan chan<- msgPack @@ -104,7 +79,7 @@ func New(sync Sync, flushInterval time.Duration, flushCount int, flushSize int) return b, nil } -func (b *batcher) SmallestSequencePair() SequencePair { +func (b *batcher) SmallestSequencePair() kcl.SequencePair { b.mux.Lock() defer b.mux.Unlock() @@ -123,7 +98,7 @@ func (b *batcher) SetFlushSize(size int) { b.flushSize = size } -func (b *batcher) AddMessage(msg []byte, pair SequencePair) error { +func (b *batcher) AddMessage(msg []byte, pair kcl.SequencePair) error { if len(msg) <= 0 { return fmt.Errorf("Empty messages can't be sent") } @@ -135,7 +110,7 @@ func (b *batcher) AddMessage(msg []byte, pair SequencePair) error { // updateSequenceNumbers is used to track the smallest sequenceNumber of any record in the batch. // When flush() is called, the batcher sends the sequence number to the writer. When the writer // checkpoints, it does so up to the latest message that was flushed successfully. -func (b *batcher) updateSequenceNumbers(pair SequencePair) { +func (b *batcher) updateSequenceNumbers(pair kcl.SequencePair) { b.mux.Lock() defer b.mux.Unlock() @@ -162,7 +137,7 @@ func (b *batcher) flush(batch [][]byte) [][]byte { b.sync.SendBatch(batch) b.mux.Lock() - b.smallestSeq = SequencePair{nil, 0} + b.smallestSeq = kcl.SequencePair{} b.mux.Unlock() } return [][]byte{} diff --git a/batchconsumer/batcher/message_batcher_test.go b/batchconsumer/batcher/message_batcher_test.go index 8ab9039..73d55fa 100644 --- a/batchconsumer/batcher/message_batcher_test.go +++ b/batchconsumer/batcher/message_batcher_test.go @@ -7,6 +7,8 @@ import ( "time" "github.com/stretchr/testify/assert" + + "github.com/Clever/amazon-kinesis-client-go/kcl" ) type batch [][]byte @@ -37,7 +39,7 @@ func (m *MockSync) waitForFlush(timeout time.Duration) error { } } -var mockSequence = SequencePair{big.NewInt(99999), 12345} +var mockSequence = kcl.SequencePair{big.NewInt(99999), 12345} func TestBatchingByCount(t *testing.T) { assert := assert.New(t) @@ -195,70 +197,24 @@ func TestUpdatingSequence(t *testing.T) { expected := new(big.Int) t.Log("After AddMessage (seq=1), smallestSeq = 1") - batcher.updateSequenceNumbers(SequencePair{big.NewInt(1), 1234}) + batcher.updateSequenceNumbers(kcl.SequencePair{big.NewInt(1), 1234}) expected.SetInt64(1) seq := batcher.SmallestSequencePair() assert.True(expected.Cmp(seq.Sequence) == 0) t.Log("After AddMessage (seq=2), smallestSeq = 1 -- not updated because higher") - batcher.updateSequenceNumbers(SequencePair{big.NewInt(2), 1234}) + batcher.updateSequenceNumbers(kcl.SequencePair{big.NewInt(2), 1234}) seq = batcher.SmallestSequencePair() assert.True(expected.Cmp(seq.Sequence) == 0) t.Log("After AddMessage (seq=1), smallestSeq = 0") - batcher.updateSequenceNumbers(SequencePair{big.NewInt(0), 1234}) + batcher.updateSequenceNumbers(kcl.SequencePair{big.NewInt(0), 1234}) expected.SetInt64(0) seq = batcher.SmallestSequencePair() assert.True(expected.Cmp(seq.Sequence) == 0) t.Log("Flushing batch clears smallest sequence pair") - assert.NoError(batcher.AddMessage([]byte("cdcd"), SequencePair{big.NewInt(2), 1234})) + assert.NoError(batcher.AddMessage([]byte("cdcd"), kcl.SequencePair{big.NewInt(2), 1234})) sync.waitForFlush(time.Minute) assert.Nil(batcher.SmallestSequencePair().Sequence) } - -func TestSequencePairIsLessThan(t *testing.T) { - assert := assert.New(t) - - big10 := big.NewInt(10) - big5 := big.NewInt(5) - - tests := []struct { - left SequencePair - right SequencePair - isLess bool - }{ - {left: SequencePair{nil, 0}, right: SequencePair{nil, 0}, isLess: false}, - {left: SequencePair{nil, 0}, right: SequencePair{big10, 0}, isLess: false}, - {left: SequencePair{big10, 0}, right: SequencePair{nil, 0}, isLess: false}, - - {left: SequencePair{big5, 0}, right: SequencePair{big10, 0}, isLess: true}, - {left: SequencePair{big5, 0}, right: SequencePair{big5, 10}, isLess: true}, - - {left: SequencePair{big10, 0}, right: SequencePair{big5, 0}, isLess: false}, - {left: SequencePair{big5, 10}, right: SequencePair{big5, 0}, isLess: false}, - } - - for _, test := range tests { - left := test.left - right := test.right - t.Logf( - "Is <%s, %d> less than <%s, %d>? %t", - left.Sequence.String(), left.SubSequence, - right.Sequence.String(), right.SubSequence, - test.isLess, - ) - - assert.Equal(test.isLess, left.IsLessThan(right)) - } -} - -func TestSequencePairEmpty(t *testing.T) { - assert := assert.New(t) - - assert.True(SequencePair{nil, 0}.IsEmpty()) - assert.True(SequencePair{nil, 10000}.IsEmpty()) - - assert.False(SequencePair{big.NewInt(10), 0}.IsEmpty()) - assert.False(SequencePair{big.NewInt(0), 1000}.IsEmpty()) -} diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index f641f62..7b99cd3 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -19,7 +19,7 @@ import ( type tagMsgPair struct { tag string msg []byte - pair batcher.SequencePair + pair kcl.SequencePair } type batchedWriter struct { @@ -29,16 +29,16 @@ type batchedWriter struct { shardID string - checkpointMsg chan batcher.SequencePair + checkpointMsg chan kcl.SequencePair checkpointTag chan string - lastProcessedPair chan batcher.SequencePair + lastProcessedPair chan kcl.SequencePair batchMsg chan tagMsgPair flushBatches chan struct{} // Limits the number of records read from the stream rateLimiter *rate.Limiter - lastProcessedSeq batcher.SequencePair + lastProcessedSeq kcl.SequencePair } func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batchedWriter { @@ -53,13 +53,13 @@ func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batche func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error { b.shardID = shardID - b.checkpointMsg = make(chan batcher.SequencePair) + b.checkpointMsg = make(chan kcl.SequencePair) b.startCheckpointListener(checkpointer, b.checkpointMsg) b.checkpointTag = make(chan string) b.batchMsg = make(chan tagMsgPair) b.flushBatches = make(chan struct{}) - b.lastProcessedPair = make(chan batcher.SequencePair) + b.lastProcessedPair = make(chan kcl.SequencePair) b.startMessageHandler(b.batchMsg, b.checkpointTag, b.lastProcessedPair, b.flushBatches) return nil @@ -95,7 +95,7 @@ func (b *batchedWriter) handleCheckpointError(err error) bool { } func (b *batchedWriter) startCheckpointListener( - checkpointer kcl.Checkpointer, checkpointMsg <-chan batcher.SequencePair, + checkpointer kcl.Checkpointer, checkpointMsg <-chan kcl.SequencePair, ) { go func() { lastCheckpoint := time.Now() @@ -152,11 +152,11 @@ func (b *batchedWriter) createBatcher(tag string) batcher.Batcher { // startMessageDistributer starts a go-routine that routes messages to batches. It's in uses a // go routine to avoid racey conditions. func (b *batchedWriter) startMessageHandler( - batchMsg <-chan tagMsgPair, checkpointTag <-chan string, lastPair <-chan batcher.SequencePair, + batchMsg <-chan tagMsgPair, checkpointTag <-chan string, lastPair <-chan kcl.SequencePair, flushBatches <-chan struct{}, ) { go func() { - var lastProcessedPair batcher.SequencePair + var lastProcessedPair kcl.SequencePair batchers := map[string]batcher.Batcher{} areBatchersEmpty := true @@ -231,7 +231,7 @@ func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) } func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { - var pair batcher.SequencePair + var pair kcl.SequencePair prevPair := b.lastProcessedSeq for _, record := range records { @@ -243,7 +243,7 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { return fmt.Errorf("could not parse sequence number '%s'", record.SequenceNumber) } - pair = batcher.SequencePair{seq, record.SubSequenceNumber} + pair = kcl.SequencePair{seq, record.SubSequenceNumber} if prevPair.IsEmpty() { // Handles on-start edge case where b.lastProcessSeq is empty prevPair = pair } diff --git a/kcl/sequencepair.go b/kcl/sequencepair.go new file mode 100644 index 0000000..df0fbfe --- /dev/null +++ b/kcl/sequencepair.go @@ -0,0 +1,31 @@ +package kcl + +import ( + "math/big" +) + +// SequencePair a convience way to pass around a Sequence / SubSequence pair +type SequencePair struct { + Sequence *big.Int + SubSequence int +} + +func (s SequencePair) IsEmpty() bool { + return s.Sequence == nil +} + +func (s SequencePair) IsLessThan(pair SequencePair) bool { + if s.IsEmpty() || pair.IsEmpty() { // empty pairs are incomparable + return false + } + + cmp := s.Sequence.Cmp(pair.Sequence) + if cmp == -1 { + return true + } + if cmp == 1 { + return false + } + + return s.SubSequence < pair.SubSequence +} diff --git a/kcl/sequencepair_test.go b/kcl/sequencepair_test.go new file mode 100644 index 0000000..58d253f --- /dev/null +++ b/kcl/sequencepair_test.go @@ -0,0 +1,54 @@ +package kcl + +import ( + "math/big" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSequencePairIsLessThan(t *testing.T) { + assert := assert.New(t) + + big10 := big.NewInt(10) + big5 := big.NewInt(5) + + tests := []struct { + left SequencePair + right SequencePair + isLess bool + }{ + {left: SequencePair{nil, 0}, right: SequencePair{nil, 0}, isLess: false}, + {left: SequencePair{nil, 0}, right: SequencePair{big10, 0}, isLess: false}, + {left: SequencePair{big10, 0}, right: SequencePair{nil, 0}, isLess: false}, + + {left: SequencePair{big5, 0}, right: SequencePair{big10, 0}, isLess: true}, + {left: SequencePair{big5, 0}, right: SequencePair{big5, 10}, isLess: true}, + + {left: SequencePair{big10, 0}, right: SequencePair{big5, 0}, isLess: false}, + {left: SequencePair{big5, 10}, right: SequencePair{big5, 0}, isLess: false}, + } + + for _, test := range tests { + left := test.left + right := test.right + t.Logf( + "Is <%s, %d> less than <%s, %d>? %t", + left.Sequence.String(), left.SubSequence, + right.Sequence.String(), right.SubSequence, + test.isLess, + ) + + assert.Equal(test.isLess, left.IsLessThan(right)) + } +} + +func TestSequencePairEmpty(t *testing.T) { + assert := assert.New(t) + + assert.True(SequencePair{nil, 0}.IsEmpty()) + assert.True(SequencePair{nil, 10000}.IsEmpty()) + + assert.False(SequencePair{big.NewInt(10), 0}.IsEmpty()) + assert.False(SequencePair{big.NewInt(0), 1000}.IsEmpty()) +} From 7a7716e8243e184501ab5ad72463a25dd67a19e2 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Wed, 2 Aug 2017 19:46:13 +0000 Subject: [PATCH 02/28] Increased default read rate limit --- batchconsumer/consumer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/batchconsumer/consumer.go b/batchconsumer/consumer.go index f43ef05..35f4ddc 100644 --- a/batchconsumer/consumer.go +++ b/batchconsumer/consumer.go @@ -63,8 +63,9 @@ func withDefaults(config Config) Config { config.DeployEnv = "unknown-env" } + // Not totally clear we need this rate limit. The KCL may do rate limiting for us. if config.ReadRateLimit == 0 { - config.ReadRateLimit = 300 + config.ReadRateLimit = 1000 } if config.ReadBurstLimit == 0 { config.ReadBurstLimit = int(float64(config.ReadRateLimit)*1.2 + 0.5) From 266d7d620d43dffbb9d58f1af237f4e8b4e68b87 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Wed, 2 Aug 2017 19:47:14 +0000 Subject: [PATCH 03/28] Fixed bug in decoder package --- decode/decode.go | 9 +++++++-- decode/decode_test.go | 10 +++++----- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/decode/decode.go b/decode/decode.go index cfacd9d..0a89db6 100644 --- a/decode/decode.go +++ b/decode/decode.go @@ -319,9 +319,14 @@ func ExtractKVMeta(kvlog map[string]interface{}) KVMeta { tmp, ok = kvmeta["routes"] if ok { - routes, ok := tmp.([]map[string]interface{}) + routes, ok := tmp.([]interface{}) if ok { - kvRoutes = routes + for _, route := range routes { + rule, ok := route.(map[string]interface{}) + if ok { // TODO: log error + kvRoutes = append(kvRoutes, rule) + } + } } } diff --git a/decode/decode_test.go b/decode/decode_test.go index 9f7a123..4ec3ab6 100644 --- a/decode/decode_test.go +++ b/decode/decode_test.go @@ -551,7 +551,7 @@ func TestExtractKVMeta(t *testing.T) { "team": "green", "kv_version": "three", "kv_language": "tree", - "routes": []map[string]interface{}{ + "routes": []interface{}{ map[string]interface{}{ "type": "metrics", "rule": "cool", @@ -590,7 +590,7 @@ func TestExtractKVMeta(t *testing.T) { "team": "green", "kv_version": "christmas", "kv_language": "tree", - "routes": []map[string]interface{}{ + "routes": []interface{}{ map[string]interface{}{ "type": "analytics", "rule": "what's-this?", @@ -632,7 +632,7 @@ func TestExtractKVMeta(t *testing.T) { "team": "slack", "kv_version": "evergreen", "kv_language": "markdown-ish", - "routes": []map[string]interface{}{ + "routes": []interface{}{ map[string]interface{}{ "type": "notifications", "rule": "did-you-know", @@ -678,7 +678,7 @@ func TestExtractKVMeta(t *testing.T) { "team": "a-team", "kv_version": "old", "kv_language": "jive", - "routes": []map[string]interface{}{ + "routes": []interface{}{ map[string]interface{}{ "type": "alerts", "rule": "last-call", @@ -740,7 +740,7 @@ func TestExtractKVMeta(t *testing.T) { "team": "diversity", "kv_version": "kv-routes", "kv_language": "understanding", - "routes": []map[string]interface{}{ + "routes": []interface{}{ map[string]interface{}{ "type": "metrics", "rule": "all-combos", From 873544ae78f83718e06dd5bd3efdc9034f39d2cc Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Wed, 2 Aug 2017 19:54:06 +0000 Subject: [PATCH 04/28] Run benchmarks when building --- circle.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/circle.yml b/circle.yml index 440a5a7..52271b9 100644 --- a/circle.yml +++ b/circle.yml @@ -11,6 +11,7 @@ compile: override: - make install_deps - make build + - make bench test: override: - make test From 6e9457cbcfac5d74f91c503de4d1f523cf84ffab Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Thu, 3 Aug 2017 07:55:57 +0000 Subject: [PATCH 05/28] Better propogated shutdown signal --- batchconsumer/writer.go | 49 +++++++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index 7b99cd3..1dc0d09 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -29,11 +29,12 @@ type batchedWriter struct { shardID string - checkpointMsg chan kcl.SequencePair - checkpointTag chan string - lastProcessedPair chan kcl.SequencePair - batchMsg chan tagMsgPair - flushBatches chan struct{} + checkpointMsg chan kcl.SequencePair + checkpointShutdown chan struct{} + checkpointTag chan string + lastProcessedPair chan kcl.SequencePair + batchMsg chan tagMsgPair + shutdown chan struct{} // Limits the number of records read from the stream rateLimiter *rate.Limiter @@ -54,13 +55,14 @@ func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batche func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error { b.shardID = shardID b.checkpointMsg = make(chan kcl.SequencePair) - b.startCheckpointListener(checkpointer, b.checkpointMsg) + b.checkpointShutdown = make(chan struct{}) + b.startCheckpointListener(checkpointer, b.checkpointMsg, b.checkpointShutdown) b.checkpointTag = make(chan string) b.batchMsg = make(chan tagMsgPair) - b.flushBatches = make(chan struct{}) + b.shutdown = make(chan struct{}) b.lastProcessedPair = make(chan kcl.SequencePair) - b.startMessageHandler(b.batchMsg, b.checkpointTag, b.lastProcessedPair, b.flushBatches) + b.startMessageHandler(b.batchMsg, b.checkpointTag, b.lastProcessedPair, b.shutdown) return nil } @@ -96,26 +98,35 @@ func (b *batchedWriter) handleCheckpointError(err error) bool { func (b *batchedWriter) startCheckpointListener( checkpointer kcl.Checkpointer, checkpointMsg <-chan kcl.SequencePair, + shutdown <-chan struct{}, ) { go func() { lastCheckpoint := time.Now() for { - seq := <-checkpointMsg + seq := kcl.SequencePair{} + isShuttingDown := false + + select { + case seq = <-checkpointMsg: + case <-shutdown: + isShuttingDown = true + } // This is a write throttle to ensure we don't checkpoint faster than // b.config.CheckpointFreq. The latest seq number is always used. - for time.Now().Sub(lastCheckpoint) < b.config.CheckpointFreq { + for !isShuttingDown && time.Now().Sub(lastCheckpoint) < b.config.CheckpointFreq { select { case seq = <-checkpointMsg: // Keep updating checkpoint seq while waiting + case <-shutdown: + isShuttingDown = true case <-time.NewTimer(b.config.CheckpointFreq - time.Now().Sub(lastCheckpoint)).C: } } retry := true - for n := 0; retry && n < b.config.CheckpointRetries+1; n++ { - str := seq.Sequence.String() - err := checkpointer.Checkpoint(&str, &seq.SubSequence) + for n := 0; retry && !seq.IsEmpty() && n < b.config.CheckpointRetries+1; n++ { + err := checkpointer.Checkpoint(seq, 5) if err == nil { // Successfully checkpointed! lastCheckpoint = time.Now() break @@ -132,6 +143,11 @@ func (b *batchedWriter) startCheckpointListener( time.Sleep(b.config.CheckpointRetrySleep) } } + + if isShuttingDown { + checkpointer.Shutdown() + return + } } }() } @@ -153,7 +169,7 @@ func (b *batchedWriter) createBatcher(tag string) batcher.Batcher { // go routine to avoid racey conditions. func (b *batchedWriter) startMessageHandler( batchMsg <-chan tagMsgPair, checkpointTag <-chan string, lastPair <-chan kcl.SequencePair, - flushBatches <-chan struct{}, + shutdown <-chan struct{}, ) { go func() { var lastProcessedPair kcl.SequencePair @@ -206,11 +222,12 @@ func (b *batchedWriter) startMessageHandler( b.checkpointMsg <- pair } lastProcessedPair = pair - case <-flushBatches: + case <-shutdown: for _, batch := range batchers { batch.Flush() } b.checkpointMsg <- lastProcessedPair + b.checkpointShutdown <- struct{}{} areBatchersEmpty = true } } @@ -317,9 +334,9 @@ func (b *batchedWriter) SendBatch(batch [][]byte, tag string) { func (b *batchedWriter) Shutdown(reason string) error { if reason == "TERMINATE" { b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID}) - b.flushBatches <- struct{}{} } else { b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason}) } + b.shutdown <- struct{}{} return nil } From c6fe4cef37abab244029a2b18d46b6f15f28a7fa Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Thu, 3 Aug 2017 18:33:43 +0000 Subject: [PATCH 06/28] Renamed variable from seq to pair --- batchconsumer/writer.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index 1dc0d09..4260025 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -104,20 +104,20 @@ func (b *batchedWriter) startCheckpointListener( lastCheckpoint := time.Now() for { - seq := kcl.SequencePair{} + pair := kcl.SequencePair{} isShuttingDown := false select { - case seq = <-checkpointMsg: + case pair = <-checkpointMsg: case <-shutdown: isShuttingDown = true } // This is a write throttle to ensure we don't checkpoint faster than - // b.config.CheckpointFreq. The latest seq number is always used. + // b.config.CheckpointFreq. The latest pair number is always used. for !isShuttingDown && time.Now().Sub(lastCheckpoint) < b.config.CheckpointFreq { select { - case seq = <-checkpointMsg: // Keep updating checkpoint seq while waiting + case pair = <-checkpointMsg: // Keep updating checkpoint pair while waiting case <-shutdown: isShuttingDown = true case <-time.NewTimer(b.config.CheckpointFreq - time.Now().Sub(lastCheckpoint)).C: @@ -125,7 +125,7 @@ func (b *batchedWriter) startCheckpointListener( } retry := true - for n := 0; retry && !seq.IsEmpty() && n < b.config.CheckpointRetries+1; n++ { + for n := 0; retry && !pair.IsEmpty() && n < b.config.CheckpointRetries+1; n++ { err := checkpointer.Checkpoint(seq, 5) if err == nil { // Successfully checkpointed! lastCheckpoint = time.Now() From 29f68f77eb5f6a71e43e7bc66cf32e5dd44d55d5 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Thu, 3 Aug 2017 18:35:19 +0000 Subject: [PATCH 07/28] Removed redundant retry logic --- batchconsumer/writer.go | 50 +++++------------------------------------ 1 file changed, 5 insertions(+), 45 deletions(-) diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index 4260025..da07f65 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -67,35 +67,6 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer return nil } -// handleCheckpointError returns true if checkout should be tried again. Returns false otherwise. -func (b *batchedWriter) handleCheckpointError(err error) bool { - if err == nil { - return false - } - - cperr, ok := err.(kcl.CheckpointError) - if !ok { - b.log.ErrorD("unknown-checkpoint-error", kv.M{"msg": err.Error(), "shard-id": b.shardID}) - return true - } - - switch cperr.Error() { - case "ShutdownException": // Skips checkpointing - b.log.ErrorD("shutdown-checkpoint-exception", kv.M{ - "msg": err.Error(), "shard-id": b.shardID, - }) - return false - case "ThrottlingException": - b.log.ErrorD("checkpoint-throttle", kv.M{"shard-id": b.shardID}) - case "InvalidStateException": - b.log.ErrorD("invalid-checkpoint-state", kv.M{"shard-id": b.shardID}) - default: - b.log.ErrorD("checkpoint-error", kv.M{"shard-id": b.shardID, "msg": err}) - } - - return true -} - func (b *batchedWriter) startCheckpointListener( checkpointer kcl.Checkpointer, checkpointMsg <-chan kcl.SequencePair, shutdown <-chan struct{}, @@ -124,23 +95,12 @@ func (b *batchedWriter) startCheckpointListener( } } - retry := true - for n := 0; retry && !pair.IsEmpty() && n < b.config.CheckpointRetries+1; n++ { - err := checkpointer.Checkpoint(seq, 5) - if err == nil { // Successfully checkpointed! + if !pair.IsEmpty() { + err := checkpointer.Checkpoint(pair, b.config.CheckpointRetries) + if err != nil { + b.log.ErrorD("checkpoint-err", kv.M{"msg": err.Error(), "shard-id": b.shardID}) + } else { lastCheckpoint = time.Now() - break - } - - retry = b.handleCheckpointError(err) - - if n == b.config.CheckpointRetries { - b.log.ErrorD("checkpoint-retries", kv.M{"attempts": b.config.CheckpointRetries}) - retry = false - } - - if retry { - time.Sleep(b.config.CheckpointRetrySleep) } } From ba951ff0dab7407a8abed2a9fdb15dbe105b1727 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Thu, 3 Aug 2017 21:22:52 +0000 Subject: [PATCH 08/28] Refactor to fix dead locks and race conditions. --- batchconsumer/writer.go | 66 ++++++--- batchconsumer/writer_test.go | 35 ++--- cmd/consumer/main.go | 17 +-- kcl/kcl.go | 256 +++++++++++++++++++---------------- 4 files changed, 211 insertions(+), 163 deletions(-) diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index da07f65..bb8e2e6 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -32,7 +32,7 @@ type batchedWriter struct { checkpointMsg chan kcl.SequencePair checkpointShutdown chan struct{} checkpointTag chan string - lastProcessedPair chan kcl.SequencePair + lastIgnoredPair chan kcl.SequencePair batchMsg chan tagMsgPair shutdown chan struct{} @@ -58,11 +58,11 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer b.checkpointShutdown = make(chan struct{}) b.startCheckpointListener(checkpointer, b.checkpointMsg, b.checkpointShutdown) - b.checkpointTag = make(chan string) + b.checkpointTag = make(chan string, 100) // Buffered to workaround b.batchMsg = make(chan tagMsgPair) b.shutdown = make(chan struct{}) - b.lastProcessedPair = make(chan kcl.SequencePair) - b.startMessageHandler(b.batchMsg, b.checkpointTag, b.lastProcessedPair, b.shutdown) + b.lastIgnoredPair = make(chan kcl.SequencePair) + b.startMessageHandler(b.batchMsg, b.checkpointTag, b.lastIgnoredPair, b.shutdown) return nil } @@ -128,32 +128,48 @@ func (b *batchedWriter) createBatcher(tag string) batcher.Batcher { // startMessageDistributer starts a go-routine that routes messages to batches. It's in uses a // go routine to avoid racey conditions. func (b *batchedWriter) startMessageHandler( - batchMsg <-chan tagMsgPair, checkpointTag <-chan string, lastPair <-chan kcl.SequencePair, + batchMsg <-chan tagMsgPair, checkpointTag <-chan string, lastIgnored <-chan kcl.SequencePair, shutdown <-chan struct{}, ) { - go func() { - var lastProcessedPair kcl.SequencePair - batchers := map[string]batcher.Batcher{} - areBatchersEmpty := true + getBatcher := make(chan string) + rtnBatcher := make(chan batcher.Batcher) + shutdownAdder := make(chan struct{}) + go func() { for { select { case tmp := <-batchMsg: - batcher, ok := batchers[tmp.tag] - if !ok { - batcher = b.createBatcher(tmp.tag) - batchers[tmp.tag] = batcher - } - + getBatcher <- tmp.tag + batcher := <-rtnBatcher err := batcher.AddMessage(tmp.msg, tmp.pair) if err != nil { b.log.ErrorD("add-message", kv.M{ "err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag, }) } + case <-shutdownAdder: + } + } + }() + + go func() { + var lastIgnoredPair kcl.SequencePair + batchers := map[string]batcher.Batcher{} + areBatchersEmpty := true + + for { + select { + case tag := <-getBatcher: + batcher, ok := batchers[tag] + if !ok { + batcher = b.createBatcher(tag) + batchers[tag] = batcher + } + areBatchersEmpty = false + rtnBatcher <- batcher case tag := <-checkpointTag: - smallest := lastProcessedPair + smallest := lastIgnoredPair isAllEmpty := true for name, batch := range batchers { @@ -166,7 +182,8 @@ func (b *batchedWriter) startMessageHandler( continue } - if pair.IsLessThan(smallest) { + // Check for empty because it's possible that no messages have been ignored + if smallest.IsEmpty() || pair.IsLessThan(smallest) { smallest = pair } @@ -177,17 +194,18 @@ func (b *batchedWriter) startMessageHandler( b.checkpointMsg <- smallest } areBatchersEmpty = isAllEmpty - case pair := <-lastPair: - if areBatchersEmpty { + case pair := <-lastIgnored: + if areBatchersEmpty && !pair.IsEmpty() { b.checkpointMsg <- pair } - lastProcessedPair = pair + lastIgnoredPair = pair case <-shutdown: for _, batch := range batchers { batch.Flush() } - b.checkpointMsg <- lastProcessedPair + b.checkpointMsg <- b.lastProcessedSeq b.checkpointShutdown <- struct{}{} + areBatchersEmpty = true } } @@ -234,6 +252,7 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { if err != nil { return err } + wasPairIgnored := true for _, rawmsg := range messages { msg, tags, err := b.sender.ProcessMessage(rawmsg) @@ -260,11 +279,14 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { // sequence number amount all the batch (let's call it A). We then checkpoint at // the A-1 sequence number. b.batchMsg <- tagMsgPair{tag, msg, prevPair} + wasPairIgnored = false } } prevPair = pair - b.lastProcessedPair <- pair + if wasPairIgnored { + b.lastIgnoredPair <- pair + } } b.lastProcessedSeq = pair diff --git a/batchconsumer/writer_test.go b/batchconsumer/writer_test.go index 6622937..d87d1ce 100644 --- a/batchconsumer/writer_test.go +++ b/batchconsumer/writer_test.go @@ -86,30 +86,28 @@ type mockCheckpointer struct { shutdown chan struct{} } -func NewMockCheckpointer(maxSeq string, timeout time.Duration) *mockCheckpointer { +func NewMockCheckpointer(timeout time.Duration) *mockCheckpointer { mcp := &mockCheckpointer{ checkpoint: make(chan string), done: make(chan struct{}, 1), timeout: make(chan struct{}, 1), shutdown: make(chan struct{}), } - mcp.startWaiter(maxSeq, timeout) + mcp.startWaiter(timeout) return mcp } -func (m *mockCheckpointer) startWaiter(maxSeq string, timeout time.Duration) { +func (m *mockCheckpointer) startWaiter(timeout time.Duration) { go func() { for { select { case seq := <-m.checkpoint: m.recievedSequences = append(m.recievedSequences, seq) - if seq == maxSeq { - m.done <- struct{}{} - } case <-time.NewTimer(timeout).C: m.timeout <- struct{}{} case <-m.shutdown: + m.done <- struct{}{} return } } @@ -126,15 +124,10 @@ func (m *mockCheckpointer) wait() error { func (m *mockCheckpointer) Shutdown() { m.shutdown <- struct{}{} } -func (m *mockCheckpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error { - m.checkpoint <- *sequenceNumber +func (m *mockCheckpointer) Checkpoint(pair kcl.SequencePair, retry int) error { + m.checkpoint <- pair.Sequence.String() return nil } -func (m *mockCheckpointer) CheckpointWithRetry( - sequenceNumber *string, subSequenceNumber *int, retryCount int, -) error { - return m.Checkpoint(sequenceNumber, subSequenceNumber) -} func encode(str string) string { return base64.StdEncoding.EncodeToString([]byte(str)) @@ -148,7 +141,7 @@ func TestProcessRecordsIgnoredMessages(t *testing.T) { BatchInterval: 10 * time.Millisecond, CheckpointFreq: 20 * time.Millisecond, }) - mockcheckpointer := NewMockCheckpointer("4", 5*time.Second) + mockcheckpointer := NewMockCheckpointer(5 * time.Second) wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mocklog) wrt.Initialize("test-shard", mockcheckpointer) @@ -161,8 +154,13 @@ func TestProcessRecordsIgnoredMessages(t *testing.T) { }) assert.NoError(err) + err = wrt.Shutdown("TERMINATE") + assert.NoError(err) + err = mockcheckpointer.wait() assert.NoError(err) + + assert.Contains(mockcheckpointer.recievedSequences, "4") } func TestProcessRecordsMutliBatchBasic(t *testing.T) { @@ -173,7 +171,7 @@ func TestProcessRecordsMutliBatchBasic(t *testing.T) { BatchInterval: 100 * time.Millisecond, CheckpointFreq: 200 * time.Millisecond, }) - mockcheckpointer := NewMockCheckpointer("8", 5*time.Second) + mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) @@ -233,7 +231,7 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) { BatchInterval: 100 * time.Millisecond, CheckpointFreq: 200 * time.Millisecond, }) - mockcheckpointer := NewMockCheckpointer("26", 5*time.Second) + mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) @@ -312,7 +310,7 @@ func TestStaggeredCheckpionting(t *testing.T) { BatchInterval: 100 * time.Millisecond, CheckpointFreq: 200 * time.Nanosecond, }) - mockcheckpointer := NewMockCheckpointer("9", 5*time.Second) + mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) @@ -352,6 +350,7 @@ func TestStaggeredCheckpionting(t *testing.T) { assert.NotContains(mockcheckpointer.recievedSequences, "6") assert.NotContains(mockcheckpointer.recievedSequences, "7") assert.NotContains(mockcheckpointer.recievedSequences, "8") + assert.Contains(mockcheckpointer.recievedSequences, "9") assert.Contains(mocksender.batches, "tag1") assert.Equal(2, len(mocksender.batches["tag1"])) // One batch @@ -365,8 +364,10 @@ func TestStaggeredCheckpionting(t *testing.T) { assert.Equal(2, len(mocksender.batches["tag3"][0])) // with three items assert.Equal("tag3", string(mocksender.batches["tag3"][0][0])) assert.Equal("tag3", string(mocksender.batches["tag3"][0][1])) + assert.Equal(2, len(mocksender.batches["tag3"][1])) assert.Equal("tag3", string(mocksender.batches["tag3"][1][0])) assert.Equal("tag3", string(mocksender.batches["tag3"][1][1])) + assert.Equal(2, len(mocksender.batches["tag3"][2])) assert.Equal("tag3", string(mocksender.batches["tag3"][2][0])) assert.Equal("tag3", string(mocksender.batches["tag3"][2][1])) } diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index 02281a9..8e51ee9 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -13,8 +13,7 @@ type sampleRecordProcessor struct { checkpointer kcl.Checkpointer checkpointRetries int checkpointFreq time.Duration - largestSeq *big.Int - largestSubSeq int + largestPair kcl.SequencePair lastCheckpoint time.Time } @@ -31,9 +30,8 @@ func (srp *sampleRecordProcessor) Initialize(shardID string, checkpointer kcl.Ch return nil } -func (srp *sampleRecordProcessor) shouldUpdateSequence(sequenceNumber *big.Int, subSequenceNumber int) bool { - return srp.largestSeq == nil || sequenceNumber.Cmp(srp.largestSeq) == 1 || - (sequenceNumber.Cmp(srp.largestSeq) == 0 && subSequenceNumber > srp.largestSubSeq) +func (srp *sampleRecordProcessor) shouldUpdateSequence(pair kcl.SequencePair) bool { + return srp.largestPair.IsLessThan(pair) } func (srp *sampleRecordProcessor) ProcessRecords(records []kcl.Record) error { @@ -43,14 +41,13 @@ func (srp *sampleRecordProcessor) ProcessRecords(records []kcl.Record) error { fmt.Fprintf(os.Stderr, "could not parse sequence number '%s'\n", record.SequenceNumber) continue } - if srp.shouldUpdateSequence(seqNumber, record.SubSequenceNumber) { - srp.largestSeq = seqNumber - srp.largestSubSeq = record.SubSequenceNumber + pair := kcl.SequencePair{seqNumber, record.SubSequenceNumber} + if srp.shouldUpdateSequence(pair) { + srp.largestPair = pair } } if time.Now().Sub(srp.lastCheckpoint) > srp.checkpointFreq { - largestSeq := srp.largestSeq.String() - srp.checkpointer.CheckpointWithRetry(&largestSeq, &srp.largestSubSeq, srp.checkpointRetries) + srp.checkpointer.Checkpoint(srp.largestPair, srp.checkpointRetries) srp.lastCheckpoint = time.Now() } return nil diff --git a/kcl/kcl.go b/kcl/kcl.go index 950a92c..76f6c13 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -2,12 +2,10 @@ package kcl import ( "bufio" - "bytes" "encoding/json" "fmt" "io" "os" - "sync" "time" ) @@ -18,8 +16,7 @@ type RecordProcessor interface { } type Checkpointer interface { - Checkpoint(sequenceNumber *string, subSequenceNumber *int) error - CheckpointWithRetry(sequenceNumber *string, subSequenceNumber *int, retryCount int) error + Checkpoint(pair SequencePair, retryCount int) error Shutdown() } @@ -31,93 +28,6 @@ func (ce CheckpointError) Error() string { return ce.e } -type checkpointer struct { - mux sync.Mutex - - ioHandler ioHandler -} - -func (c *checkpointer) getAction() (interface{}, error) { - line, err := c.ioHandler.readLine() - if err != nil { - return nil, err - } - action, err := c.ioHandler.loadAction(line.String()) - if err != nil { - return nil, err - } - return action, nil -} - -func (c *checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error { - c.mux.Lock() - defer c.mux.Unlock() - - c.ioHandler.writeAction(ActionCheckpoint{ - Action: "checkpoint", - SequenceNumber: sequenceNumber, - SubSequenceNumber: subSequenceNumber, - }) - line, err := c.ioHandler.readLine() - if err != nil { - return err - } - actionI, err := c.ioHandler.loadAction(line.String()) - if err != nil { - return err - } - action, ok := actionI.(ActionCheckpoint) - if !ok { - return fmt.Errorf("expected checkpoint response, got '%s'", line.String()) - } - if action.Error != nil && *action.Error != "" { - return CheckpointError{ - e: *action.Error, - } - } - return nil -} - -// CheckpointWithRetry tries to save a checkPoint up to `retryCount` + 1 times. -// `retryCount` should be >= 0 -func (c *checkpointer) CheckpointWithRetry( - sequenceNumber *string, subSequenceNumber *int, retryCount int, -) error { - sleepDuration := 5 * time.Second - - for n := 0; n <= retryCount; n++ { - err := c.Checkpoint(sequenceNumber, subSequenceNumber) - if err == nil { - return nil - } - - if cperr, ok := err.(CheckpointError); ok { - switch cperr.Error() { - case "ShutdownException": - return fmt.Errorf("Encountered shutdown exception, skipping checkpoint") - case "ThrottlingException": - fmt.Fprintf(os.Stderr, "Was throttled while checkpointing, will attempt again in %s\n", sleepDuration) - case "InvalidStateException": - fmt.Fprintf(os.Stderr, "MultiLangDaemon reported an invalid state while checkpointing\n") - default: - fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err) - } - } - - if n == retryCount { - return fmt.Errorf("Failed to checkpoint after %d attempts, giving up.", retryCount) - } - - time.Sleep(sleepDuration) - } - - return nil -} - -func (c *checkpointer) Shutdown() { - c.CheckpointWithRetry(nil, nil, 5) -} - type ioHandler struct { inputFile io.Reader outputFile io.Writer @@ -134,13 +44,13 @@ func (i ioHandler) writeError(message string) { fmt.Fprintf(i.errorFile, "%s\n", message) } -func (i ioHandler) readLine() (*bytes.Buffer, error) { +func (i ioHandler) readLine() (string, error) { bio := bufio.NewReader(i.inputFile) line, err := bio.ReadString('\n') if err != nil { - return nil, err + return "", err } - return bytes.NewBufferString(line), nil + return line, nil } type ActionInitialize struct { @@ -197,6 +107,8 @@ func (i ioHandler) loadAction(line string) (interface{}, error) { return nil, err } return actionProcessRecords, nil + case "shutdownRequested": + fallthrough case "shutdown": var actionShutdown ActionShutdown if err := json.Unmarshal(lineBytes, &actionShutdown); err != nil { @@ -223,25 +135,37 @@ func (i ioHandler) writeAction(action interface{}) error { return nil } -func New(inputFile io.Reader, outputFile, errorFile io.Writer, recordProcessor RecordProcessor) *KCLProcess { +func New( + inputFile io.Reader, outputFile, errorFile io.Writer, recordProcessor RecordProcessor, +) *KCLProcess { i := ioHandler{ inputFile: inputFile, outputFile: outputFile, errorFile: errorFile, } return &KCLProcess{ - ioHandler: i, - checkpointer: &checkpointer{ - ioHandler: i, - }, + ioHandler: i, recordProcessor: recordProcessor, + + next: make(chan struct{}), + out: make(chan string), + outErr: make(chan error), + + checkpoint: make(chan SequencePair), + checkpointErr: make(chan error), } } type KCLProcess struct { ioHandler ioHandler - checkpointer Checkpointer recordProcessor RecordProcessor + + next chan struct{} + out chan string + outErr chan error + + checkpoint chan SequencePair + checkpointErr chan error } func (kclp *KCLProcess) reportDone(responseFor string) error { @@ -257,13 +181,13 @@ func (kclp *KCLProcess) reportDone(responseFor string) error { func (kclp *KCLProcess) performAction(a interface{}) (string, error) { switch action := a.(type) { case ActionInitialize: - return action.Action, kclp.recordProcessor.Initialize(action.ShardID, kclp.checkpointer) + return action.Action, kclp.recordProcessor.Initialize(action.ShardID, kclp) case ActionProcessRecords: return action.Action, kclp.recordProcessor.ProcessRecords(action.Records) case ActionShutdown: return action.Action, kclp.recordProcessor.Shutdown(action.Reason) default: - return "", fmt.Errorf("unknown action to dispatch: %s", action) + return "", fmt.Errorf("unknown action to dispatch: %+#v", action) } } @@ -280,20 +204,124 @@ func (kclp *KCLProcess) handleLine(line string) error { return kclp.reportDone(responseFor) } -func (kclp *KCLProcess) Run() { - for { - line, err := kclp.ioHandler.readLine() - if err != nil { - kclp.ioHandler.writeError("Read line error: " + err.Error()) - return - } else if line == nil { - kclp.ioHandler.writeError("Empty read line recieved") - return +func (kclp *KCLProcess) Checkpoint(pair SequencePair, retryCount int) error { + sleepDuration := 5 * time.Second + + for n := 0; n <= retryCount; n++ { + kclp.checkpoint <- pair + err := <-kclp.checkpointErr + if err == nil { + return nil } - err = kclp.handleLine(line.String()) - if err != nil { - kclp.ioHandler.writeError("Handle line error: " + err.Error()) + if cperr, ok := err.(CheckpointError); ok { + switch cperr.Error() { + case "ShutdownException": + return fmt.Errorf("Encountered shutdown exception, skipping checkpoint") + case "ThrottlingException": + fmt.Fprintf(os.Stderr, "Checkpointing throttling, pause for %s\n", sleepDuration) + case "InvalidStateException": + fmt.Fprintf(os.Stderr, "MultiLangDaemon invalid state while checkpointing\n") + default: + fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err) + } + } + + if n == retryCount { + return fmt.Errorf("Failed to checkpoint after %d attempts, giving up.", retryCount) + } + + time.Sleep(sleepDuration) + } + + return nil +} + +func (kclp *KCLProcess) Shutdown() { + kclp.Checkpoint(SequencePair{}, 5) +} + +func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error { + var seq *string + var subSeq *int + if !pair.IsEmpty() { // an empty pair is a signal to shutdown + tmp := pair.Sequence.String() + seq = &tmp + subSeq = &pair.SubSequence + } + kclp.ioHandler.writeAction(ActionCheckpoint{ + Action: "checkpoint", + SequenceNumber: seq, + SubSequenceNumber: subSeq, + }) + line, err := kclp.ioHandler.readLine() + if err != nil { + return err + } + actionI, err := kclp.ioHandler.loadAction(line) + if err != nil { + return err + } + action, ok := actionI.(ActionCheckpoint) + if !ok { + return fmt.Errorf("expected checkpoint response, got '%s'", line) + } + if action.Error != nil && *action.Error != "" { + return CheckpointError{e: *action.Error} + } + return nil +} + +func (kclp *KCLProcess) startLineProcessor( + next chan struct{}, out chan string, outErr chan error, + checkpoint chan SequencePair, checkpointErr chan error, +) { + go func() { + for { + select { + case <-next: + line, err := kclp.ioHandler.readLine() + if err != nil { + outErr <- err + } else { + out <- line + } + case pair := <-checkpoint: + err := kclp.processCheckpoint(pair) + checkpointErr <- err + } + } + }() +} + +func (kclp *KCLProcess) processNextLine() error { + kclp.next <- struct{}{} // We're ready for a new line + + var err error + var line string + + select { + case err = <-kclp.outErr: + case line = <-kclp.out: + if line == "" { + err = fmt.Errorf("Empty read line recieved") + } else { + err = kclp.handleLine(line) + } + } + + return err +} + +func (kclp *KCLProcess) Run() { + kclp.startLineProcessor(kclp.next, kclp.out, kclp.outErr, kclp.checkpoint, kclp.checkpointErr) + for { + err := kclp.processNextLine() + if err == io.EOF { + kclp.ioHandler.writeError("IO stream closed") + return + } else if err != nil { + kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err)) return } } From 6a30e0eb8ac93e181dc3961426be4a9d9da733b6 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Fri, 4 Aug 2017 09:36:42 +0000 Subject: [PATCH 09/28] Huge refactor. Batchers no longer run in their own go-routines, which allowed us to a lot of concurrency and simplify model. --- batchconsumer/batcher.go | 53 +++++ batchconsumer/batcher/message_batcher.go | 169 -------------- batchconsumer/batcher/message_batcher_test.go | 220 ------------------ batchconsumer/batchermanager.go | 196 ++++++++++++++++ batchconsumer/checkpointmanager.go | 89 +++++++ batchconsumer/consumer.go | 1 + batchconsumer/sync.go | 10 - batchconsumer/writer.go | 201 +--------------- batchconsumer/writer_test.go | 5 - 9 files changed, 349 insertions(+), 595 deletions(-) create mode 100644 batchconsumer/batcher.go delete mode 100644 batchconsumer/batcher/message_batcher.go delete mode 100644 batchconsumer/batcher/message_batcher_test.go create mode 100644 batchconsumer/batchermanager.go create mode 100644 batchconsumer/checkpointmanager.go delete mode 100644 batchconsumer/sync.go diff --git a/batchconsumer/batcher.go b/batchconsumer/batcher.go new file mode 100644 index 0000000..83d3a47 --- /dev/null +++ b/batchconsumer/batcher.go @@ -0,0 +1,53 @@ +package batchconsumer + +import ( + "fmt" + "time" + + "github.com/Clever/amazon-kinesis-client-go/kcl" +) + +var ErrBatchFull = fmt.Errorf("The batch is full") + +type batcher struct { + flushCount int + flushSize int + + Batch [][]byte + LastUpdated time.Time + SmallestSeq kcl.SequencePair +} + +func (b *batcher) batchSize(batch [][]byte) int { + total := 0 + for _, msg := range batch { + total += len(msg) + } + + return total +} + +func (b *batcher) AddMessage(msg []byte, pair kcl.SequencePair) error { + if b.flushCount <= len(b.Batch) { + return ErrBatchFull + } + + size := b.batchSize(b.Batch) + if b.flushSize < size+len(msg) { + return ErrBatchFull + } + + b.Batch = append(b.Batch, msg) + if b.SmallestSeq.IsEmpty() || pair.IsLessThan(b.SmallestSeq) { + b.SmallestSeq = pair + } + b.LastUpdated = time.Now() + + return nil +} + +func (b *batcher) Clear() { + b.Batch = [][]byte{} + b.LastUpdated = time.Time{} + b.SmallestSeq = kcl.SequencePair{} +} diff --git a/batchconsumer/batcher/message_batcher.go b/batchconsumer/batcher/message_batcher.go deleted file mode 100644 index 0257fde..0000000 --- a/batchconsumer/batcher/message_batcher.go +++ /dev/null @@ -1,169 +0,0 @@ -package batcher - -import ( - "fmt" - "sync" - "time" - - "github.com/Clever/amazon-kinesis-client-go/kcl" -) - -// Sync is used to allow a writer to syncronize with the batcher. -// The writer declares how to write messages (via its `SendBatch` method), while the batcher -// keeps track of messages written -type Sync interface { - SendBatch(batch [][]byte) -} - -// Batcher interface -type Batcher interface { - // AddMesage to the batch - AddMessage(msg []byte, sequencePair kcl.SequencePair) error - // Flush all messages from the batch - Flush() - // SmallestSeqPair returns the smallest SequenceNumber and SubSequence number in - // the current batch - SmallestSequencePair() kcl.SequencePair -} - -type msgPack struct { - msg []byte - sequencePair kcl.SequencePair -} - -type batcher struct { - mux sync.Mutex - - flushInterval time.Duration - flushCount int - flushSize int - - // smallestSeq are used for checkpointing - smallestSeq kcl.SequencePair - - sync Sync - msgChan chan<- msgPack - flushChan chan<- struct{} -} - -// New creates a new Batcher -// - sync - synchronizes batcher with writer -// - flushInterval - how often accumulated messages should be flushed (default 1 second). -// - flushCount - number of messages that trigger a flush (default 10). -// - flushSize - size of batch that triggers a flush (default 1024 * 1024 = 1 mb) -func New(sync Sync, flushInterval time.Duration, flushCount int, flushSize int) (Batcher, error) { - if flushSize == 0 { - return nil, fmt.Errorf("flush size must be non-zero") - } - if flushCount == 0 { - return nil, fmt.Errorf("flush count must be non-zero") - } - if flushInterval == 0 { - return nil, fmt.Errorf("flush interval must be non-zero") - } - - msgChan := make(chan msgPack) - flushChan := make(chan struct{}) - - b := &batcher{ - flushCount: flushCount, - flushInterval: flushInterval, - flushSize: flushSize, - sync: sync, - msgChan: msgChan, - flushChan: flushChan, - } - - go b.startBatcher(msgChan, flushChan) - - return b, nil -} - -func (b *batcher) SmallestSequencePair() kcl.SequencePair { - b.mux.Lock() - defer b.mux.Unlock() - - return b.smallestSeq -} - -func (b *batcher) SetFlushInterval(dur time.Duration) { - b.flushInterval = dur -} - -func (b *batcher) SetFlushCount(count int) { - b.flushCount = count -} - -func (b *batcher) SetFlushSize(size int) { - b.flushSize = size -} - -func (b *batcher) AddMessage(msg []byte, pair kcl.SequencePair) error { - if len(msg) <= 0 { - return fmt.Errorf("Empty messages can't be sent") - } - - b.msgChan <- msgPack{msg, pair} - return nil -} - -// updateSequenceNumbers is used to track the smallest sequenceNumber of any record in the batch. -// When flush() is called, the batcher sends the sequence number to the writer. When the writer -// checkpoints, it does so up to the latest message that was flushed successfully. -func (b *batcher) updateSequenceNumbers(pair kcl.SequencePair) { - b.mux.Lock() - defer b.mux.Unlock() - - if b.smallestSeq.IsEmpty() || pair.IsLessThan(b.smallestSeq) { - b.smallestSeq = pair - } -} - -func (b *batcher) Flush() { - b.flushChan <- struct{}{} -} - -func (b *batcher) batchSize(batch [][]byte) int { - total := 0 - for _, msg := range batch { - total += len(msg) - } - - return total -} - -func (b *batcher) flush(batch [][]byte) [][]byte { - if len(batch) > 0 { - b.sync.SendBatch(batch) - - b.mux.Lock() - b.smallestSeq = kcl.SequencePair{} - b.mux.Unlock() - } - return [][]byte{} -} - -func (b *batcher) startBatcher(msgChan <-chan msgPack, flushChan <-chan struct{}) { - batch := [][]byte{} - - for { - select { - case <-time.After(b.flushInterval): - batch = b.flush(batch) - case <-flushChan: - batch = b.flush(batch) - case pack := <-msgChan: - size := b.batchSize(batch) - if b.flushSize < size+len(pack.msg) { - batch = b.flush(batch) - } - - batch = append(batch, pack.msg) - b.updateSequenceNumbers(pack.sequencePair) - - if b.flushCount <= len(batch) || b.flushSize <= b.batchSize(batch) { - batch = b.flush(batch) - } - } - } -} diff --git a/batchconsumer/batcher/message_batcher_test.go b/batchconsumer/batcher/message_batcher_test.go deleted file mode 100644 index 73d55fa..0000000 --- a/batchconsumer/batcher/message_batcher_test.go +++ /dev/null @@ -1,220 +0,0 @@ -package batcher - -import ( - "fmt" - "math/big" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/Clever/amazon-kinesis-client-go/kcl" -) - -type batch [][]byte - -type MockSync struct { - flushChan chan struct{} - batches []batch -} - -func NewMockSync() *MockSync { - return &MockSync{ - flushChan: make(chan struct{}, 1), - batches: []batch{}, - } -} - -func (m *MockSync) SendBatch(b [][]byte) { - m.batches = append(m.batches, batch(b)) - m.flushChan <- struct{}{} -} - -func (m *MockSync) waitForFlush(timeout time.Duration) error { - select { - case <-m.flushChan: - return nil - case <-time.After(timeout): - return fmt.Errorf("timed out before flush (waited %s)", timeout.String()) - } -} - -var mockSequence = kcl.SequencePair{big.NewInt(99999), 12345} - -func TestBatchingByCount(t *testing.T) { - assert := assert.New(t) - - sync := NewMockSync() - batcher, err := New(sync, time.Hour, 2, 1024*1024) - assert.NoError(err) - - t.Log("Batcher respect count limit") - assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence)) - assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence)) - assert.NoError(batcher.AddMessage([]byte("hmmhmm"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(1, len(sync.batches)) - assert.Equal(2, len(sync.batches[0])) - assert.Equal("hihi", string(sync.batches[0][0])) - assert.Equal("heyhey", string(sync.batches[0][1])) - - t.Log("Batcher doesn't send partial batches") - err = sync.waitForFlush(time.Millisecond * 10) - assert.Error(err) -} - -func TestBatchingByTime(t *testing.T) { - assert := assert.New(t) - - sync := NewMockSync() - batcher, err := New(sync, time.Millisecond, 2000000, 1024*1024) - assert.NoError(err) - - t.Log("Batcher sends partial batches when time expires") - assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(1, len(sync.batches)) - assert.Equal(1, len(sync.batches[0])) - assert.Equal("hihi", string(sync.batches[0][0])) - - t.Log("Batcher sends all messsages in partial batches when time expires") - assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence)) - assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(2, len(sync.batches)) - assert.Equal(2, len(sync.batches[1])) - assert.Equal("heyhey", string(sync.batches[1][0])) - assert.Equal("yoyo", string(sync.batches[1][1])) - - t.Log("Batcher doesn't send empty batches") - err = sync.waitForFlush(time.Millisecond * 10) - assert.Error(err) -} - -func TestBatchingBySize(t *testing.T) { - assert := assert.New(t) - - sync := NewMockSync() - batcher, err := New(sync, time.Hour, 2000000, 8) - assert.NoError(err) - - t.Log("Large messages are sent immediately") - assert.NoError(batcher.AddMessage([]byte("hellohello"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(1, len(sync.batches)) - assert.Equal(1, len(sync.batches[0])) - assert.Equal("hellohello", string(sync.batches[0][0])) - - t.Log("Batcher tries not to exceed size limit") - assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence)) - assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(2, len(sync.batches)) - assert.Equal(1, len(sync.batches[1])) - assert.Equal("heyhey", string(sync.batches[1][0])) - - t.Log("Batcher sends messages that didn't fit in previous batch") - assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequence)) // At this point "hihi" is in the batch - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(3, len(sync.batches)) - assert.Equal(2, len(sync.batches[2])) - assert.Equal("hihi", string(sync.batches[2][0])) - assert.Equal("yoyo", string(sync.batches[2][1])) - - t.Log("Batcher doesn't send partial batches") - assert.NoError(batcher.AddMessage([]byte("okok"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.Error(err) -} - -func TestFlushing(t *testing.T) { - assert := assert.New(t) - - sync := NewMockSync() - batcher, err := New(sync, time.Hour, 2000000, 1024*1024) - assert.NoError(err) - - t.Log("Calling flush sends pending messages") - assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.Error(err) - - batcher.Flush() - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(1, len(sync.batches)) - assert.Equal(1, len(sync.batches[0])) - assert.Equal("hihi", string(sync.batches[0][0])) -} - -func TestSendingEmpty(t *testing.T) { - assert := assert.New(t) - - sync := NewMockSync() - batcher, err := New(sync, time.Second, 10, 1024*1024) - assert.NoError(err) - - t.Log("An error is returned when an empty message is sent") - err = batcher.AddMessage([]byte{}, mockSequence) - assert.Error(err) - assert.Equal(err.Error(), "Empty messages can't be sent") -} - -func TestUpdatingSequence(t *testing.T) { - assert := assert.New(t) - - sync := NewMockSync() - b, err := New(sync, time.Second, 10, 1024*1024) - assert.NoError(err) - - batcher := b.(*batcher) - - t.Log("Initally, smallestSeq is undefined") - assert.Nil(batcher.SmallestSequencePair().Sequence) - - expected := new(big.Int) - - t.Log("After AddMessage (seq=1), smallestSeq = 1") - batcher.updateSequenceNumbers(kcl.SequencePair{big.NewInt(1), 1234}) - expected.SetInt64(1) - seq := batcher.SmallestSequencePair() - assert.True(expected.Cmp(seq.Sequence) == 0) - - t.Log("After AddMessage (seq=2), smallestSeq = 1 -- not updated because higher") - batcher.updateSequenceNumbers(kcl.SequencePair{big.NewInt(2), 1234}) - seq = batcher.SmallestSequencePair() - assert.True(expected.Cmp(seq.Sequence) == 0) - - t.Log("After AddMessage (seq=1), smallestSeq = 0") - batcher.updateSequenceNumbers(kcl.SequencePair{big.NewInt(0), 1234}) - expected.SetInt64(0) - seq = batcher.SmallestSequencePair() - assert.True(expected.Cmp(seq.Sequence) == 0) - - t.Log("Flushing batch clears smallest sequence pair") - assert.NoError(batcher.AddMessage([]byte("cdcd"), kcl.SequencePair{big.NewInt(2), 1234})) - sync.waitForFlush(time.Minute) - assert.Nil(batcher.SmallestSequencePair().Sequence) -} diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go new file mode 100644 index 0000000..bf2b2a2 --- /dev/null +++ b/batchconsumer/batchermanager.go @@ -0,0 +1,196 @@ +package batchconsumer + +import ( + "os" + "time" + + kv "gopkg.in/Clever/kayvee-go.v6/logger" + + "github.com/Clever/amazon-kinesis-client-go/kcl" +) + +type tagMsgPair struct { + tag string + msg []byte + pair kcl.SequencePair +} + +type batcherManager struct { + log kv.KayveeLogger + sender Sender + chkpntManager *checkpointManager + + batchCount int + batchSize int + batchInterval time.Duration + + batchMsg chan tagMsgPair + lastIgnored chan kcl.SequencePair + lastProcessed chan kcl.SequencePair + shutdown chan struct{} +} + +func NewBatcherManager( + sender Sender, chkpntManager *checkpointManager, config Config, log kv.KayveeLogger, +) *batcherManager { + bm := &batcherManager{ + log: log, + sender: sender, + chkpntManager: chkpntManager, + + batchCount: config.BatchCount, + batchSize: config.BatchSize, + batchInterval: config.BatchInterval, + + batchMsg: make(chan tagMsgPair), + lastIgnored: make(chan kcl.SequencePair), + lastProcessed: make(chan kcl.SequencePair), + shutdown: make(chan struct{}), + } + + bm.startMessageHandler(bm.batchMsg, bm.lastIgnored, bm.lastProcessed, bm.shutdown) + + return bm +} + +func (b *batcherManager) BatchMessage(tag string, msg []byte, pair kcl.SequencePair) { + b.batchMsg <- tagMsgPair{tag, msg, pair} +} + +func (b *batcherManager) LatestIgnored(pair kcl.SequencePair) { + b.lastIgnored <- pair +} + +func (b *batcherManager) LatestProcessed(pair kcl.SequencePair) { + b.lastProcessed <- pair +} + +func (b *batcherManager) Shutdown() { + b.shutdown <- struct{}{} +} + +func (b *batcherManager) createBatcher() *batcher { + return &batcher{ + flushCount: b.batchCount, + flushSize: b.batchSize, + } +} + +func (b *batcherManager) sendBatch(batcher *batcher, tag string) { + if len(batcher.Batch) <= 0 { + return + } + + err := b.sender.SendBatch(batcher.Batch, tag) + switch e := err.(type) { + case nil: // Do nothing + case PartialSendBatchError: + b.log.ErrorD("send-batch", kv.M{"msg": e.Error()}) + for _, line := range e.FailedMessages { + b.log.ErrorD("failed-log", kv.M{"log": line}) + } + case CatastrophicSendBatchError: + b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) + os.Exit(1) + default: + b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) + os.Exit(1) + } + + batcher.Clear() +} + +func (b *batcherManager) sendCheckpoint( + tag string, lastIgnoredPair kcl.SequencePair, batchers map[string]*batcher, +) { + smallest := lastIgnoredPair + + for name, batcher := range batchers { + if tag == name { + continue + } + + if len(batcher.Batch) <= 0 { + continue + } + + // Check for empty because it's possible that no messages have been ignored + if smallest.IsEmpty() || batcher.SmallestSeq.IsLessThan(smallest) { + smallest = batcher.SmallestSeq + } + } + + if !smallest.IsEmpty() { + b.chkpntManager.Checkpoint(smallest) + } +} + +// startMessageDistributer starts a go-routine that routes messages to batches. It's in uses a +// go routine to avoid racey conditions. +func (b *batcherManager) startMessageHandler( + batchMsg <-chan tagMsgPair, lastIgnored, lastProcessed <-chan kcl.SequencePair, + shutdown <-chan struct{}, +) { + go func() { + var lastProcessedPair kcl.SequencePair + var lastIgnoredPair kcl.SequencePair + batchers := map[string]*batcher{} + + for { + for tag, batcher := range batchers { // Flush batcher that hasn't been updated recently + if b.batchInterval <= time.Now().Sub(batcher.LastUpdated) { + b.sendBatch(batcher, tag) + b.sendCheckpoint(tag, lastIgnoredPair, batchers) + batcher.Clear() + } + } + + select { + case <-time.NewTimer(time.Second).C: + // Timer is janky way to ensure the code above is ran at regular intervals + // Can't put above code under this case because it's very possible that this + // timer is always pre-empted by other channels + case tmp := <-batchMsg: + batcher, ok := batchers[tmp.tag] + if !ok { + batcher = b.createBatcher() + batchers[tmp.tag] = batcher + } + + err := batcher.AddMessage(tmp.msg, tmp.pair) + if err == ErrBatchFull { + b.sendBatch(batcher, tmp.tag) + b.sendCheckpoint(tmp.tag, lastIgnoredPair, batchers) + + batcher.AddMessage(tmp.msg, tmp.pair) + } else if err != nil { + b.log.ErrorD("add-message", kv.M{ + "err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag, + }) + } + case pair := <-lastIgnored: + lastIgnoredPair = pair + + isPendingMessages := false + for _, batcher := range batchers { + if len(batcher.Batch) > 0 { + isPendingMessages = true + break + } + } + + if !isPendingMessages { + b.chkpntManager.Checkpoint(lastIgnoredPair) + } + case pair := <-lastProcessed: + lastProcessedPair = pair + case <-shutdown: + for tag, batcher := range batchers { + b.sendBatch(batcher, tag) + } + b.chkpntManager.Checkpoint(lastProcessedPair) + b.chkpntManager.Shutdown() + } + } + }() +} diff --git a/batchconsumer/checkpointmanager.go b/batchconsumer/checkpointmanager.go new file mode 100644 index 0000000..9a877fd --- /dev/null +++ b/batchconsumer/checkpointmanager.go @@ -0,0 +1,89 @@ +package batchconsumer + +import ( + "time" + + kv "gopkg.in/Clever/kayvee-go.v6/logger" + + "github.com/Clever/amazon-kinesis-client-go/kcl" +) + +type checkpointManager struct { + log kv.KayveeLogger + + checkpointRetries int + checkpointFreq time.Duration + + checkpoint chan kcl.SequencePair + shutdown chan struct{} +} + +func NewCheckpointManager( + checkpointer kcl.Checkpointer, config Config, log kv.KayveeLogger, +) *checkpointManager { + cm := &checkpointManager{ + log: log, + + checkpointRetries: config.CheckpointRetries, + checkpointFreq: config.CheckpointFreq, + + checkpoint: make(chan kcl.SequencePair), + shutdown: make(chan struct{}), + } + + cm.startCheckpointHandler(checkpointer, cm.checkpoint, cm.shutdown) + + return cm +} + +func (cm *checkpointManager) Checkpoint(pair kcl.SequencePair) { + cm.checkpoint <- pair +} + +func (cm *checkpointManager) Shutdown() { + cm.shutdown <- struct{}{} +} + +func (cm *checkpointManager) startCheckpointHandler( + checkpointer kcl.Checkpointer, checkpoint <-chan kcl.SequencePair, shutdown <-chan struct{}, +) { + go func() { + lastCheckpoint := time.Now() + + for { + pair := kcl.SequencePair{} + isShuttingDown := false + + select { + case pair = <-checkpoint: + case <-shutdown: + isShuttingDown = true + } + + // This is a write throttle to ensure we don't checkpoint faster than cm.checkpointFreq. + // The latest pair number is always used. + for !isShuttingDown && time.Now().Sub(lastCheckpoint) < cm.checkpointFreq { + select { + case pair = <-checkpoint: // Keep updating checkpoint pair while waiting + case <-shutdown: + isShuttingDown = true + case <-time.NewTimer(cm.checkpointFreq - time.Now().Sub(lastCheckpoint)).C: + } + } + + if !pair.IsEmpty() { + err := checkpointer.Checkpoint(pair, cm.checkpointRetries) + if err != nil { + cm.log.ErrorD("checkpoint-err", kv.M{"msg": err.Error()}) + } else { + lastCheckpoint = time.Now() + } + } + + if isShuttingDown { + checkpointer.Shutdown() + return + } + } + }() +} diff --git a/batchconsumer/consumer.go b/batchconsumer/consumer.go index 35f4ddc..2b6f023 100644 --- a/batchconsumer/consumer.go +++ b/batchconsumer/consumer.go @@ -18,6 +18,7 @@ type Config struct { // LogFile where consumer errors and failed log lines are saved LogFile string + // BatchInterval the upper bound on how often SendBatch is called with accumulated messages BatchInterval time.Duration // BatchCount is the number of messages that triggers a SendBatch call diff --git a/batchconsumer/sync.go b/batchconsumer/sync.go deleted file mode 100644 index f50a703..0000000 --- a/batchconsumer/sync.go +++ /dev/null @@ -1,10 +0,0 @@ -package batchconsumer - -type batcherSync struct { - tag string - writer *batchedWriter -} - -func (b *batcherSync) SendBatch(batch [][]byte) { - b.writer.SendBatch(batch, b.tag) -} diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index bb8e2e6..519eff7 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -5,23 +5,14 @@ import ( "encoding/base64" "fmt" "math/big" - "os" - "time" "golang.org/x/time/rate" kv "gopkg.in/Clever/kayvee-go.v6/logger" - "github.com/Clever/amazon-kinesis-client-go/batchconsumer/batcher" "github.com/Clever/amazon-kinesis-client-go/kcl" "github.com/Clever/amazon-kinesis-client-go/splitter" ) -type tagMsgPair struct { - tag string - msg []byte - pair kcl.SequencePair -} - type batchedWriter struct { config Config sender Sender @@ -29,12 +20,8 @@ type batchedWriter struct { shardID string - checkpointMsg chan kcl.SequencePair - checkpointShutdown chan struct{} - checkpointTag chan string - lastIgnoredPair chan kcl.SequencePair - batchMsg chan tagMsgPair - shutdown chan struct{} + chkpntManager *checkpointManager + batcherManager *batcherManager // Limits the number of records read from the stream rateLimiter *rate.Limiter @@ -54,164 +41,13 @@ func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batche func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error { b.shardID = shardID - b.checkpointMsg = make(chan kcl.SequencePair) - b.checkpointShutdown = make(chan struct{}) - b.startCheckpointListener(checkpointer, b.checkpointMsg, b.checkpointShutdown) - b.checkpointTag = make(chan string, 100) // Buffered to workaround - b.batchMsg = make(chan tagMsgPair) - b.shutdown = make(chan struct{}) - b.lastIgnoredPair = make(chan kcl.SequencePair) - b.startMessageHandler(b.batchMsg, b.checkpointTag, b.lastIgnoredPair, b.shutdown) + b.chkpntManager = NewCheckpointManager(checkpointer, b.config, b.log) + b.batcherManager = NewBatcherManager(b.sender, b.chkpntManager, b.config, b.log) return nil } -func (b *batchedWriter) startCheckpointListener( - checkpointer kcl.Checkpointer, checkpointMsg <-chan kcl.SequencePair, - shutdown <-chan struct{}, -) { - go func() { - lastCheckpoint := time.Now() - - for { - pair := kcl.SequencePair{} - isShuttingDown := false - - select { - case pair = <-checkpointMsg: - case <-shutdown: - isShuttingDown = true - } - - // This is a write throttle to ensure we don't checkpoint faster than - // b.config.CheckpointFreq. The latest pair number is always used. - for !isShuttingDown && time.Now().Sub(lastCheckpoint) < b.config.CheckpointFreq { - select { - case pair = <-checkpointMsg: // Keep updating checkpoint pair while waiting - case <-shutdown: - isShuttingDown = true - case <-time.NewTimer(b.config.CheckpointFreq - time.Now().Sub(lastCheckpoint)).C: - } - } - - if !pair.IsEmpty() { - err := checkpointer.Checkpoint(pair, b.config.CheckpointRetries) - if err != nil { - b.log.ErrorD("checkpoint-err", kv.M{"msg": err.Error(), "shard-id": b.shardID}) - } else { - lastCheckpoint = time.Now() - } - } - - if isShuttingDown { - checkpointer.Shutdown() - return - } - } - }() -} - -func (b *batchedWriter) createBatcher(tag string) batcher.Batcher { - sync := &batcherSync{ - tag: tag, - writer: b, - } - batch, err := batcher.New(sync, b.config.BatchInterval, b.config.BatchCount, b.config.BatchSize) - if err != nil { - b.log.ErrorD("create-batcher", kv.M{"msg": err.Error(), "tag": tag}) - } - - return batch -} - -// startMessageDistributer starts a go-routine that routes messages to batches. It's in uses a -// go routine to avoid racey conditions. -func (b *batchedWriter) startMessageHandler( - batchMsg <-chan tagMsgPair, checkpointTag <-chan string, lastIgnored <-chan kcl.SequencePair, - shutdown <-chan struct{}, -) { - getBatcher := make(chan string) - rtnBatcher := make(chan batcher.Batcher) - shutdownAdder := make(chan struct{}) - - go func() { - for { - select { - case tmp := <-batchMsg: - getBatcher <- tmp.tag - batcher := <-rtnBatcher - err := batcher.AddMessage(tmp.msg, tmp.pair) - if err != nil { - b.log.ErrorD("add-message", kv.M{ - "err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag, - }) - } - case <-shutdownAdder: - } - } - }() - - go func() { - var lastIgnoredPair kcl.SequencePair - batchers := map[string]batcher.Batcher{} - areBatchersEmpty := true - - for { - select { - case tag := <-getBatcher: - batcher, ok := batchers[tag] - if !ok { - batcher = b.createBatcher(tag) - batchers[tag] = batcher - } - - areBatchersEmpty = false - rtnBatcher <- batcher - case tag := <-checkpointTag: - smallest := lastIgnoredPair - isAllEmpty := true - - for name, batch := range batchers { - if tag == name { - continue - } - - pair := batch.SmallestSequencePair() - if pair.IsEmpty() { // Occurs when batch has no items - continue - } - - // Check for empty because it's possible that no messages have been ignored - if smallest.IsEmpty() || pair.IsLessThan(smallest) { - smallest = pair - } - - isAllEmpty = false - } - - if !smallest.IsEmpty() { - b.checkpointMsg <- smallest - } - areBatchersEmpty = isAllEmpty - case pair := <-lastIgnored: - if areBatchersEmpty && !pair.IsEmpty() { - b.checkpointMsg <- pair - } - lastIgnoredPair = pair - case <-shutdown: - for _, batch := range batchers { - batch.Flush() - } - b.checkpointMsg <- b.lastProcessedSeq - b.checkpointShutdown <- struct{}{} - - areBatchersEmpty = true - } - } - }() -} - func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) { // We handle two types of records: // - records emitted from CWLogs Subscription @@ -278,47 +114,30 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { // it's been sent. When batches are sent, conceptually we first find the smallest // sequence number amount all the batch (let's call it A). We then checkpoint at // the A-1 sequence number. - b.batchMsg <- tagMsgPair{tag, msg, prevPair} + b.batcherManager.BatchMessage(tag, msg, prevPair) wasPairIgnored = false } } prevPair = pair if wasPairIgnored { - b.lastIgnoredPair <- pair + b.batcherManager.LatestIgnored(pair) } + b.batcherManager.LatestProcessed(pair) } b.lastProcessedSeq = pair return nil } -func (b *batchedWriter) SendBatch(batch [][]byte, tag string) { - err := b.sender.SendBatch(batch, tag) - switch e := err.(type) { - case nil: // Do nothing - case PartialSendBatchError: - b.log.ErrorD("send-batch", kv.M{"msg": e.Error()}) - for _, line := range e.FailedMessages { - b.log.ErrorD("failed-log", kv.M{"log": line}) - } - case CatastrophicSendBatchError: - b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) - os.Exit(1) - default: - b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) - os.Exit(1) - } - - b.checkpointTag <- tag -} - func (b *batchedWriter) Shutdown(reason string) error { if reason == "TERMINATE" { b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID}) } else { b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason}) } - b.shutdown <- struct{}{} + + b.batcherManager.Shutdown() + return nil } diff --git a/batchconsumer/writer_test.go b/batchconsumer/writer_test.go index d87d1ce..0f7e189 100644 --- a/batchconsumer/writer_test.go +++ b/batchconsumer/writer_test.go @@ -192,8 +192,6 @@ func TestProcessRecordsMutliBatchBasic(t *testing.T) { }) assert.NoError(err) - time.Sleep(200 * time.Millisecond) // Sleep to ensure checkpoint get flushed at least once - err = wrt.Shutdown("TERMINATE") assert.NoError(err) @@ -270,8 +268,6 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) { }) assert.NoError(err) - time.Sleep(200 * time.Millisecond) // Sleep to ensure checkpoint get flushed at least once - err = wrt.Shutdown("TERMINATE") assert.NoError(err) @@ -341,7 +337,6 @@ func TestStaggeredCheckpionting(t *testing.T) { assert.NoError(err) mocksender.Shutdown() - mockcheckpointer.Shutdown() // Test to make sure writer doesn't prematurely checkpoint messages // Checkpoints 5,6,7,8 will never be submitted because the 3rd "tag1" is in a batch From c5f75d655467cfc09dd83926ef9072e002f307a4 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Sun, 6 Aug 2017 01:52:11 +0000 Subject: [PATCH 10/28] Fixed another race-condition. Ensure line is completely processed before allowing a checkpoint. --- kcl/kcl.go | 25 +++++++------------------ 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/kcl/kcl.go b/kcl/kcl.go index 76f6c13..a6f4776 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -281,11 +281,13 @@ func (kclp *KCLProcess) startLineProcessor( select { case <-next: line, err := kclp.ioHandler.readLine() - if err != nil { - outErr <- err - } else { - out <- line + if err == nil { + if line == "" { + err = fmt.Errorf("Empty read line recieved") + } + err = kclp.handleLine(line) } + outErr <- err case pair := <-checkpoint: err := kclp.processCheckpoint(pair) checkpointErr <- err @@ -297,20 +299,7 @@ func (kclp *KCLProcess) startLineProcessor( func (kclp *KCLProcess) processNextLine() error { kclp.next <- struct{}{} // We're ready for a new line - var err error - var line string - - select { - case err = <-kclp.outErr: - case line = <-kclp.out: - if line == "" { - err = fmt.Errorf("Empty read line recieved") - } else { - err = kclp.handleLine(line) - } - } - - return err + return <-kclp.outErr } func (kclp *KCLProcess) Run() { From c814742afa84822fb62e58ffd1859dd987ace825 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Sun, 6 Aug 2017 02:59:28 +0000 Subject: [PATCH 11/28] Another refactor --- kcl/kcl.go | 63 ++++++++++++++++++------------------------------------ 1 file changed, 21 insertions(+), 42 deletions(-) diff --git a/kcl/kcl.go b/kcl/kcl.go index a6f4776..86a07e4 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "os" + "sync" "time" ) @@ -147,23 +148,18 @@ func New( ioHandler: i, recordProcessor: recordProcessor, - next: make(chan struct{}), - out: make(chan string), - outErr: make(chan error), - checkpoint: make(chan SequencePair), checkpointErr: make(chan error), } } type KCLProcess struct { + ckpmux sync.Mutex + readmux sync.Mutex + ioHandler ioHandler recordProcessor RecordProcessor - next chan struct{} - out chan string - outErr chan error - checkpoint chan SequencePair checkpointErr chan error } @@ -192,6 +188,9 @@ func (kclp *KCLProcess) performAction(a interface{}) (string, error) { } func (kclp *KCLProcess) handleLine(line string) error { + kclp.readmux.Lock() + defer kclp.readmux.Unlock() + action, err := kclp.ioHandler.loadAction(line) if err != nil { return err @@ -208,8 +207,7 @@ func (kclp *KCLProcess) Checkpoint(pair SequencePair, retryCount int) error { sleepDuration := 5 * time.Second for n := 0; n <= retryCount; n++ { - kclp.checkpoint <- pair - err := <-kclp.checkpointErr + err := kclp.processCheckpoint(pair) if err == nil { return nil } @@ -242,6 +240,9 @@ func (kclp *KCLProcess) Shutdown() { } func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error { + kclp.ckpmux.Lock() + defer kclp.ckpmux.Unlock() + var seq *string var subSeq *int if !pair.IsEmpty() { // an empty pair is a signal to shutdown @@ -272,44 +273,22 @@ func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error { return nil } -func (kclp *KCLProcess) startLineProcessor( - next chan struct{}, out chan string, outErr chan error, - checkpoint chan SequencePair, checkpointErr chan error, -) { - go func() { - for { - select { - case <-next: - line, err := kclp.ioHandler.readLine() - if err == nil { - if line == "" { - err = fmt.Errorf("Empty read line recieved") - } - err = kclp.handleLine(line) - } - outErr <- err - case pair := <-checkpoint: - err := kclp.processCheckpoint(pair) - checkpointErr <- err - } - } - }() -} - -func (kclp *KCLProcess) processNextLine() error { - kclp.next <- struct{}{} // We're ready for a new line - - return <-kclp.outErr -} - func (kclp *KCLProcess) Run() { - kclp.startLineProcessor(kclp.next, kclp.out, kclp.outErr, kclp.checkpoint, kclp.checkpointErr) for { - err := kclp.processNextLine() + line, err := kclp.ioHandler.readLine() if err == io.EOF { kclp.ioHandler.writeError("IO stream closed") return } else if err != nil { + kclp.ioHandler.writeError(fmt.Sprintf("ERR Read line: %+#v", err)) + return + } else if line == "" { + kclp.ioHandler.writeError("Empty read line recieved") + return + } + + err = kclp.handleLine(line) + if err != nil { kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err)) return } From b19c9297d8332d4b420739c46ad96e05f1b276cd Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Sun, 6 Aug 2017 03:46:58 +0000 Subject: [PATCH 12/28] Adjusted some locks --- kcl/kcl.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/kcl/kcl.go b/kcl/kcl.go index 86a07e4..c470637 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -188,9 +188,6 @@ func (kclp *KCLProcess) performAction(a interface{}) (string, error) { } func (kclp *KCLProcess) handleLine(line string) error { - kclp.readmux.Lock() - defer kclp.readmux.Unlock() - action, err := kclp.ioHandler.loadAction(line) if err != nil { return err @@ -204,9 +201,12 @@ func (kclp *KCLProcess) handleLine(line string) error { } func (kclp *KCLProcess) Checkpoint(pair SequencePair, retryCount int) error { + kclp.ckpmux.Lock() + defer kclp.ckpmux.Unlock() sleepDuration := 5 * time.Second for n := 0; n <= retryCount; n++ { + fmt.Printf("Trying to checkpoint %d\n", n) err := kclp.processCheckpoint(pair) if err == nil { return nil @@ -240,8 +240,8 @@ func (kclp *KCLProcess) Shutdown() { } func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error { - kclp.ckpmux.Lock() - defer kclp.ckpmux.Unlock() + kclp.readmux.Lock() + defer kclp.readmux.Unlock() var seq *string var subSeq *int @@ -270,11 +270,13 @@ func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error { if action.Error != nil && *action.Error != "" { return CheckpointError{e: *action.Error} } + fmt.Println("Successful checkpoint") return nil } func (kclp *KCLProcess) Run() { for { + kclp.readmux.Lock() line, err := kclp.ioHandler.readLine() if err == io.EOF { kclp.ioHandler.writeError("IO stream closed") @@ -292,5 +294,6 @@ func (kclp *KCLProcess) Run() { kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err)) return } + kclp.readmux.Unlock() } } From 734b46274f5ad57378ed84ff41d802aaa7c345bb Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Sun, 6 Aug 2017 05:02:55 +0000 Subject: [PATCH 13/28] Better coordinated checkpoints and readlines. Checkpoint function now returns all errors. --- batchconsumer/checkpointmanager.go | 14 ++---- batchconsumer/consumer.go | 10 ----- kcl/kcl.go | 71 +++++++++++++++++++----------- 3 files changed, 50 insertions(+), 45 deletions(-) diff --git a/batchconsumer/checkpointmanager.go b/batchconsumer/checkpointmanager.go index 9a877fd..eb755b6 100644 --- a/batchconsumer/checkpointmanager.go +++ b/batchconsumer/checkpointmanager.go @@ -11,8 +11,7 @@ import ( type checkpointManager struct { log kv.KayveeLogger - checkpointRetries int - checkpointFreq time.Duration + checkpointFreq time.Duration checkpoint chan kcl.SequencePair shutdown chan struct{} @@ -24,8 +23,7 @@ func NewCheckpointManager( cm := &checkpointManager{ log: log, - checkpointRetries: config.CheckpointRetries, - checkpointFreq: config.CheckpointFreq, + checkpointFreq: config.CheckpointFreq, checkpoint: make(chan kcl.SequencePair), shutdown: make(chan struct{}), @@ -72,12 +70,8 @@ func (cm *checkpointManager) startCheckpointHandler( } if !pair.IsEmpty() { - err := checkpointer.Checkpoint(pair, cm.checkpointRetries) - if err != nil { - cm.log.ErrorD("checkpoint-err", kv.M{"msg": err.Error()}) - } else { - lastCheckpoint = time.Now() - } + checkpointer.Checkpoint(pair) + lastCheckpoint = time.Now() } if isShuttingDown { diff --git a/batchconsumer/consumer.go b/batchconsumer/consumer.go index 2b6f023..9b16e0b 100644 --- a/batchconsumer/consumer.go +++ b/batchconsumer/consumer.go @@ -33,10 +33,6 @@ type Config struct { // CheckpointFreq the frequency in which a checkpoint is saved CheckpointFreq time.Duration - // CheckpointRetries the number of times the consumer will try to save a checkpoint - CheckpointRetries int - // CheckpointRetrySleep the amount of time between checkpoint save attempts - CheckpointRetrySleep time.Duration } // BatchConsumer is responsible for marshalling @@ -75,12 +71,6 @@ func withDefaults(config Config) Config { if config.CheckpointFreq == 0 { config.CheckpointFreq = 60 * time.Second } - if config.CheckpointRetries == 0 { - config.CheckpointRetries = 5 - } - if config.CheckpointRetrySleep == 0 { - config.CheckpointRetrySleep = 5 * time.Second - } return config } diff --git a/kcl/kcl.go b/kcl/kcl.go index c470637..5dbc3a7 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -17,7 +17,7 @@ type RecordProcessor interface { } type Checkpointer interface { - Checkpoint(pair SequencePair, retryCount int) error + Checkpoint(pair SequencePair) Shutdown() } @@ -148,23 +148,23 @@ func New( ioHandler: i, recordProcessor: recordProcessor, - checkpoint: make(chan SequencePair), - checkpointErr: make(chan error), + isShuttingDown: false, + nextCheckpointPair: SequencePair{}, } } type KCLProcess struct { - ckpmux sync.Mutex - readmux sync.Mutex + ckpmux sync.Mutex ioHandler ioHandler recordProcessor RecordProcessor - checkpoint chan SequencePair - checkpointErr chan error + isShuttingDown bool + nextCheckpointPair SequencePair } func (kclp *KCLProcess) reportDone(responseFor string) error { + fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Writing report done line") return kclp.ioHandler.writeAction(struct { Action string `json:"action"` ResponseFor string `json:"responseFor"` @@ -187,22 +187,29 @@ func (kclp *KCLProcess) performAction(a interface{}) (string, error) { } } -func (kclp *KCLProcess) handleLine(line string) error { +func (kclp *KCLProcess) handleLine(line string) (string, error) { action, err := kclp.ioHandler.loadAction(line) if err != nil { - return err + return "", err } - responseFor, err := kclp.performAction(action) - if err != nil { - return err - } - return kclp.reportDone(responseFor) + return kclp.performAction(action) } -func (kclp *KCLProcess) Checkpoint(pair SequencePair, retryCount int) error { +func (kclp *KCLProcess) Checkpoint(pair SequencePair) { kclp.ckpmux.Lock() defer kclp.ckpmux.Unlock() + + if kclp.nextCheckpointPair.IsEmpty() || kclp.nextCheckpointPair.IsLessThan(pair) { + kclp.nextCheckpointPair = pair + } +} + +func (kclp *KCLProcess) Shutdown() { + kclp.isShuttingDown = true +} + +func (kclp *KCLProcess) sendCheckpoint(pair SequencePair, retryCount int) error { sleepDuration := 5 * time.Second for n := 0; n <= retryCount; n++ { @@ -223,6 +230,8 @@ func (kclp *KCLProcess) Checkpoint(pair SequencePair, retryCount int) error { default: fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err) } + } else { + return err } if n == retryCount { @@ -235,14 +244,7 @@ func (kclp *KCLProcess) Checkpoint(pair SequencePair, retryCount int) error { return nil } -func (kclp *KCLProcess) Shutdown() { - kclp.Checkpoint(SequencePair{}, 5) -} - func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error { - kclp.readmux.Lock() - defer kclp.readmux.Unlock() - var seq *string var subSeq *int if !pair.IsEmpty() { // an empty pair is a signal to shutdown @@ -250,11 +252,13 @@ func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error { seq = &tmp subSeq = &pair.SubSequence } + fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Writing checkpoint") kclp.ioHandler.writeAction(ActionCheckpoint{ Action: "checkpoint", SequenceNumber: seq, SubSequenceNumber: subSeq, }) + fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Reading checkpoint line") line, err := kclp.ioHandler.readLine() if err != nil { return err @@ -276,7 +280,7 @@ func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error { func (kclp *KCLProcess) Run() { for { - kclp.readmux.Lock() + fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Reading process line") line, err := kclp.ioHandler.readLine() if err == io.EOF { kclp.ioHandler.writeError("IO stream closed") @@ -289,11 +293,28 @@ func (kclp *KCLProcess) Run() { return } - err = kclp.handleLine(line) + action, err := kclp.handleLine(line) if err != nil { kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err)) return } - kclp.readmux.Unlock() + + if !kclp.nextCheckpointPair.IsEmpty() { + err := kclp.sendCheckpoint(kclp.nextCheckpointPair, 5) + if err != nil { + kclp.ioHandler.writeError(fmt.Sprintf("ERR checkpoint: %+#v", err)) + } + kclp.nextCheckpointPair = SequencePair{} + } + + err = kclp.reportDone(action) + if err != nil { + kclp.ioHandler.writeError(fmt.Sprintf("ERR report done: %s, %+#v", action, err)) + return + } + + if kclp.isShuttingDown { + kclp.sendCheckpoint(SequencePair{}, 5) // Empty SequencePair is signal to shutdown + } } } From 1bff01ff4fa2006b9b691eca3f0c9cd15adb0c6e Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Sun, 6 Aug 2017 05:44:51 +0000 Subject: [PATCH 14/28] Removed unused params and properties --- cmd/consumer/main.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index 8e51ee9..6f2f51a 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -10,17 +10,15 @@ import ( ) type sampleRecordProcessor struct { - checkpointer kcl.Checkpointer - checkpointRetries int - checkpointFreq time.Duration - largestPair kcl.SequencePair - lastCheckpoint time.Time + checkpointer kcl.Checkpointer + checkpointFreq time.Duration + largestPair kcl.SequencePair + lastCheckpoint time.Time } func newSampleRecordProcessor() *sampleRecordProcessor { return &sampleRecordProcessor{ - checkpointRetries: 5, - checkpointFreq: 60 * time.Second, + checkpointFreq: 60 * time.Second, } } @@ -47,7 +45,7 @@ func (srp *sampleRecordProcessor) ProcessRecords(records []kcl.Record) error { } } if time.Now().Sub(srp.lastCheckpoint) > srp.checkpointFreq { - srp.checkpointer.Checkpoint(srp.largestPair, srp.checkpointRetries) + srp.checkpointer.Checkpoint(srp.largestPair) srp.lastCheckpoint = time.Now() } return nil From 4809cdb4e6b34623aa4a1d4fd51bfdb2663928a2 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Sun, 6 Aug 2017 10:58:46 +0000 Subject: [PATCH 15/28] Another refactor to simply and likely speed up code. New code allows checkpoint and readline messages to be interleaved. --- kcl/kcl.go | 183 +++++++++++++++++++++++------------------------------ 1 file changed, 78 insertions(+), 105 deletions(-) diff --git a/kcl/kcl.go b/kcl/kcl.go index 5dbc3a7..399a7c2 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -21,22 +21,12 @@ type Checkpointer interface { Shutdown() } -type CheckpointError struct { - e string -} - -func (ce CheckpointError) Error() string { - return ce.e -} - type ioHandler struct { inputFile io.Reader outputFile io.Writer errorFile io.Writer } -//func newIOHandler(inputFile io.Reader, outputFile io.Writer, errorFile io.) - func (i ioHandler) writeLine(line string) { fmt.Fprintf(i.outputFile, "\n%s\n", line) } @@ -163,39 +153,6 @@ type KCLProcess struct { nextCheckpointPair SequencePair } -func (kclp *KCLProcess) reportDone(responseFor string) error { - fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Writing report done line") - return kclp.ioHandler.writeAction(struct { - Action string `json:"action"` - ResponseFor string `json:"responseFor"` - }{ - Action: "status", - ResponseFor: responseFor, - }) -} - -func (kclp *KCLProcess) performAction(a interface{}) (string, error) { - switch action := a.(type) { - case ActionInitialize: - return action.Action, kclp.recordProcessor.Initialize(action.ShardID, kclp) - case ActionProcessRecords: - return action.Action, kclp.recordProcessor.ProcessRecords(action.Records) - case ActionShutdown: - return action.Action, kclp.recordProcessor.Shutdown(action.Reason) - default: - return "", fmt.Errorf("unknown action to dispatch: %+#v", action) - } -} - -func (kclp *KCLProcess) handleLine(line string) (string, error) { - action, err := kclp.ioHandler.loadAction(line) - if err != nil { - return "", err - } - - return kclp.performAction(action) -} - func (kclp *KCLProcess) Checkpoint(pair SequencePair) { kclp.ckpmux.Lock() defer kclp.ckpmux.Unlock() @@ -209,78 +166,94 @@ func (kclp *KCLProcess) Shutdown() { kclp.isShuttingDown = true } -func (kclp *KCLProcess) sendCheckpoint(pair SequencePair, retryCount int) error { - sleepDuration := 5 * time.Second +func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error { + if action.Error == nil { // Successful checkpoint + return nil + } - for n := 0; n <= retryCount; n++ { - fmt.Printf("Trying to checkpoint %d\n", n) - err := kclp.processCheckpoint(pair) - if err == nil { - return nil - } + msg := *action.Error + switch msg { + case "ShutdownException": + return fmt.Errorf("Encountered shutdown exception, skipping checkpoint") + case "ThrottlingException": + sleep := 5 * time.Second + fmt.Fprintf(os.Stderr, "Checkpointing throttling, pause for %s", sleep) + time.Sleep(sleep) + case "InvalidStateException": + fmt.Fprintf(os.Stderr, "MultiLangDaemon invalid state while checkpointing") + default: + fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", msg) + } - if cperr, ok := err.(CheckpointError); ok { - switch cperr.Error() { - case "ShutdownException": - return fmt.Errorf("Encountered shutdown exception, skipping checkpoint") - case "ThrottlingException": - fmt.Fprintf(os.Stderr, "Checkpointing throttling, pause for %s\n", sleepDuration) - case "InvalidStateException": - fmt.Fprintf(os.Stderr, "MultiLangDaemon invalid state while checkpointing\n") - default: - fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err) - } - } else { - return err - } + seq := action.SequenceNumber + subSeq := action.SubSequenceNumber - if n == retryCount { - return fmt.Errorf("Failed to checkpoint after %d attempts, giving up.", retryCount) - } + kclp.ckpmux.Lock() + if !kclp.nextCheckpointPair.IsEmpty() { + tmp := kclp.nextCheckpointPair.Sequence.String() + seq = &tmp + subSeq = &kclp.nextCheckpointPair.SubSequence + } + kclp.ckpmux.Unlock() - time.Sleep(sleepDuration) + if seq != nil && subSeq != nil { + return kclp.sendCheckpoint(seq, subSeq) } return nil } -func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error { - var seq *string - var subSeq *int - if !pair.IsEmpty() { // an empty pair is a signal to shutdown - tmp := pair.Sequence.String() - seq = &tmp - subSeq = &pair.SubSequence - } - fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Writing checkpoint") - kclp.ioHandler.writeAction(ActionCheckpoint{ +func (kclp *KCLProcess) sendCheckpoint(seq *string, subSeq *int) error { + return kclp.ioHandler.writeAction(ActionCheckpoint{ Action: "checkpoint", SequenceNumber: seq, SubSequenceNumber: subSeq, }) - fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Reading checkpoint line") - line, err := kclp.ioHandler.readLine() +} + +func (kclp *KCLProcess) reportDone(responseFor string) error { + return kclp.ioHandler.writeAction(struct { + Action string `json:"action"` + ResponseFor string `json:"responseFor"` + }{ + Action: "status", + ResponseFor: responseFor, + }) +} + +func (kclp *KCLProcess) handleLine(line string) error { + action, err := kclp.ioHandler.loadAction(line) if err != nil { return err } - actionI, err := kclp.ioHandler.loadAction(line) - if err != nil { - return err + + switch action := action.(type) { + case ActionCheckpoint: + err = kclp.handleCheckpointAction(action) + case ActionInitialize: + err = kclp.recordProcessor.Initialize(action.ShardID, kclp) + if err == nil { + err = kclp.reportDone(action.Action) + } + case ActionProcessRecords: + err = kclp.recordProcessor.ProcessRecords(action.Records) + if err == nil { + err = kclp.reportDone(action.Action) + } + case ActionShutdown: + err = kclp.recordProcessor.Shutdown(action.Reason) + if err == nil { + err = kclp.reportDone(action.Action) + } + default: + err = fmt.Errorf("unknown action to dispatch: %+#v", action) } - action, ok := actionI.(ActionCheckpoint) - if !ok { - return fmt.Errorf("expected checkpoint response, got '%s'", line) - } - if action.Error != nil && *action.Error != "" { - return CheckpointError{e: *action.Error} - } - fmt.Println("Successful checkpoint") - return nil + + return err } func (kclp *KCLProcess) Run() { for { - fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Reading process line") line, err := kclp.ioHandler.readLine() if err == io.EOF { kclp.ioHandler.writeError("IO stream closed") @@ -290,31 +263,31 @@ func (kclp *KCLProcess) Run() { return } else if line == "" { kclp.ioHandler.writeError("Empty read line recieved") - return + continue } - action, err := kclp.handleLine(line) + err = kclp.handleLine(line) if err != nil { kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err)) return } + kclp.ckpmux.Lock() if !kclp.nextCheckpointPair.IsEmpty() { - err := kclp.sendCheckpoint(kclp.nextCheckpointPair, 5) + seq := kclp.nextCheckpointPair.Sequence.String() + subSeq := kclp.nextCheckpointPair.SubSequence + + err := kclp.sendCheckpoint(&seq, &subSeq) if err != nil { kclp.ioHandler.writeError(fmt.Sprintf("ERR checkpoint: %+#v", err)) + } else { + kclp.nextCheckpointPair = SequencePair{} } - kclp.nextCheckpointPair = SequencePair{} - } - - err = kclp.reportDone(action) - if err != nil { - kclp.ioHandler.writeError(fmt.Sprintf("ERR report done: %s, %+#v", action, err)) - return } + kclp.ckpmux.Unlock() if kclp.isShuttingDown { - kclp.sendCheckpoint(SequencePair{}, 5) // Empty SequencePair is signal to shutdown + kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown } } } From 5c373fa7d8cb7d7c1e80c8033b27dcffd26ba847 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Mon, 7 Aug 2017 00:54:29 +0000 Subject: [PATCH 16/28] Fixed unit tests --- batchconsumer/writer_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/batchconsumer/writer_test.go b/batchconsumer/writer_test.go index 0f7e189..d62c467 100644 --- a/batchconsumer/writer_test.go +++ b/batchconsumer/writer_test.go @@ -124,9 +124,8 @@ func (m *mockCheckpointer) wait() error { func (m *mockCheckpointer) Shutdown() { m.shutdown <- struct{}{} } -func (m *mockCheckpointer) Checkpoint(pair kcl.SequencePair, retry int) error { +func (m *mockCheckpointer) Checkpoint(pair kcl.SequencePair) { m.checkpoint <- pair.Sequence.String() - return nil } func encode(str string) string { From eb230b94f7c7f786f74affc6eefcb2cf09971509 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Mon, 7 Aug 2017 03:05:41 +0000 Subject: [PATCH 17/28] Added stats to base kinesis client --- batchconsumer/batchermanager.go | 5 +++ batchconsumer/checkpointmanager.go | 2 ++ batchconsumer/stats/stats.go | 50 ++++++++++++++++++++++++++++++ batchconsumer/writer.go | 6 ++++ 4 files changed, 63 insertions(+) create mode 100644 batchconsumer/stats/stats.go diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index bf2b2a2..22ca8cd 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -6,6 +6,7 @@ import ( kv "gopkg.in/Clever/kayvee-go.v6/logger" + "github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats" "github.com/Clever/amazon-kinesis-client-go/kcl" ) @@ -89,6 +90,7 @@ func (b *batcherManager) sendBatch(batcher *batcher, tag string) { for _, line := range e.FailedMessages { b.log.ErrorD("failed-log", kv.M{"log": line}) } + stats.Counter("batch-log-failures", len(e.FailedMessages)) case CatastrophicSendBatchError: b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) os.Exit(1) @@ -98,6 +100,7 @@ func (b *batcherManager) sendBatch(batcher *batcher, tag string) { } batcher.Clear() + stats.Counter("batches-sent", 1) } func (b *batcherManager) sendCheckpoint( @@ -155,6 +158,7 @@ func (b *batcherManager) startMessageHandler( if !ok { batcher = b.createBatcher() batchers[tmp.tag] = batcher + stats.Gauge("tag-count", len(batchers)) } err := batcher.AddMessage(tmp.msg, tmp.pair) @@ -168,6 +172,7 @@ func (b *batcherManager) startMessageHandler( "err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag, }) } + stats.Counter("msg-batched", 1) case pair := <-lastIgnored: lastIgnoredPair = pair diff --git a/batchconsumer/checkpointmanager.go b/batchconsumer/checkpointmanager.go index eb755b6..ec8a9dd 100644 --- a/batchconsumer/checkpointmanager.go +++ b/batchconsumer/checkpointmanager.go @@ -5,6 +5,7 @@ import ( kv "gopkg.in/Clever/kayvee-go.v6/logger" + "github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats" "github.com/Clever/amazon-kinesis-client-go/kcl" ) @@ -72,6 +73,7 @@ func (cm *checkpointManager) startCheckpointHandler( if !pair.IsEmpty() { checkpointer.Checkpoint(pair) lastCheckpoint = time.Now() + stats.Counter("checkpoints-sent", 1) } if isShuttingDown { diff --git a/batchconsumer/stats/stats.go b/batchconsumer/stats/stats.go new file mode 100644 index 0000000..75eaaf9 --- /dev/null +++ b/batchconsumer/stats/stats.go @@ -0,0 +1,50 @@ +package stats + +import ( + "time" + + "gopkg.in/Clever/kayvee-go.v6/logger" +) + +var log = logger.New("amazon-kinesis-client-go") + +type datum struct { + name string + value int + category string +} + +var queue = make(chan datum, 1000) + +func init() { + data := map[string]int{} + tick := time.Tick(time.Minute) + go func() { + for { + select { + case d := <-queue: + if d.category == "counter" { + data[d.name] = data[d.name] + d.value + } else if d.category == "gauge" { + data[d.name] = d.value + } else { + log.ErrorD("unknow-stat-category", logger.M{"category": d.category}) + } + case <-tick: + tmp := logger.M{} + for k, v := range data { + tmp[k] = v + } + log.InfoD("stats", tmp) + } + } + }() +} + +func Counter(name string, val int) { + queue <- datum{name, val, "counter"} +} + +func Gauge(name string, val int) { + queue <- datum{name, val, "gauge"} +} diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index 519eff7..8035b9f 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -9,6 +9,7 @@ import ( "golang.org/x/time/rate" kv "gopkg.in/Clever/kayvee-go.v6/logger" + "github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats" "github.com/Clever/amazon-kinesis-client-go/kcl" "github.com/Clever/amazon-kinesis-client-go/splitter" ) @@ -95,17 +96,20 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { if err == ErrMessageIgnored { continue // Skip message } else if err != nil { + stats.Counter("unknown-error", 1) b.log.ErrorD("process-message", kv.M{"msg": err.Error(), "rawmsg": string(rawmsg)}) continue // Don't stop processing messages because of one bad message } if len(tags) == 0 { + stats.Counter("no-tags", 1) b.log.ErrorD("no-tags", kv.M{"rawmsg": string(rawmsg)}) return fmt.Errorf("No tags provided by consumer for log: %s", string(rawmsg)) } for _, tag := range tags { if tag == "" { + stats.Counter("blank-tag", 1) b.log.ErrorD("blank-tag", kv.M{"rawmsg": string(rawmsg)}) return fmt.Errorf("Blank tag provided by consumer for log: %s", string(rawmsg)) } @@ -124,6 +128,8 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { b.batcherManager.LatestIgnored(pair) } b.batcherManager.LatestProcessed(pair) + + stats.Counter("processed-messages", len(messages)) } b.lastProcessedSeq = pair From de04a277991bba00cf3a54e156b848db0ecee133 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Mon, 7 Aug 2017 05:58:15 +0000 Subject: [PATCH 18/28] Return NonKayveeError if log contians invalid or empty json --- decode/decode.go | 8 +++++++- decode/decode_test.go | 9 +++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/decode/decode.go b/decode/decode.go index 0a89db6..47f08cc 100644 --- a/decode/decode.go +++ b/decode/decode.go @@ -83,8 +83,14 @@ func FieldsFromKayvee(line string) (map[string]interface{}, error) { possibleJSON := line[firstIdx : lastIdx+1] var fields map[string]interface{} if err := json.Unmarshal([]byte(possibleJSON), &fields); err != nil { - return map[string]interface{}{}, err + return map[string]interface{}{}, &NonKayveeError{} } + + if len(fields) == 0 { // Some logs superfluous "{}" in them. They're not kayvee. + return map[string]interface{}{}, &NonKayveeError{} + } + // TODO: consider also filter if they have source and title + for k, v := range fields { if !stringInSlice(k, reservedFields) { m[k] = v diff --git a/decode/decode_test.go b/decode/decode_test.go index 4ec3ab6..22877a5 100644 --- a/decode/decode_test.go +++ b/decode/decode_test.go @@ -1,7 +1,6 @@ package decode import ( - "encoding/json" "fmt" "testing" "time" @@ -75,7 +74,13 @@ func TestKayveeDecoding(t *testing.T) { Title: "errors on invalid JSON (missing a quote)", Input: `prefix {"a:"b"} postfix`, ExpectedOutput: map[string]interface{}{}, - ExpectedError: &json.SyntaxError{}, + ExpectedError: &NonKayveeError{}, + }, + Spec{ + Title: "errors on empty JSON: {}", + Input: `prefix {} postfix`, + ExpectedOutput: map[string]interface{}{}, + ExpectedError: &NonKayveeError{}, }, } From 1be812a887b470a975692d904d801373fde3bdb3 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Mon, 7 Aug 2017 19:22:10 +0000 Subject: [PATCH 19/28] Added less hacky way of downloading jars --- Makefile | 66 ++++++++++++++++++++++++++++---------------------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/Makefile b/Makefile index 7b6fb74..2549e7f 100644 --- a/Makefile +++ b/Makefile @@ -2,52 +2,52 @@ include golang.mk .DEFAULT_GOAL := test # override default goal set in library makefile SHELL := /bin/bash -JAR_DIR := jars PKG := github.com/Clever/amazon-kinesis-client-go PKGS := $(shell go list ./... | grep -v /vendor ) .PHONY: download_jars run build - -URL_PREFIX := http://search.maven.org/remotecontent?filepath= - -# this list lifted from https://github.com/awslabs/amazon-kinesis-client-python/blob/fb49c6390c0593fbcf81d6c34c5245726c15b2f3/setup.py#L60 -JARS_TO_DOWNLOAD := $(addprefix $(JAR_DIR)/,com/amazonaws/amazon-kinesis-client/1.7.2/amazon-kinesis-client-1.7.2.jar \ -com/amazonaws/aws-java-sdk-dynamodb/1.11.14/aws-java-sdk-dynamodb-1.11.14.jar \ -com/amazonaws/aws-java-sdk-s3/1.11.14/aws-java-sdk-s3-1.11.14.jar \ -com/amazonaws/aws-java-sdk-kms/1.11.14/aws-java-sdk-kms-1.11.14.jar \ -com/amazonaws/aws-java-sdk-core/1.11.14/aws-java-sdk-core-1.11.14.jar \ -commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar \ -org/apache/httpcomponents/httpclient/4.5.2/httpclient-4.5.2.jar \ -org/apache/httpcomponents/httpcore/4.4.4/httpcore-4.4.4.jar \ -commons-codec/commons-codec/1.9/commons-codec-1.9.jar \ -com/fasterxml/jackson/core/jackson-databind/2.6.6/jackson-databind-2.6.6.jar \ -com/fasterxml/jackson/core/jackson-annotations/2.6.0/jackson-annotations-2.6.0.jar \ -com/fasterxml/jackson/core/jackson-core/2.6.6/jackson-core-2.6.6.jar \ -com/fasterxml/jackson/dataformat/jackson-dataformat-cbor/2.6.6/jackson-dataformat-cbor-2.6.6.jar \ -joda-time/joda-time/2.8.1/joda-time-2.8.1.jar \ -com/amazonaws/aws-java-sdk-kinesis/1.11.14/aws-java-sdk-kinesis-1.11.14.jar \ -com/amazonaws/aws-java-sdk-cloudwatch/1.11.14/aws-java-sdk-cloudwatch-1.11.14.jar \ -com/google/guava/guava/18.0/guava-18.0.jar \ -com/google/protobuf/protobuf-java/2.6.1/protobuf-java-2.6.1.jar \ -commons-lang/commons-lang/2.6/commons-lang-2.6.jar) - -EMPTY := -SPACE := $(EMPTY) $(EMPTY) -JAVA_CLASS_PATH := $(subst $(SPACE),:,$(JARS_TO_DOWNLOAD)) +$(eval $(call golang-version-check,1.8)) CONSUMER ?= consumer +TMP_DIR := ./tmp-jars +JAR_DIR := ./jars/ +KCL_VERSION := 1.7.6 -$(JARS_TO_DOWNLOAD): - mkdir -p `dirname $@` - curl -s -L -o $@ -O $(URL_PREFIX)`echo $@ | sed 's/$(JAR_DIR)\///g'` +define POM_XML_FOR_GETTING_DEPENDENT_JARS + + 4.0.0 + com.clever.kinesisconsumers + $(CONSUMER) + 1.0-SNAPSHOT + + + com.amazonaws + amazon-kinesis-client + $(KCL_VERSION) + + + +endef +export POM_XML_FOR_GETTING_DEPENDENT_JARS +download_jars: + command -v mvn >/dev/null 2>&1 || { echo >&2 "Maven not installed. Install maven!"; exit 1; } + mkdir -p $(JAR_DIR) $(TMP_DIR) + echo $$POM_XML_FOR_GETTING_DEPENDENT_JARS > $(TMP_DIR)/pom.xml + cd $(TMP_DIR) && mvn dependency:copy-dependencies + mv $(TMP_DIR)/target/dependency/* $(JAR_DIR)/ + # Download the STS jar file for supporting IAM Roles + ls $(JAR_DIR)/aws-java-sdk-core-*.jar | sed -e "s/.*-sdk-core-//g" | sed -e "s/\.jar//g" > /tmp/version.txt + curl -o $(JAR_DIR)/aws-java-sdk-sts-`cat /tmp/version.txt`.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-sts/`cat /tmp/version.txt`/aws-java-sdk-sts-`cat /tmp/version.txt`.jar + rm -r $(TMP_DIR) -download_jars: $(JARS_TO_DOWNLOAD) +all: test build build: CGO_ENABLED=0 go build -installsuffix cgo -o build/$(CONSUMER) $(PKG)/cmd/$(CONSUMER) run: build download_jars command -v java >/dev/null 2>&1 || { echo >&2 "Java not installed. Install java!"; exit 1; } - java -cp $(JAVA_CLASS_PATH) \ + java -cp "$(JAR_DIR)/*" \ com.amazonaws.services.kinesis.multilang.MultiLangDaemon \ $(CONSUMER).properties From b7743c9ea7f1312064a8c24da560ff8f5c447273 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Tue, 8 Aug 2017 19:09:31 +0000 Subject: [PATCH 20/28] Added a more coherent shutdown pathway --- batchconsumer/batchermanager.go | 21 ++++++++++++++------- batchconsumer/checkpointmanager.go | 27 +++++++++++++++------------ batchconsumer/writer.go | 3 ++- kcl/kcl.go | 26 ++++++++++++++------------ 4 files changed, 45 insertions(+), 32 deletions(-) diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index 22ca8cd..8b469c3 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -28,7 +28,7 @@ type batcherManager struct { batchMsg chan tagMsgPair lastIgnored chan kcl.SequencePair lastProcessed chan kcl.SequencePair - shutdown chan struct{} + shutdown chan chan<- struct{} } func NewBatcherManager( @@ -46,7 +46,7 @@ func NewBatcherManager( batchMsg: make(chan tagMsgPair), lastIgnored: make(chan kcl.SequencePair), lastProcessed: make(chan kcl.SequencePair), - shutdown: make(chan struct{}), + shutdown: make(chan chan<- struct{}), } bm.startMessageHandler(bm.batchMsg, bm.lastIgnored, bm.lastProcessed, bm.shutdown) @@ -66,8 +66,11 @@ func (b *batcherManager) LatestProcessed(pair kcl.SequencePair) { b.lastProcessed <- pair } -func (b *batcherManager) Shutdown() { - b.shutdown <- struct{}{} +func (b *batcherManager) Shutdown() <-chan struct{} { + done := make(chan struct{}) + b.shutdown <- done + + return done } func (b *batcherManager) createBatcher() *batcher { @@ -132,7 +135,7 @@ func (b *batcherManager) sendCheckpoint( // go routine to avoid racey conditions. func (b *batcherManager) startMessageHandler( batchMsg <-chan tagMsgPair, lastIgnored, lastProcessed <-chan kcl.SequencePair, - shutdown <-chan struct{}, + shutdown <-chan chan<- struct{}, ) { go func() { var lastProcessedPair kcl.SequencePair @@ -189,12 +192,16 @@ func (b *batcherManager) startMessageHandler( } case pair := <-lastProcessed: lastProcessedPair = pair - case <-shutdown: + case done := <-shutdown: for tag, batcher := range batchers { b.sendBatch(batcher, tag) } b.chkpntManager.Checkpoint(lastProcessedPair) - b.chkpntManager.Shutdown() + chkDone := b.chkpntManager.Shutdown() + <-chkDone + + done <- struct{}{} + return } } }() diff --git a/batchconsumer/checkpointmanager.go b/batchconsumer/checkpointmanager.go index ec8a9dd..752fe85 100644 --- a/batchconsumer/checkpointmanager.go +++ b/batchconsumer/checkpointmanager.go @@ -15,7 +15,7 @@ type checkpointManager struct { checkpointFreq time.Duration checkpoint chan kcl.SequencePair - shutdown chan struct{} + shutdown chan chan<- struct{} } func NewCheckpointManager( @@ -27,7 +27,7 @@ func NewCheckpointManager( checkpointFreq: config.CheckpointFreq, checkpoint: make(chan kcl.SequencePair), - shutdown: make(chan struct{}), + shutdown: make(chan chan<- struct{}), } cm.startCheckpointHandler(checkpointer, cm.checkpoint, cm.shutdown) @@ -39,33 +39,35 @@ func (cm *checkpointManager) Checkpoint(pair kcl.SequencePair) { cm.checkpoint <- pair } -func (cm *checkpointManager) Shutdown() { - cm.shutdown <- struct{}{} +func (cm *checkpointManager) Shutdown() <-chan struct{} { + done := make(chan struct{}) + cm.shutdown <- done + + return done } func (cm *checkpointManager) startCheckpointHandler( - checkpointer kcl.Checkpointer, checkpoint <-chan kcl.SequencePair, shutdown <-chan struct{}, + checkpointer kcl.Checkpointer, checkpoint <-chan kcl.SequencePair, + shutdown <-chan chan<- struct{}, ) { go func() { lastCheckpoint := time.Now() for { + var doneShutdown chan<- struct{} pair := kcl.SequencePair{} - isShuttingDown := false select { case pair = <-checkpoint: - case <-shutdown: - isShuttingDown = true + case doneShutdown = <-shutdown: } // This is a write throttle to ensure we don't checkpoint faster than cm.checkpointFreq. // The latest pair number is always used. - for !isShuttingDown && time.Now().Sub(lastCheckpoint) < cm.checkpointFreq { + for doneShutdown == nil && time.Now().Sub(lastCheckpoint) < cm.checkpointFreq { select { case pair = <-checkpoint: // Keep updating checkpoint pair while waiting - case <-shutdown: - isShuttingDown = true + case doneShutdown = <-shutdown: case <-time.NewTimer(cm.checkpointFreq - time.Now().Sub(lastCheckpoint)).C: } } @@ -76,8 +78,9 @@ func (cm *checkpointManager) startCheckpointHandler( stats.Counter("checkpoints-sent", 1) } - if isShuttingDown { + if doneShutdown != nil { checkpointer.Shutdown() + doneShutdown <- struct{}{} return } } diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index 8035b9f..b0fd89e 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -143,7 +143,8 @@ func (b *batchedWriter) Shutdown(reason string) error { b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason}) } - b.batcherManager.Shutdown() + done := b.batcherManager.Shutdown() + <-done return nil } diff --git a/kcl/kcl.go b/kcl/kcl.go index 399a7c2..3ff33bc 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -13,6 +13,7 @@ import ( type RecordProcessor interface { Initialize(shardID string, checkpointer Checkpointer) error ProcessRecords(records []Record) error + // Shutdown this call should block until it's safe to shutdown the process Shutdown(reason string) error } @@ -138,7 +139,6 @@ func New( ioHandler: i, recordProcessor: recordProcessor, - isShuttingDown: false, nextCheckpointPair: SequencePair{}, } } @@ -149,7 +149,6 @@ type KCLProcess struct { ioHandler ioHandler recordProcessor RecordProcessor - isShuttingDown bool nextCheckpointPair SequencePair } @@ -163,7 +162,8 @@ func (kclp *KCLProcess) Checkpoint(pair SequencePair) { } func (kclp *KCLProcess) Shutdown() { - kclp.isShuttingDown = true + kclp.ioHandler.writeError("Checkpoint shutdown") + kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown } func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error { @@ -230,6 +230,17 @@ func (kclp *KCLProcess) handleLine(line string) error { switch action := action.(type) { case ActionCheckpoint: err = kclp.handleCheckpointAction(action) + case ActionShutdown: + kclp.ioHandler.writeError("Received shutdown action...") + + // Shutdown should block until it's save to shutdown the process + err = kclp.recordProcessor.Shutdown(action.Reason) + if err != nil { // Log error and continue shutting down + kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown: %+#v", err)) + } + + kclp.ioHandler.writeError("Reporting shutdown done") + return kclp.reportDone("shutdown") case ActionInitialize: err = kclp.recordProcessor.Initialize(action.ShardID, kclp) if err == nil { @@ -240,11 +251,6 @@ func (kclp *KCLProcess) handleLine(line string) error { if err == nil { err = kclp.reportDone(action.Action) } - case ActionShutdown: - err = kclp.recordProcessor.Shutdown(action.Reason) - if err == nil { - err = kclp.reportDone(action.Action) - } default: err = fmt.Errorf("unknown action to dispatch: %+#v", action) } @@ -285,9 +291,5 @@ func (kclp *KCLProcess) Run() { } } kclp.ckpmux.Unlock() - - if kclp.isShuttingDown { - kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown - } } } From dd9da85055bc80e21a56229455ee1867bef90f43 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Thu, 10 Aug 2017 19:15:53 +0000 Subject: [PATCH 21/28] Fixed type-o --- kcl/kcl.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kcl/kcl.go b/kcl/kcl.go index 3ff33bc..93aaf0d 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -233,7 +233,7 @@ func (kclp *KCLProcess) handleLine(line string) error { case ActionShutdown: kclp.ioHandler.writeError("Received shutdown action...") - // Shutdown should block until it's save to shutdown the process + // Shutdown should block until it's safe to shutdown the process err = kclp.recordProcessor.Shutdown(action.Reason) if err != nil { // Log error and continue shutting down kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown: %+#v", err)) From 1632c81fc7471a11d979294411042f38e2fbfeec Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Thu, 10 Aug 2017 19:20:55 +0000 Subject: [PATCH 22/28] Use errors.New instead of fmt.Errorf --- batchconsumer/batcher.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/batchconsumer/batcher.go b/batchconsumer/batcher.go index 83d3a47..06e8dca 100644 --- a/batchconsumer/batcher.go +++ b/batchconsumer/batcher.go @@ -1,13 +1,13 @@ package batchconsumer import ( - "fmt" + "errors" "time" "github.com/Clever/amazon-kinesis-client-go/kcl" ) -var ErrBatchFull = fmt.Errorf("The batch is full") +var ErrBatchFull = errors.New("The batch is full") type batcher struct { flushCount int From 27cce441400022a2334356172687b67ca21495b6 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Thu, 10 Aug 2017 19:21:46 +0000 Subject: [PATCH 23/28] Made constructors of private structs private --- batchconsumer/batchermanager.go | 2 +- batchconsumer/checkpointmanager.go | 2 +- batchconsumer/writer.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index 8b469c3..1cbe54b 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -31,7 +31,7 @@ type batcherManager struct { shutdown chan chan<- struct{} } -func NewBatcherManager( +func newBatcherManager( sender Sender, chkpntManager *checkpointManager, config Config, log kv.KayveeLogger, ) *batcherManager { bm := &batcherManager{ diff --git a/batchconsumer/checkpointmanager.go b/batchconsumer/checkpointmanager.go index 752fe85..a76fd0c 100644 --- a/batchconsumer/checkpointmanager.go +++ b/batchconsumer/checkpointmanager.go @@ -18,7 +18,7 @@ type checkpointManager struct { shutdown chan chan<- struct{} } -func NewCheckpointManager( +func newCheckpointManager( checkpointer kcl.Checkpointer, config Config, log kv.KayveeLogger, ) *checkpointManager { cm := &checkpointManager{ diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index b0fd89e..a4e2d7d 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -43,8 +43,8 @@ func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batche func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error { b.shardID = shardID - b.chkpntManager = NewCheckpointManager(checkpointer, b.config, b.log) - b.batcherManager = NewBatcherManager(b.sender, b.chkpntManager, b.config, b.log) + b.chkpntManager = newCheckpointManager(checkpointer, b.config, b.log) + b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, b.config, b.log) return nil } From 68a16cfe6a53e8937d69225030f241f1e2feb19f Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Thu, 10 Aug 2017 19:47:08 +0000 Subject: [PATCH 24/28] Added an external go routine to ensure stale batches get flushed --- batchconsumer/batchermanager.go | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index 1cbe54b..b8b26f6 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -137,25 +137,30 @@ func (b *batcherManager) startMessageHandler( batchMsg <-chan tagMsgPair, lastIgnored, lastProcessed <-chan kcl.SequencePair, shutdown <-chan chan<- struct{}, ) { + flushStaleBatches := make(chan struct{}) + + go func() { + for { // Flush batches that haven't been updated recently + <-time.NewTimer(time.Second).C + flushStaleBatches <- struct{}{} + } + }() + go func() { var lastProcessedPair kcl.SequencePair var lastIgnoredPair kcl.SequencePair batchers := map[string]*batcher{} for { - for tag, batcher := range batchers { // Flush batcher that hasn't been updated recently - if b.batchInterval <= time.Now().Sub(batcher.LastUpdated) { - b.sendBatch(batcher, tag) - b.sendCheckpoint(tag, lastIgnoredPair, batchers) - batcher.Clear() - } - } - select { - case <-time.NewTimer(time.Second).C: - // Timer is janky way to ensure the code above is ran at regular intervals - // Can't put above code under this case because it's very possible that this - // timer is always pre-empted by other channels + case <-flushStaleBatches: + for tag, batcher := range batchers { + if b.batchInterval <= time.Now().Sub(batcher.LastUpdated) { + b.sendBatch(batcher, tag) + b.sendCheckpoint(tag, lastIgnoredPair, batchers) + batcher.Clear() + } + } case tmp := <-batchMsg: batcher, ok := batchers[tmp.tag] if !ok { From 6102f9c752b020bb27712452c28b0d1679d621c8 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Thu, 10 Aug 2017 19:56:17 +0000 Subject: [PATCH 25/28] Added comment to why a channel is taking a channel. --- batchconsumer/batchermanager.go | 3 ++- batchconsumer/checkpointmanager.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index b8b26f6..620d222 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -46,7 +46,8 @@ func newBatcherManager( batchMsg: make(chan tagMsgPair), lastIgnored: make(chan kcl.SequencePair), lastProcessed: make(chan kcl.SequencePair), - shutdown: make(chan chan<- struct{}), + // shutdown chan takes "done" channel to signal when batchermanager is done shutting down + shutdown: make(chan chan<- struct{}), } bm.startMessageHandler(bm.batchMsg, bm.lastIgnored, bm.lastProcessed, bm.shutdown) diff --git a/batchconsumer/checkpointmanager.go b/batchconsumer/checkpointmanager.go index a76fd0c..8ff4c45 100644 --- a/batchconsumer/checkpointmanager.go +++ b/batchconsumer/checkpointmanager.go @@ -15,7 +15,8 @@ type checkpointManager struct { checkpointFreq time.Duration checkpoint chan kcl.SequencePair - shutdown chan chan<- struct{} + // shutdown chan takes "done" channel to signal when checkpointManager is done shutting down + shutdown chan chan<- struct{} } func newCheckpointManager( From 3a04dad4bf444821be4fa6b4d1799b26f21f8649 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Thu, 10 Aug 2017 20:01:07 +0000 Subject: [PATCH 26/28] Send single value instead of full config object to CheckpointManager --- batchconsumer/checkpointmanager.go | 4 ++-- batchconsumer/writer.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/batchconsumer/checkpointmanager.go b/batchconsumer/checkpointmanager.go index 8ff4c45..f526e27 100644 --- a/batchconsumer/checkpointmanager.go +++ b/batchconsumer/checkpointmanager.go @@ -20,12 +20,12 @@ type checkpointManager struct { } func newCheckpointManager( - checkpointer kcl.Checkpointer, config Config, log kv.KayveeLogger, + checkpointer kcl.Checkpointer, checkpointFreq time.Duration, log kv.KayveeLogger, ) *checkpointManager { cm := &checkpointManager{ log: log, - checkpointFreq: config.CheckpointFreq, + checkpointFreq: checkpointFreq, checkpoint: make(chan kcl.SequencePair), shutdown: make(chan chan<- struct{}), diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index a4e2d7d..e230e00 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -43,7 +43,7 @@ func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batche func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error { b.shardID = shardID - b.chkpntManager = newCheckpointManager(checkpointer, b.config, b.log) + b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq, b.log) b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, b.config, b.log) return nil From 45fad863d0e356eb5987493d15bc78937e427626 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Thu, 10 Aug 2017 20:11:24 +0000 Subject: [PATCH 27/28] Made config object that's specific to BatcherManager --- batchconsumer/batchermanager.go | 14 ++++++++++---- batchconsumer/writer.go | 8 +++++++- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index 620d222..7c0d3ad 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -16,6 +16,12 @@ type tagMsgPair struct { pair kcl.SequencePair } +type batcherManagerConfig struct { + BatchCount int + BatchSize int + BatchInterval time.Duration +} + type batcherManager struct { log kv.KayveeLogger sender Sender @@ -32,16 +38,16 @@ type batcherManager struct { } func newBatcherManager( - sender Sender, chkpntManager *checkpointManager, config Config, log kv.KayveeLogger, + sender Sender, chkpntManager *checkpointManager, cfg batcherManagerConfig, log kv.KayveeLogger, ) *batcherManager { bm := &batcherManager{ log: log, sender: sender, chkpntManager: chkpntManager, - batchCount: config.BatchCount, - batchSize: config.BatchSize, - batchInterval: config.BatchInterval, + batchCount: cfg.BatchCount, + batchSize: cfg.BatchSize, + batchInterval: cfg.BatchInterval, batchMsg: make(chan tagMsgPair), lastIgnored: make(chan kcl.SequencePair), diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index e230e00..095dd73 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -43,8 +43,14 @@ func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batche func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error { b.shardID = shardID + bmConfig := batcherManagerConfig{ + BatchCount: b.config.BatchCount, + BatchSize: b.config.BatchSize, + BatchInterval: b.config.BatchInterval, + } + b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq, b.log) - b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, b.config, b.log) + b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.log) return nil } From 55aeecddd75ede227cffdc7fa2a73fd7aab35198 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Thu, 10 Aug 2017 20:16:41 +0000 Subject: [PATCH 28/28] Renamed IsEmpty to IsNil on SequencePair --- batchconsumer/batcher.go | 2 +- batchconsumer/batchermanager.go | 4 ++-- batchconsumer/checkpointmanager.go | 2 +- batchconsumer/writer.go | 2 +- kcl/kcl.go | 6 +++--- kcl/sequencepair.go | 4 ++-- kcl/sequencepair_test.go | 8 ++++---- 7 files changed, 14 insertions(+), 14 deletions(-) diff --git a/batchconsumer/batcher.go b/batchconsumer/batcher.go index 06e8dca..2d8480b 100644 --- a/batchconsumer/batcher.go +++ b/batchconsumer/batcher.go @@ -38,7 +38,7 @@ func (b *batcher) AddMessage(msg []byte, pair kcl.SequencePair) error { } b.Batch = append(b.Batch, msg) - if b.SmallestSeq.IsEmpty() || pair.IsLessThan(b.SmallestSeq) { + if b.SmallestSeq.IsNil() || pair.IsLessThan(b.SmallestSeq) { b.SmallestSeq = pair } b.LastUpdated = time.Now() diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index 7c0d3ad..bb1a3fe 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -128,12 +128,12 @@ func (b *batcherManager) sendCheckpoint( } // Check for empty because it's possible that no messages have been ignored - if smallest.IsEmpty() || batcher.SmallestSeq.IsLessThan(smallest) { + if smallest.IsNil() || batcher.SmallestSeq.IsLessThan(smallest) { smallest = batcher.SmallestSeq } } - if !smallest.IsEmpty() { + if !smallest.IsNil() { b.chkpntManager.Checkpoint(smallest) } } diff --git a/batchconsumer/checkpointmanager.go b/batchconsumer/checkpointmanager.go index f526e27..6f312e0 100644 --- a/batchconsumer/checkpointmanager.go +++ b/batchconsumer/checkpointmanager.go @@ -73,7 +73,7 @@ func (cm *checkpointManager) startCheckpointHandler( } } - if !pair.IsEmpty() { + if !pair.IsNil() { checkpointer.Checkpoint(pair) lastCheckpoint = time.Now() stats.Counter("checkpoints-sent", 1) diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index 095dd73..af102d1 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -82,7 +82,7 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { } pair = kcl.SequencePair{seq, record.SubSequenceNumber} - if prevPair.IsEmpty() { // Handles on-start edge case where b.lastProcessSeq is empty + if prevPair.IsNil() { // Handles on-start edge case where b.lastProcessSeq is empty prevPair = pair } diff --git a/kcl/kcl.go b/kcl/kcl.go index 93aaf0d..3270be1 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -156,7 +156,7 @@ func (kclp *KCLProcess) Checkpoint(pair SequencePair) { kclp.ckpmux.Lock() defer kclp.ckpmux.Unlock() - if kclp.nextCheckpointPair.IsEmpty() || kclp.nextCheckpointPair.IsLessThan(pair) { + if kclp.nextCheckpointPair.IsNil() || kclp.nextCheckpointPair.IsLessThan(pair) { kclp.nextCheckpointPair = pair } } @@ -189,7 +189,7 @@ func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error { subSeq := action.SubSequenceNumber kclp.ckpmux.Lock() - if !kclp.nextCheckpointPair.IsEmpty() { + if !kclp.nextCheckpointPair.IsNil() { tmp := kclp.nextCheckpointPair.Sequence.String() seq = &tmp subSeq = &kclp.nextCheckpointPair.SubSequence @@ -279,7 +279,7 @@ func (kclp *KCLProcess) Run() { } kclp.ckpmux.Lock() - if !kclp.nextCheckpointPair.IsEmpty() { + if !kclp.nextCheckpointPair.IsNil() { seq := kclp.nextCheckpointPair.Sequence.String() subSeq := kclp.nextCheckpointPair.SubSequence diff --git a/kcl/sequencepair.go b/kcl/sequencepair.go index df0fbfe..685d099 100644 --- a/kcl/sequencepair.go +++ b/kcl/sequencepair.go @@ -10,12 +10,12 @@ type SequencePair struct { SubSequence int } -func (s SequencePair) IsEmpty() bool { +func (s SequencePair) IsNil() bool { return s.Sequence == nil } func (s SequencePair) IsLessThan(pair SequencePair) bool { - if s.IsEmpty() || pair.IsEmpty() { // empty pairs are incomparable + if s.IsNil() || pair.IsNil() { // empty pairs are incomparable return false } diff --git a/kcl/sequencepair_test.go b/kcl/sequencepair_test.go index 58d253f..51e0744 100644 --- a/kcl/sequencepair_test.go +++ b/kcl/sequencepair_test.go @@ -46,9 +46,9 @@ func TestSequencePairIsLessThan(t *testing.T) { func TestSequencePairEmpty(t *testing.T) { assert := assert.New(t) - assert.True(SequencePair{nil, 0}.IsEmpty()) - assert.True(SequencePair{nil, 10000}.IsEmpty()) + assert.True(SequencePair{nil, 0}.IsNil()) + assert.True(SequencePair{nil, 10000}.IsNil()) - assert.False(SequencePair{big.NewInt(10), 0}.IsEmpty()) - assert.False(SequencePair{big.NewInt(0), 1000}.IsEmpty()) + assert.False(SequencePair{big.NewInt(10), 0}.IsNil()) + assert.False(SequencePair{big.NewInt(0), 1000}.IsNil()) }