diff --git a/batchconsumer/batcher/message_batcher.go b/batchconsumer/batcher/message_batcher.go index 82245e8..0257fde 100644 --- a/batchconsumer/batcher/message_batcher.go +++ b/batchconsumer/batcher/message_batcher.go @@ -2,37 +2,12 @@ package batcher import ( "fmt" - "math/big" "sync" "time" + + "github.com/Clever/amazon-kinesis-client-go/kcl" ) -// SequencePair a convience way to pass around a Sequence / SubSequence pair -type SequencePair struct { - Sequence *big.Int - SubSequence int -} - -func (s SequencePair) IsEmpty() bool { - return s.Sequence == nil -} - -func (s SequencePair) IsLessThan(pair SequencePair) bool { - if s.IsEmpty() || pair.IsEmpty() { // empty pairs are incomparable - return false - } - - cmp := s.Sequence.Cmp(pair.Sequence) - if cmp == -1 { - return true - } - if cmp == 1 { - return false - } - - return s.SubSequence < pair.SubSequence -} - // Sync is used to allow a writer to syncronize with the batcher. // The writer declares how to write messages (via its `SendBatch` method), while the batcher // keeps track of messages written @@ -43,17 +18,17 @@ type Sync interface { // Batcher interface type Batcher interface { // AddMesage to the batch - AddMessage(msg []byte, sequencePair SequencePair) error + AddMessage(msg []byte, sequencePair kcl.SequencePair) error // Flush all messages from the batch Flush() // SmallestSeqPair returns the smallest SequenceNumber and SubSequence number in // the current batch - SmallestSequencePair() SequencePair + SmallestSequencePair() kcl.SequencePair } type msgPack struct { msg []byte - sequencePair SequencePair + sequencePair kcl.SequencePair } type batcher struct { @@ -64,7 +39,7 @@ type batcher struct { flushSize int // smallestSeq are used for checkpointing - smallestSeq SequencePair + smallestSeq kcl.SequencePair sync Sync msgChan chan<- msgPack @@ -104,7 +79,7 @@ func New(sync Sync, flushInterval time.Duration, flushCount int, flushSize int) return b, nil } -func (b *batcher) SmallestSequencePair() SequencePair { +func (b *batcher) SmallestSequencePair() kcl.SequencePair { b.mux.Lock() defer b.mux.Unlock() @@ -123,7 +98,7 @@ func (b *batcher) SetFlushSize(size int) { b.flushSize = size } -func (b *batcher) AddMessage(msg []byte, pair SequencePair) error { +func (b *batcher) AddMessage(msg []byte, pair kcl.SequencePair) error { if len(msg) <= 0 { return fmt.Errorf("Empty messages can't be sent") } @@ -135,7 +110,7 @@ func (b *batcher) AddMessage(msg []byte, pair SequencePair) error { // updateSequenceNumbers is used to track the smallest sequenceNumber of any record in the batch. // When flush() is called, the batcher sends the sequence number to the writer. When the writer // checkpoints, it does so up to the latest message that was flushed successfully. -func (b *batcher) updateSequenceNumbers(pair SequencePair) { +func (b *batcher) updateSequenceNumbers(pair kcl.SequencePair) { b.mux.Lock() defer b.mux.Unlock() @@ -162,7 +137,7 @@ func (b *batcher) flush(batch [][]byte) [][]byte { b.sync.SendBatch(batch) b.mux.Lock() - b.smallestSeq = SequencePair{nil, 0} + b.smallestSeq = kcl.SequencePair{} b.mux.Unlock() } return [][]byte{} diff --git a/batchconsumer/batcher/message_batcher_test.go b/batchconsumer/batcher/message_batcher_test.go index 8ab9039..73d55fa 100644 --- a/batchconsumer/batcher/message_batcher_test.go +++ b/batchconsumer/batcher/message_batcher_test.go @@ -7,6 +7,8 @@ import ( "time" "github.com/stretchr/testify/assert" + + "github.com/Clever/amazon-kinesis-client-go/kcl" ) type batch [][]byte @@ -37,7 +39,7 @@ func (m *MockSync) waitForFlush(timeout time.Duration) error { } } -var mockSequence = SequencePair{big.NewInt(99999), 12345} +var mockSequence = kcl.SequencePair{big.NewInt(99999), 12345} func TestBatchingByCount(t *testing.T) { assert := assert.New(t) @@ -195,70 +197,24 @@ func TestUpdatingSequence(t *testing.T) { expected := new(big.Int) t.Log("After AddMessage (seq=1), smallestSeq = 1") - batcher.updateSequenceNumbers(SequencePair{big.NewInt(1), 1234}) + batcher.updateSequenceNumbers(kcl.SequencePair{big.NewInt(1), 1234}) expected.SetInt64(1) seq := batcher.SmallestSequencePair() assert.True(expected.Cmp(seq.Sequence) == 0) t.Log("After AddMessage (seq=2), smallestSeq = 1 -- not updated because higher") - batcher.updateSequenceNumbers(SequencePair{big.NewInt(2), 1234}) + batcher.updateSequenceNumbers(kcl.SequencePair{big.NewInt(2), 1234}) seq = batcher.SmallestSequencePair() assert.True(expected.Cmp(seq.Sequence) == 0) t.Log("After AddMessage (seq=1), smallestSeq = 0") - batcher.updateSequenceNumbers(SequencePair{big.NewInt(0), 1234}) + batcher.updateSequenceNumbers(kcl.SequencePair{big.NewInt(0), 1234}) expected.SetInt64(0) seq = batcher.SmallestSequencePair() assert.True(expected.Cmp(seq.Sequence) == 0) t.Log("Flushing batch clears smallest sequence pair") - assert.NoError(batcher.AddMessage([]byte("cdcd"), SequencePair{big.NewInt(2), 1234})) + assert.NoError(batcher.AddMessage([]byte("cdcd"), kcl.SequencePair{big.NewInt(2), 1234})) sync.waitForFlush(time.Minute) assert.Nil(batcher.SmallestSequencePair().Sequence) } - -func TestSequencePairIsLessThan(t *testing.T) { - assert := assert.New(t) - - big10 := big.NewInt(10) - big5 := big.NewInt(5) - - tests := []struct { - left SequencePair - right SequencePair - isLess bool - }{ - {left: SequencePair{nil, 0}, right: SequencePair{nil, 0}, isLess: false}, - {left: SequencePair{nil, 0}, right: SequencePair{big10, 0}, isLess: false}, - {left: SequencePair{big10, 0}, right: SequencePair{nil, 0}, isLess: false}, - - {left: SequencePair{big5, 0}, right: SequencePair{big10, 0}, isLess: true}, - {left: SequencePair{big5, 0}, right: SequencePair{big5, 10}, isLess: true}, - - {left: SequencePair{big10, 0}, right: SequencePair{big5, 0}, isLess: false}, - {left: SequencePair{big5, 10}, right: SequencePair{big5, 0}, isLess: false}, - } - - for _, test := range tests { - left := test.left - right := test.right - t.Logf( - "Is <%s, %d> less than <%s, %d>? %t", - left.Sequence.String(), left.SubSequence, - right.Sequence.String(), right.SubSequence, - test.isLess, - ) - - assert.Equal(test.isLess, left.IsLessThan(right)) - } -} - -func TestSequencePairEmpty(t *testing.T) { - assert := assert.New(t) - - assert.True(SequencePair{nil, 0}.IsEmpty()) - assert.True(SequencePair{nil, 10000}.IsEmpty()) - - assert.False(SequencePair{big.NewInt(10), 0}.IsEmpty()) - assert.False(SequencePair{big.NewInt(0), 1000}.IsEmpty()) -} diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index f641f62..7b99cd3 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -19,7 +19,7 @@ import ( type tagMsgPair struct { tag string msg []byte - pair batcher.SequencePair + pair kcl.SequencePair } type batchedWriter struct { @@ -29,16 +29,16 @@ type batchedWriter struct { shardID string - checkpointMsg chan batcher.SequencePair + checkpointMsg chan kcl.SequencePair checkpointTag chan string - lastProcessedPair chan batcher.SequencePair + lastProcessedPair chan kcl.SequencePair batchMsg chan tagMsgPair flushBatches chan struct{} // Limits the number of records read from the stream rateLimiter *rate.Limiter - lastProcessedSeq batcher.SequencePair + lastProcessedSeq kcl.SequencePair } func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batchedWriter { @@ -53,13 +53,13 @@ func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batche func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error { b.shardID = shardID - b.checkpointMsg = make(chan batcher.SequencePair) + b.checkpointMsg = make(chan kcl.SequencePair) b.startCheckpointListener(checkpointer, b.checkpointMsg) b.checkpointTag = make(chan string) b.batchMsg = make(chan tagMsgPair) b.flushBatches = make(chan struct{}) - b.lastProcessedPair = make(chan batcher.SequencePair) + b.lastProcessedPair = make(chan kcl.SequencePair) b.startMessageHandler(b.batchMsg, b.checkpointTag, b.lastProcessedPair, b.flushBatches) return nil @@ -95,7 +95,7 @@ func (b *batchedWriter) handleCheckpointError(err error) bool { } func (b *batchedWriter) startCheckpointListener( - checkpointer kcl.Checkpointer, checkpointMsg <-chan batcher.SequencePair, + checkpointer kcl.Checkpointer, checkpointMsg <-chan kcl.SequencePair, ) { go func() { lastCheckpoint := time.Now() @@ -152,11 +152,11 @@ func (b *batchedWriter) createBatcher(tag string) batcher.Batcher { // startMessageDistributer starts a go-routine that routes messages to batches. It's in uses a // go routine to avoid racey conditions. func (b *batchedWriter) startMessageHandler( - batchMsg <-chan tagMsgPair, checkpointTag <-chan string, lastPair <-chan batcher.SequencePair, + batchMsg <-chan tagMsgPair, checkpointTag <-chan string, lastPair <-chan kcl.SequencePair, flushBatches <-chan struct{}, ) { go func() { - var lastProcessedPair batcher.SequencePair + var lastProcessedPair kcl.SequencePair batchers := map[string]batcher.Batcher{} areBatchersEmpty := true @@ -231,7 +231,7 @@ func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) } func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { - var pair batcher.SequencePair + var pair kcl.SequencePair prevPair := b.lastProcessedSeq for _, record := range records { @@ -243,7 +243,7 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { return fmt.Errorf("could not parse sequence number '%s'", record.SequenceNumber) } - pair = batcher.SequencePair{seq, record.SubSequenceNumber} + pair = kcl.SequencePair{seq, record.SubSequenceNumber} if prevPair.IsEmpty() { // Handles on-start edge case where b.lastProcessSeq is empty prevPair = pair } diff --git a/kcl/sequencepair.go b/kcl/sequencepair.go new file mode 100644 index 0000000..df0fbfe --- /dev/null +++ b/kcl/sequencepair.go @@ -0,0 +1,31 @@ +package kcl + +import ( + "math/big" +) + +// SequencePair a convience way to pass around a Sequence / SubSequence pair +type SequencePair struct { + Sequence *big.Int + SubSequence int +} + +func (s SequencePair) IsEmpty() bool { + return s.Sequence == nil +} + +func (s SequencePair) IsLessThan(pair SequencePair) bool { + if s.IsEmpty() || pair.IsEmpty() { // empty pairs are incomparable + return false + } + + cmp := s.Sequence.Cmp(pair.Sequence) + if cmp == -1 { + return true + } + if cmp == 1 { + return false + } + + return s.SubSequence < pair.SubSequence +} diff --git a/kcl/sequencepair_test.go b/kcl/sequencepair_test.go new file mode 100644 index 0000000..58d253f --- /dev/null +++ b/kcl/sequencepair_test.go @@ -0,0 +1,54 @@ +package kcl + +import ( + "math/big" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSequencePairIsLessThan(t *testing.T) { + assert := assert.New(t) + + big10 := big.NewInt(10) + big5 := big.NewInt(5) + + tests := []struct { + left SequencePair + right SequencePair + isLess bool + }{ + {left: SequencePair{nil, 0}, right: SequencePair{nil, 0}, isLess: false}, + {left: SequencePair{nil, 0}, right: SequencePair{big10, 0}, isLess: false}, + {left: SequencePair{big10, 0}, right: SequencePair{nil, 0}, isLess: false}, + + {left: SequencePair{big5, 0}, right: SequencePair{big10, 0}, isLess: true}, + {left: SequencePair{big5, 0}, right: SequencePair{big5, 10}, isLess: true}, + + {left: SequencePair{big10, 0}, right: SequencePair{big5, 0}, isLess: false}, + {left: SequencePair{big5, 10}, right: SequencePair{big5, 0}, isLess: false}, + } + + for _, test := range tests { + left := test.left + right := test.right + t.Logf( + "Is <%s, %d> less than <%s, %d>? %t", + left.Sequence.String(), left.SubSequence, + right.Sequence.String(), right.SubSequence, + test.isLess, + ) + + assert.Equal(test.isLess, left.IsLessThan(right)) + } +} + +func TestSequencePairEmpty(t *testing.T) { + assert := assert.New(t) + + assert.True(SequencePair{nil, 0}.IsEmpty()) + assert.True(SequencePair{nil, 10000}.IsEmpty()) + + assert.False(SequencePair{big.NewInt(10), 0}.IsEmpty()) + assert.False(SequencePair{big.NewInt(0), 1000}.IsEmpty()) +}