Moved SequencePair to kcl package

This commit is contained in:
Xavi Ramirez 2017-08-02 19:45:23 +00:00
parent 4c6d8453ef
commit 04042290f5
5 changed files with 113 additions and 97 deletions

View file

@ -2,37 +2,12 @@ package batcher
import ( import (
"fmt" "fmt"
"math/big"
"sync" "sync"
"time" "time"
"github.com/Clever/amazon-kinesis-client-go/kcl"
) )
// SequencePair a convience way to pass around a Sequence / SubSequence pair
type SequencePair struct {
Sequence *big.Int
SubSequence int
}
func (s SequencePair) IsEmpty() bool {
return s.Sequence == nil
}
func (s SequencePair) IsLessThan(pair SequencePair) bool {
if s.IsEmpty() || pair.IsEmpty() { // empty pairs are incomparable
return false
}
cmp := s.Sequence.Cmp(pair.Sequence)
if cmp == -1 {
return true
}
if cmp == 1 {
return false
}
return s.SubSequence < pair.SubSequence
}
// Sync is used to allow a writer to syncronize with the batcher. // Sync is used to allow a writer to syncronize with the batcher.
// The writer declares how to write messages (via its `SendBatch` method), while the batcher // The writer declares how to write messages (via its `SendBatch` method), while the batcher
// keeps track of messages written // keeps track of messages written
@ -43,17 +18,17 @@ type Sync interface {
// Batcher interface // Batcher interface
type Batcher interface { type Batcher interface {
// AddMesage to the batch // AddMesage to the batch
AddMessage(msg []byte, sequencePair SequencePair) error AddMessage(msg []byte, sequencePair kcl.SequencePair) error
// Flush all messages from the batch // Flush all messages from the batch
Flush() Flush()
// SmallestSeqPair returns the smallest SequenceNumber and SubSequence number in // SmallestSeqPair returns the smallest SequenceNumber and SubSequence number in
// the current batch // the current batch
SmallestSequencePair() SequencePair SmallestSequencePair() kcl.SequencePair
} }
type msgPack struct { type msgPack struct {
msg []byte msg []byte
sequencePair SequencePair sequencePair kcl.SequencePair
} }
type batcher struct { type batcher struct {
@ -64,7 +39,7 @@ type batcher struct {
flushSize int flushSize int
// smallestSeq are used for checkpointing // smallestSeq are used for checkpointing
smallestSeq SequencePair smallestSeq kcl.SequencePair
sync Sync sync Sync
msgChan chan<- msgPack msgChan chan<- msgPack
@ -104,7 +79,7 @@ func New(sync Sync, flushInterval time.Duration, flushCount int, flushSize int)
return b, nil return b, nil
} }
func (b *batcher) SmallestSequencePair() SequencePair { func (b *batcher) SmallestSequencePair() kcl.SequencePair {
b.mux.Lock() b.mux.Lock()
defer b.mux.Unlock() defer b.mux.Unlock()
@ -123,7 +98,7 @@ func (b *batcher) SetFlushSize(size int) {
b.flushSize = size b.flushSize = size
} }
func (b *batcher) AddMessage(msg []byte, pair SequencePair) error { func (b *batcher) AddMessage(msg []byte, pair kcl.SequencePair) error {
if len(msg) <= 0 { if len(msg) <= 0 {
return fmt.Errorf("Empty messages can't be sent") return fmt.Errorf("Empty messages can't be sent")
} }
@ -135,7 +110,7 @@ func (b *batcher) AddMessage(msg []byte, pair SequencePair) error {
// updateSequenceNumbers is used to track the smallest sequenceNumber of any record in the batch. // updateSequenceNumbers is used to track the smallest sequenceNumber of any record in the batch.
// When flush() is called, the batcher sends the sequence number to the writer. When the writer // When flush() is called, the batcher sends the sequence number to the writer. When the writer
// checkpoints, it does so up to the latest message that was flushed successfully. // checkpoints, it does so up to the latest message that was flushed successfully.
func (b *batcher) updateSequenceNumbers(pair SequencePair) { func (b *batcher) updateSequenceNumbers(pair kcl.SequencePair) {
b.mux.Lock() b.mux.Lock()
defer b.mux.Unlock() defer b.mux.Unlock()
@ -162,7 +137,7 @@ func (b *batcher) flush(batch [][]byte) [][]byte {
b.sync.SendBatch(batch) b.sync.SendBatch(batch)
b.mux.Lock() b.mux.Lock()
b.smallestSeq = SequencePair{nil, 0} b.smallestSeq = kcl.SequencePair{}
b.mux.Unlock() b.mux.Unlock()
} }
return [][]byte{} return [][]byte{}

View file

@ -7,6 +7,8 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/Clever/amazon-kinesis-client-go/kcl"
) )
type batch [][]byte type batch [][]byte
@ -37,7 +39,7 @@ func (m *MockSync) waitForFlush(timeout time.Duration) error {
} }
} }
var mockSequence = SequencePair{big.NewInt(99999), 12345} var mockSequence = kcl.SequencePair{big.NewInt(99999), 12345}
func TestBatchingByCount(t *testing.T) { func TestBatchingByCount(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
@ -195,70 +197,24 @@ func TestUpdatingSequence(t *testing.T) {
expected := new(big.Int) expected := new(big.Int)
t.Log("After AddMessage (seq=1), smallestSeq = 1") t.Log("After AddMessage (seq=1), smallestSeq = 1")
batcher.updateSequenceNumbers(SequencePair{big.NewInt(1), 1234}) batcher.updateSequenceNumbers(kcl.SequencePair{big.NewInt(1), 1234})
expected.SetInt64(1) expected.SetInt64(1)
seq := batcher.SmallestSequencePair() seq := batcher.SmallestSequencePair()
assert.True(expected.Cmp(seq.Sequence) == 0) assert.True(expected.Cmp(seq.Sequence) == 0)
t.Log("After AddMessage (seq=2), smallestSeq = 1 -- not updated because higher") t.Log("After AddMessage (seq=2), smallestSeq = 1 -- not updated because higher")
batcher.updateSequenceNumbers(SequencePair{big.NewInt(2), 1234}) batcher.updateSequenceNumbers(kcl.SequencePair{big.NewInt(2), 1234})
seq = batcher.SmallestSequencePair() seq = batcher.SmallestSequencePair()
assert.True(expected.Cmp(seq.Sequence) == 0) assert.True(expected.Cmp(seq.Sequence) == 0)
t.Log("After AddMessage (seq=1), smallestSeq = 0") t.Log("After AddMessage (seq=1), smallestSeq = 0")
batcher.updateSequenceNumbers(SequencePair{big.NewInt(0), 1234}) batcher.updateSequenceNumbers(kcl.SequencePair{big.NewInt(0), 1234})
expected.SetInt64(0) expected.SetInt64(0)
seq = batcher.SmallestSequencePair() seq = batcher.SmallestSequencePair()
assert.True(expected.Cmp(seq.Sequence) == 0) assert.True(expected.Cmp(seq.Sequence) == 0)
t.Log("Flushing batch clears smallest sequence pair") t.Log("Flushing batch clears smallest sequence pair")
assert.NoError(batcher.AddMessage([]byte("cdcd"), SequencePair{big.NewInt(2), 1234})) assert.NoError(batcher.AddMessage([]byte("cdcd"), kcl.SequencePair{big.NewInt(2), 1234}))
sync.waitForFlush(time.Minute) sync.waitForFlush(time.Minute)
assert.Nil(batcher.SmallestSequencePair().Sequence) assert.Nil(batcher.SmallestSequencePair().Sequence)
} }
func TestSequencePairIsLessThan(t *testing.T) {
assert := assert.New(t)
big10 := big.NewInt(10)
big5 := big.NewInt(5)
tests := []struct {
left SequencePair
right SequencePair
isLess bool
}{
{left: SequencePair{nil, 0}, right: SequencePair{nil, 0}, isLess: false},
{left: SequencePair{nil, 0}, right: SequencePair{big10, 0}, isLess: false},
{left: SequencePair{big10, 0}, right: SequencePair{nil, 0}, isLess: false},
{left: SequencePair{big5, 0}, right: SequencePair{big10, 0}, isLess: true},
{left: SequencePair{big5, 0}, right: SequencePair{big5, 10}, isLess: true},
{left: SequencePair{big10, 0}, right: SequencePair{big5, 0}, isLess: false},
{left: SequencePair{big5, 10}, right: SequencePair{big5, 0}, isLess: false},
}
for _, test := range tests {
left := test.left
right := test.right
t.Logf(
"Is <%s, %d> less than <%s, %d>? %t",
left.Sequence.String(), left.SubSequence,
right.Sequence.String(), right.SubSequence,
test.isLess,
)
assert.Equal(test.isLess, left.IsLessThan(right))
}
}
func TestSequencePairEmpty(t *testing.T) {
assert := assert.New(t)
assert.True(SequencePair{nil, 0}.IsEmpty())
assert.True(SequencePair{nil, 10000}.IsEmpty())
assert.False(SequencePair{big.NewInt(10), 0}.IsEmpty())
assert.False(SequencePair{big.NewInt(0), 1000}.IsEmpty())
}

View file

@ -19,7 +19,7 @@ import (
type tagMsgPair struct { type tagMsgPair struct {
tag string tag string
msg []byte msg []byte
pair batcher.SequencePair pair kcl.SequencePair
} }
type batchedWriter struct { type batchedWriter struct {
@ -29,16 +29,16 @@ type batchedWriter struct {
shardID string shardID string
checkpointMsg chan batcher.SequencePair checkpointMsg chan kcl.SequencePair
checkpointTag chan string checkpointTag chan string
lastProcessedPair chan batcher.SequencePair lastProcessedPair chan kcl.SequencePair
batchMsg chan tagMsgPair batchMsg chan tagMsgPair
flushBatches chan struct{} flushBatches chan struct{}
// Limits the number of records read from the stream // Limits the number of records read from the stream
rateLimiter *rate.Limiter rateLimiter *rate.Limiter
lastProcessedSeq batcher.SequencePair lastProcessedSeq kcl.SequencePair
} }
func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batchedWriter { func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batchedWriter {
@ -53,13 +53,13 @@ func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batche
func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error { func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error {
b.shardID = shardID b.shardID = shardID
b.checkpointMsg = make(chan batcher.SequencePair) b.checkpointMsg = make(chan kcl.SequencePair)
b.startCheckpointListener(checkpointer, b.checkpointMsg) b.startCheckpointListener(checkpointer, b.checkpointMsg)
b.checkpointTag = make(chan string) b.checkpointTag = make(chan string)
b.batchMsg = make(chan tagMsgPair) b.batchMsg = make(chan tagMsgPair)
b.flushBatches = make(chan struct{}) b.flushBatches = make(chan struct{})
b.lastProcessedPair = make(chan batcher.SequencePair) b.lastProcessedPair = make(chan kcl.SequencePair)
b.startMessageHandler(b.batchMsg, b.checkpointTag, b.lastProcessedPair, b.flushBatches) b.startMessageHandler(b.batchMsg, b.checkpointTag, b.lastProcessedPair, b.flushBatches)
return nil return nil
@ -95,7 +95,7 @@ func (b *batchedWriter) handleCheckpointError(err error) bool {
} }
func (b *batchedWriter) startCheckpointListener( func (b *batchedWriter) startCheckpointListener(
checkpointer kcl.Checkpointer, checkpointMsg <-chan batcher.SequencePair, checkpointer kcl.Checkpointer, checkpointMsg <-chan kcl.SequencePair,
) { ) {
go func() { go func() {
lastCheckpoint := time.Now() lastCheckpoint := time.Now()
@ -152,11 +152,11 @@ func (b *batchedWriter) createBatcher(tag string) batcher.Batcher {
// startMessageDistributer starts a go-routine that routes messages to batches. It's in uses a // startMessageDistributer starts a go-routine that routes messages to batches. It's in uses a
// go routine to avoid racey conditions. // go routine to avoid racey conditions.
func (b *batchedWriter) startMessageHandler( func (b *batchedWriter) startMessageHandler(
batchMsg <-chan tagMsgPair, checkpointTag <-chan string, lastPair <-chan batcher.SequencePair, batchMsg <-chan tagMsgPair, checkpointTag <-chan string, lastPair <-chan kcl.SequencePair,
flushBatches <-chan struct{}, flushBatches <-chan struct{},
) { ) {
go func() { go func() {
var lastProcessedPair batcher.SequencePair var lastProcessedPair kcl.SequencePair
batchers := map[string]batcher.Batcher{} batchers := map[string]batcher.Batcher{}
areBatchersEmpty := true areBatchersEmpty := true
@ -231,7 +231,7 @@ func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error)
} }
func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
var pair batcher.SequencePair var pair kcl.SequencePair
prevPair := b.lastProcessedSeq prevPair := b.lastProcessedSeq
for _, record := range records { for _, record := range records {
@ -243,7 +243,7 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
return fmt.Errorf("could not parse sequence number '%s'", record.SequenceNumber) return fmt.Errorf("could not parse sequence number '%s'", record.SequenceNumber)
} }
pair = batcher.SequencePair{seq, record.SubSequenceNumber} pair = kcl.SequencePair{seq, record.SubSequenceNumber}
if prevPair.IsEmpty() { // Handles on-start edge case where b.lastProcessSeq is empty if prevPair.IsEmpty() { // Handles on-start edge case where b.lastProcessSeq is empty
prevPair = pair prevPair = pair
} }

31
kcl/sequencepair.go Normal file
View file

@ -0,0 +1,31 @@
package kcl
import (
"math/big"
)
// SequencePair a convience way to pass around a Sequence / SubSequence pair
type SequencePair struct {
Sequence *big.Int
SubSequence int
}
func (s SequencePair) IsEmpty() bool {
return s.Sequence == nil
}
func (s SequencePair) IsLessThan(pair SequencePair) bool {
if s.IsEmpty() || pair.IsEmpty() { // empty pairs are incomparable
return false
}
cmp := s.Sequence.Cmp(pair.Sequence)
if cmp == -1 {
return true
}
if cmp == 1 {
return false
}
return s.SubSequence < pair.SubSequence
}

54
kcl/sequencepair_test.go Normal file
View file

@ -0,0 +1,54 @@
package kcl
import (
"math/big"
"testing"
"github.com/stretchr/testify/assert"
)
func TestSequencePairIsLessThan(t *testing.T) {
assert := assert.New(t)
big10 := big.NewInt(10)
big5 := big.NewInt(5)
tests := []struct {
left SequencePair
right SequencePair
isLess bool
}{
{left: SequencePair{nil, 0}, right: SequencePair{nil, 0}, isLess: false},
{left: SequencePair{nil, 0}, right: SequencePair{big10, 0}, isLess: false},
{left: SequencePair{big10, 0}, right: SequencePair{nil, 0}, isLess: false},
{left: SequencePair{big5, 0}, right: SequencePair{big10, 0}, isLess: true},
{left: SequencePair{big5, 0}, right: SequencePair{big5, 10}, isLess: true},
{left: SequencePair{big10, 0}, right: SequencePair{big5, 0}, isLess: false},
{left: SequencePair{big5, 10}, right: SequencePair{big5, 0}, isLess: false},
}
for _, test := range tests {
left := test.left
right := test.right
t.Logf(
"Is <%s, %d> less than <%s, %d>? %t",
left.Sequence.String(), left.SubSequence,
right.Sequence.String(), right.SubSequence,
test.isLess,
)
assert.Equal(test.isLess, left.IsLessThan(right))
}
}
func TestSequencePairEmpty(t *testing.T) {
assert := assert.New(t)
assert.True(SequencePair{nil, 0}.IsEmpty())
assert.True(SequencePair{nil, 10000}.IsEmpty())
assert.False(SequencePair{big.NewInt(10), 0}.IsEmpty())
assert.False(SequencePair{big.NewInt(0), 1000}.IsEmpty())
}