Added writer unit tests and fixed a slew of concurrency bugs

Xavi Ramirez 2017-07-21 01:35:54 +00:00
parent 939fc5406f
commit 212ec45d88
5 changed files with 505 additions and 64 deletions


@@ -159,8 +159,9 @@ func (b *batcher) batchSize(batch [][]byte) int {
 func (b *batcher) flush(batch [][]byte) [][]byte {
     if len(batch) > 0 {
+        b.mux.Lock()
         b.sync.SendBatch(batch)
-        b.mux.Lock()
         b.smallestSeq = SequencePair{nil, 0}
         b.mux.Unlock()
     }


@@ -98,11 +98,7 @@ func NewBatchConsumerFromFiles(
     kvlog := logger.New("amazon-kinesis-client-go")
     kvlog.SetOutput(file)
-    wrt := &batchedWriter{
-        config: config,
-        log:    kvlog,
-        sender: sender,
-    }
+    wrt := NewBatchedWriter(config, sender, kvlog)
     kclProcess := kcl.New(input, output, errFile, wrt)
     return &BatchConsumer{


@@ -7,5 +7,4 @@ type batcherSync struct {
 func (b *batcherSync) SendBatch(batch [][]byte) {
     b.writer.SendBatch(batch, b.tag)
-    b.writer.CheckPointBatch(b.tag)
 }


@@ -16,28 +16,51 @@ import (
     "github.com/Clever/amazon-kinesis-client-go/splitter"
 )

+type tagMsgPair struct {
+    tag  string
+    msg  []byte
+    pair batcher.SequencePair
+}
+
 type batchedWriter struct {
     config Config
     sender Sender
     log kv.KayveeLogger
     shardID string
-    checkpointChan chan batcher.SequencePair
+    checkpointMsg     chan batcher.SequencePair
+    checkpointTag     chan string
+    lastProcessedPair chan batcher.SequencePair
+    batchMsg          chan tagMsgPair
+    flushBatches      chan struct{}
     // Limits the number of records read from the stream
     rateLimiter *rate.Limiter
-    batchers map[string]batcher.Batcher
     lastProcessedSeq batcher.SequencePair
 }

-func (b *batchedWriter) Initialize(shardID string, checkpointer *kcl.Checkpointer) error {
-    b.batchers = map[string]batcher.Batcher{}
-    b.shardID = shardID
-    b.checkpointChan = make(chan batcher.SequencePair)
-    b.rateLimiter = rate.NewLimiter(rate.Limit(b.config.ReadRateLimit), b.config.ReadBurstLimit)
-    b.startCheckpointListener(checkpointer, b.checkpointChan)
+func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batchedWriter {
+    return &batchedWriter{
+        config:      config,
+        sender:      sender,
+        log:         log,
+        rateLimiter: rate.NewLimiter(rate.Limit(config.ReadRateLimit), config.ReadBurstLimit),
+    }
+}
+
+func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error {
+    b.shardID = shardID
+    b.checkpointMsg = make(chan batcher.SequencePair)
+    b.startCheckpointListener(checkpointer, b.checkpointMsg)
+    b.checkpointTag = make(chan string)
+    b.batchMsg = make(chan tagMsgPair)
+    b.flushBatches = make(chan struct{})
+    b.lastProcessedPair = make(chan batcher.SequencePair)
+    b.startMessageHandler(b.batchMsg, b.checkpointTag, b.lastProcessedPair, b.flushBatches)
     return nil
 }
@@ -72,19 +95,19 @@ func (b *batchedWriter) handleCheckpointError(err error) bool {
 }

 func (b *batchedWriter) startCheckpointListener(
-    checkpointer *kcl.Checkpointer, checkpointChan <-chan batcher.SequencePair,
+    checkpointer kcl.Checkpointer, checkpointMsg <-chan batcher.SequencePair,
 ) {
-    lastCheckpoint := time.Now()
     go func() {
+        lastCheckpoint := time.Now()
         for {
-            seq := <-checkpointChan
+            seq := <-checkpointMsg
             // This is a write throttle to ensure we don't checkpoint faster than
             // b.config.CheckpointFreq. The latest seq number is always used.
             for time.Now().Sub(lastCheckpoint) < b.config.CheckpointFreq {
                 select {
-                case seq = <-checkpointChan: // Keep updating checkpoint seq while waiting
+                case seq = <-checkpointMsg: // Keep updating checkpoint seq while waiting
                 case <-time.NewTimer(b.config.CheckpointFreq - time.Now().Sub(lastCheckpoint)).C:
                 }
             }
@@ -126,6 +149,74 @@ func (b *batchedWriter) createBatcher(tag string) batcher.Batcher {
     return batch
 }

+// startMessageHandler starts a goroutine that routes messages to per-tag batchers. It uses a
+// goroutine to avoid race conditions.
+func (b *batchedWriter) startMessageHandler(
+    batchMsg <-chan tagMsgPair, checkpointTag <-chan string, lastPair <-chan batcher.SequencePair,
+    flushBatches <-chan struct{},
+) {
+    go func() {
+        var lastProcessedPair batcher.SequencePair
+        batchers := map[string]batcher.Batcher{}
+        areBatchersEmpty := true
+        for {
+            select {
+            case tmp := <-batchMsg:
+                batcher, ok := batchers[tmp.tag]
+                if !ok {
+                    batcher = b.createBatcher(tmp.tag)
+                    batchers[tmp.tag] = batcher
+                }
+                err := batcher.AddMessage(tmp.msg, tmp.pair)
+                if err != nil {
+                    b.log.ErrorD("add-message", kv.M{
+                        "err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag,
+                    })
+                }
+                areBatchersEmpty = false
+            case tag := <-checkpointTag:
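+                // A batch for this tag was just sent. Checkpoint at the smallest sequence pair
+                // still held by the other batchers so no unsent message is checkpointed past.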
+                smallest := lastProcessedPair
+                isAllEmpty := true
+                for name, batch := range batchers {
+                    if tag == name {
+                        continue
+                    }
+                    pair := batch.SmallestSequencePair()
+                    if pair.IsEmpty() { // Occurs when batch has no items
+                        continue
+                    }
+                    if pair.IsLessThan(smallest) {
+                        smallest = pair
+                    }
+                    isAllEmpty = false
+                }
+                if !smallest.IsEmpty() {
+                    b.checkpointMsg <- smallest
+                }
+                areBatchersEmpty = isAllEmpty
+            case pair := <-lastPair:
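+                // ProcessRecords finished another record. If nothing is buffered in any batcher,
+                // it's safe to checkpoint this pair right away.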
+                if areBatchersEmpty {
+                    b.checkpointMsg <- pair
+                }
+                lastProcessedPair = pair
+            case <-flushBatches:
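+                // Shutdown path: flush everything that is buffered, then checkpoint the last
+                // processed pair.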
+                for _, batch := range batchers {
+                    batch.Flush()
+                }
+                b.checkpointMsg <- lastProcessedPair
+                areBatchersEmpty = true
+            }
+        }
+    }()
+}
+
 func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) {
     // We handle two types of records:
     // - records emitted from CWLogs Subscription
@@ -140,7 +231,8 @@ func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error)
 }

 func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
-    curSequence := b.lastProcessedSeq
+    var pair batcher.SequencePair
+    prevPair := b.lastProcessedSeq

     for _, record := range records {
         // Wait until rate limiter permits one more record to be processed
@@ -151,77 +243,58 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
             return fmt.Errorf("could not parse sequence number '%s'", record.SequenceNumber)
         }

-        b.lastProcessedSeq = curSequence // Updated with the value from the previous iteration
-        curSequence = batcher.SequencePair{seq, record.SubSequenceNumber}
+        pair = batcher.SequencePair{seq, record.SubSequenceNumber}
+        if prevPair.IsEmpty() { // Handles on-start edge case where b.lastProcessedSeq is empty
+            prevPair = pair
+        }

         data, err := base64.StdEncoding.DecodeString(record.Data)
         if err != nil {
             return err
         }

-        rawlogs, err := b.splitMessageIfNecessary(data)
+        messages, err := b.splitMessageIfNecessary(data)
         if err != nil {
             return err
         }
-        for _, rawlog := range rawlogs {
-            log, tags, err := b.sender.ProcessMessage(rawlog)
+        for _, rawmsg := range messages {
+            msg, tags, err := b.sender.ProcessMessage(rawmsg)
             if err == ErrMessageIgnored {
                 continue // Skip message
             } else if err != nil {
-                return err
+                b.log.ErrorD("process-message", kv.M{"msg": err.Error(), "rawmsg": string(rawmsg)})
+                continue // Don't stop processing messages because of one bad message
             }

             if len(tags) == 0 {
-                return fmt.Errorf("No tags provided by consumer for log: %s", string(rawlog))
+                b.log.ErrorD("no-tags", kv.M{"rawmsg": string(rawmsg)})
+                return fmt.Errorf("No tags provided by consumer for log: %s", string(rawmsg))
             }

             for _, tag := range tags {
-                batcher, ok := b.batchers[tag]
-                if !ok {
-                    batcher = b.createBatcher(tag)
-                    b.batchers[tag] = batcher
+                if tag == "" {
+                    b.log.ErrorD("blank-tag", kv.M{"rawmsg": string(rawmsg)})
+                    return fmt.Errorf("Blank tag provided by consumer for log: %s", string(rawmsg))
                 }

                 // Use second to last sequence number to ensure we don't checkpoint a message before
                 // it's been sent. When batches are sent, conceptually we first find the smallest
                 // sequence number among all the batches (let's call it A). We then checkpoint at
                 // the A-1 sequence number.
-                err = batcher.AddMessage(log, b.lastProcessedSeq)
-                if err != nil {
-                    return err
-                }
+                b.batchMsg <- tagMsgPair{tag, msg, prevPair}
             }
         }
-    }
-    b.lastProcessedSeq = curSequence
+
+        prevPair = pair
+        b.lastProcessedPair <- pair
+    }
+    b.lastProcessedSeq = pair

     return nil
 }

-func (b *batchedWriter) CheckPointBatch(tag string) {
-    smallest := b.lastProcessedSeq
-    for name, batch := range b.batchers {
-        if tag == name {
-            continue
-        }
-        pair := batch.SmallestSequencePair()
-        if pair.Sequence == nil { // Occurs when batch has no items
-            continue
-        }
-        if pair.IsLessThan(smallest) {
-            smallest = pair
-        }
-    }
-    b.checkpointChan <- smallest
-}
-
 func (b *batchedWriter) SendBatch(batch [][]byte, tag string) {
+    b.log.Info("sent-batch")
     err := b.sender.SendBatch(batch, tag)
     switch e := err.(type) {
     case nil: // Do nothing
@@ -237,14 +310,14 @@ func (b *batchedWriter) SendBatch(batch [][]byte, tag string) {
         b.log.CriticalD("send-batch", kv.M{"msg": e.Error()})
         os.Exit(1)
     }
+    b.checkpointTag <- tag
 }

 func (b *batchedWriter) Shutdown(reason string) error {
     if reason == "TERMINATE" {
         b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID})
-        for _, batch := range b.batchers {
-            batch.Flush()
-        }
+        b.flushBatches <- struct{}{}
     } else {
         b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason})
     }


@@ -0,0 +1,372 @@
package batchconsumer
import (
"encoding/base64"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"gopkg.in/Clever/kayvee-go.v6/logger"
"github.com/Clever/amazon-kinesis-client-go/kcl"
)
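// ignoringSender ignores every message, so SendBatch should never be reached.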
type ignoringSender struct{}
func (i ignoringSender) ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error) {
return nil, nil, ErrMessageIgnored
}
func (i ignoringSender) SendBatch(batch [][]byte, tag string) error {
panic("SendBatch Should never be called. ProcessMessage ignores all messasges.")
}
type tagBatch struct {
tag string
batch [][]byte
}
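// msgAsTagSender tags each message with its own content and records every batch it is asked
// to send. Messages with the content "ignore" are dropped.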
type msgAsTagSender struct {
batches map[string][][][]byte
saveBatch chan tagBatch
shutdown chan struct{}
}
func NewMsgAsTagSender() *msgAsTagSender {
sender := &msgAsTagSender{
batches: map[string][][][]byte{},
saveBatch: make(chan tagBatch),
shutdown: make(chan struct{}),
}
sender.startBatchSaver(sender.saveBatch, sender.shutdown)
return sender
}
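// startBatchSaver funnels all writes to i.batches through a single goroutine so SendBatch can
// be called safely from other goroutines.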
func (i *msgAsTagSender) startBatchSaver(saveBatch <-chan tagBatch, shutdown <-chan struct{}) {
go func() {
for {
select {
case tb := <-saveBatch:
batches, ok := i.batches[tb.tag]
if !ok {
batches = [][][]byte{}
}
i.batches[tb.tag] = append(batches, tb.batch)
case <-shutdown:
return
}
}
}()
}
func (i *msgAsTagSender) ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error) {
if "ignore" == string(rawmsg) {
return nil, nil, ErrMessageIgnored
}
return rawmsg, []string{string(rawmsg)}, nil
}
func (i *msgAsTagSender) SendBatch(batch [][]byte, tag string) error {
i.saveBatch <- tagBatch{tag, batch}
return nil
}
func (i *msgAsTagSender) Shutdown() {
i.shutdown <- struct{}{}
}
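// mockCheckpointer records every checkpointed sequence number and signals done once maxSeq is
// seen, or signals a timeout if it isn't seen in time.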
type mockCheckpointer struct {
recievedSequences []string
checkpoint chan string
done chan struct{}
timeout chan struct{}
shutdown chan struct{}
}
func NewMockCheckpointer(maxSeq string, timeout time.Duration) *mockCheckpointer {
mcp := &mockCheckpointer{
checkpoint: make(chan string),
done: make(chan struct{}, 1),
timeout: make(chan struct{}, 1),
shutdown: make(chan struct{}),
}
mcp.startWaiter(maxSeq, timeout)
return mcp
}
func (m *mockCheckpointer) startWaiter(maxSeq string, timeout time.Duration) {
go func() {
for {
select {
case seq := <-m.checkpoint:
m.recievedSequences = append(m.recievedSequences, seq)
if seq == maxSeq {
m.done <- struct{}{}
}
case <-time.NewTimer(timeout).C:
m.timeout <- struct{}{}
case <-m.shutdown:
return
}
}
}()
}
func (m *mockCheckpointer) wait() error {
select {
case <-m.done:
return nil
case <-m.timeout:
return fmt.Errorf("timeout waiting for checkpoints")
}
}
func (m *mockCheckpointer) Shutdown() {
m.shutdown <- struct{}{}
}
func (m *mockCheckpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error {
m.checkpoint <- *sequenceNumber
return nil
}
func (m *mockCheckpointer) CheckpointWithRetry(
sequenceNumber *string, subSequenceNumber *int, retryCount int,
) error {
return m.Checkpoint(sequenceNumber, subSequenceNumber)
}
func encode(str string) string {
return base64.StdEncoding.EncodeToString([]byte(str))
}
func TestProcessRecordsIgnoredMessages(t *testing.T) {
assert := assert.New(t)
mocklog := logger.New("testing")
mockconfig := withDefaults(Config{
BatchInterval: 10 * time.Millisecond,
CheckpointFreq: 20 * time.Millisecond,
})
mockcheckpointer := NewMockCheckpointer("4", 5*time.Second)
wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mocklog)
wrt.Initialize("test-shard", mockcheckpointer)
err := wrt.ProcessRecords([]kcl.Record{
kcl.Record{SequenceNumber: "1", Data: encode("hi")},
kcl.Record{SequenceNumber: "2", Data: encode("hi")},
kcl.Record{SequenceNumber: "3", Data: encode("hi")},
kcl.Record{SequenceNumber: "4", Data: encode("hi")},
})
assert.NoError(err)
err = mockcheckpointer.wait()
assert.NoError(err)
}
func TestProcessRecordsMultiBatchBasic(t *testing.T) {
assert := assert.New(t)
mocklog := logger.New("testing")
mockconfig := withDefaults(Config{
BatchInterval: 100 * time.Millisecond,
CheckpointFreq: 200 * time.Millisecond,
})
mockcheckpointer := NewMockCheckpointer("8", 5*time.Second)
mocksender := NewMsgAsTagSender()
wrt := NewBatchedWriter(mockconfig, mocksender, mocklog)
wrt.Initialize("test-shard", mockcheckpointer)
err := wrt.ProcessRecords([]kcl.Record{
kcl.Record{SequenceNumber: "1", Data: encode("tag1")},
kcl.Record{SequenceNumber: "2", Data: encode("tag2")},
kcl.Record{SequenceNumber: "3", Data: encode("tag3")},
kcl.Record{SequenceNumber: "4", Data: encode("tag2")},
})
assert.NoError(err)
err = wrt.ProcessRecords([]kcl.Record{
kcl.Record{SequenceNumber: "5", Data: encode("tag3")},
kcl.Record{SequenceNumber: "6", Data: encode("tag2")},
kcl.Record{SequenceNumber: "7", Data: encode("tag3")},
kcl.Record{SequenceNumber: "8", Data: encode("tag1")},
})
assert.NoError(err)
time.Sleep(200 * time.Millisecond) // Sleep to ensure checkpoints get flushed at least once
err = wrt.Shutdown("TERMINATE")
assert.NoError(err)
err = mockcheckpointer.wait()
assert.NoError(err)
mocksender.Shutdown()
assert.Contains(mocksender.batches, "tag1")
assert.Equal(1, len(mocksender.batches["tag1"])) // One batch
assert.Equal(2, len(mocksender.batches["tag1"][0])) // with two items
assert.Equal("tag1", string(mocksender.batches["tag1"][0][0]))
assert.Equal("tag1", string(mocksender.batches["tag1"][0][1]))
assert.Contains(mocksender.batches, "tag2")
assert.Equal(1, len(mocksender.batches["tag2"])) // One batch
assert.Equal(3, len(mocksender.batches["tag2"][0])) // with three items
assert.Equal("tag2", string(mocksender.batches["tag2"][0][0]))
assert.Equal("tag2", string(mocksender.batches["tag2"][0][1]))
assert.Equal("tag2", string(mocksender.batches["tag2"][0][2]))
assert.Contains(mocksender.batches, "tag3")
assert.Equal(1, len(mocksender.batches["tag3"])) // One batch
assert.Equal(3, len(mocksender.batches["tag3"][0])) // with three items
assert.Equal("tag3", string(mocksender.batches["tag3"][0][0]))
assert.Equal("tag3", string(mocksender.batches["tag3"][0][1]))
assert.Equal("tag3", string(mocksender.batches["tag3"][0][2]))
}
func TestProcessRecordsMultiBatchWithIgnores(t *testing.T) {
assert := assert.New(t)
mocklog := logger.New("testing")
mockconfig := withDefaults(Config{
BatchInterval: 100 * time.Millisecond,
CheckpointFreq: 200 * time.Millisecond,
})
mockcheckpointer := NewMockCheckpointer("26", 5*time.Second)
mocksender := NewMsgAsTagSender()
wrt := NewBatchedWriter(mockconfig, mocksender, mocklog)
wrt.Initialize("test-shard", mockcheckpointer)
err := wrt.ProcessRecords([]kcl.Record{
kcl.Record{SequenceNumber: "1", Data: encode("ignore")},
kcl.Record{SequenceNumber: "2", Data: encode("ignore")},
kcl.Record{SequenceNumber: "3", Data: encode("ignore")},
kcl.Record{SequenceNumber: "4", Data: encode("ignore")},
kcl.Record{SequenceNumber: "5", Data: encode("tag1")},
kcl.Record{SequenceNumber: "6", Data: encode("ignore")},
kcl.Record{SequenceNumber: "7", Data: encode("tag2")},
kcl.Record{SequenceNumber: "8", Data: encode("tag3")},
kcl.Record{SequenceNumber: "9", Data: encode("ignore")},
kcl.Record{SequenceNumber: "10", Data: encode("tag2")},
kcl.Record{SequenceNumber: "11", Data: encode("ignore")},
kcl.Record{SequenceNumber: "12", Data: encode("ignore")},
kcl.Record{SequenceNumber: "13", Data: encode("ignore")},
})
assert.NoError(err)
err = wrt.ProcessRecords([]kcl.Record{
kcl.Record{SequenceNumber: "14", Data: encode("ignore")},
kcl.Record{SequenceNumber: "15", Data: encode("ignore")},
kcl.Record{SequenceNumber: "16", Data: encode("ignore")},
kcl.Record{SequenceNumber: "17", Data: encode("ignore")},
kcl.Record{SequenceNumber: "18", Data: encode("tag3")},
kcl.Record{SequenceNumber: "19", Data: encode("tag2")},
kcl.Record{SequenceNumber: "20", Data: encode("ignore")},
kcl.Record{SequenceNumber: "21", Data: encode("tag3")},
kcl.Record{SequenceNumber: "22", Data: encode("ignore")},
kcl.Record{SequenceNumber: "23", Data: encode("ignore")},
kcl.Record{SequenceNumber: "24", Data: encode("ignore")},
kcl.Record{SequenceNumber: "25", Data: encode("tag1")},
kcl.Record{SequenceNumber: "26", Data: encode("ignore")},
})
assert.NoError(err)
time.Sleep(200 * time.Millisecond) // Sleep to ensure checkpoints get flushed at least once
err = wrt.Shutdown("TERMINATE")
assert.NoError(err)
err = mockcheckpointer.wait()
assert.NoError(err)
mocksender.Shutdown()
assert.Contains(mocksender.batches, "tag1")
assert.Equal(1, len(mocksender.batches["tag1"])) // One batch
assert.Equal(2, len(mocksender.batches["tag1"][0])) // with two items
assert.Equal("tag1", string(mocksender.batches["tag1"][0][0]))
assert.Equal("tag1", string(mocksender.batches["tag1"][0][1]))
assert.Contains(mocksender.batches, "tag2")
assert.Equal(1, len(mocksender.batches["tag2"])) // One batch
assert.Equal(3, len(mocksender.batches["tag2"][0])) // with three items
assert.Equal("tag2", string(mocksender.batches["tag2"][0][0]))
assert.Equal("tag2", string(mocksender.batches["tag2"][0][1]))
assert.Equal("tag2", string(mocksender.batches["tag2"][0][2]))
assert.Contains(mocksender.batches, "tag3")
assert.Equal(1, len(mocksender.batches["tag3"])) // One batch
assert.Equal(3, len(mocksender.batches["tag3"][0])) // with three items
assert.Equal("tag3", string(mocksender.batches["tag3"][0][0]))
assert.Equal("tag3", string(mocksender.batches["tag3"][0][1]))
assert.Equal("tag3", string(mocksender.batches["tag3"][0][2]))
}
func TestStaggeredCheckpointing(t *testing.T) {
assert := assert.New(t)
mocklog := logger.New("testing")
mockconfig := withDefaults(Config{
BatchCount: 2,
BatchInterval: 100 * time.Millisecond,
CheckpointFreq: 200 * time.Nanosecond,
})
mockcheckpointer := NewMockCheckpointer("9", 5*time.Second)
mocksender := NewMsgAsTagSender()
wrt := NewBatchedWriter(mockconfig, mocksender, mocklog)
wrt.Initialize("test-shard", mockcheckpointer)
err := wrt.ProcessRecords([]kcl.Record{
kcl.Record{SequenceNumber: "1", Data: encode("tag1")},
kcl.Record{SequenceNumber: "2", Data: encode("tag3")},
kcl.Record{SequenceNumber: "3", Data: encode("tag1")},
kcl.Record{SequenceNumber: "4", Data: encode("tag3")},
})
assert.NoError(err)
err = wrt.ProcessRecords([]kcl.Record{
kcl.Record{SequenceNumber: "5", Data: encode("tag1")},
kcl.Record{SequenceNumber: "6", Data: encode("tag3")},
kcl.Record{SequenceNumber: "7", Data: encode("tag3")},
kcl.Record{SequenceNumber: "8", Data: encode("tag3")},
kcl.Record{SequenceNumber: "9", Data: encode("tag3")},
})
assert.NoError(err)
time.Sleep(200 * time.Millisecond) // Sleep to ensure checkpoints get flushed at least once
err = wrt.Shutdown("TERMINATE")
assert.NoError(err)
err = mockcheckpointer.wait()
assert.NoError(err)
mocksender.Shutdown()
mockcheckpointer.Shutdown()
// Test to make sure writer doesn't prematurely checkpoint messages
// Checkpoints 5,6,7,8 will never be submitted because the 3rd "tag1" is in a batch
// Checkpoint 9 is submitted on shutdown when everything is being flushed
assert.NotContains(mockcheckpointer.recievedSequences, "5")
assert.NotContains(mockcheckpointer.recievedSequences, "6")
assert.NotContains(mockcheckpointer.recievedSequences, "7")
assert.NotContains(mockcheckpointer.recievedSequences, "8")
assert.Contains(mocksender.batches, "tag1")
assert.Equal(2, len(mocksender.batches["tag1"])) // Two batches
assert.Equal(2, len(mocksender.batches["tag1"][0])) // with two items
assert.Equal("tag1", string(mocksender.batches["tag1"][0][0]))
assert.Equal("tag1", string(mocksender.batches["tag1"][0][1]))
assert.Equal("tag1", string(mocksender.batches["tag1"][1][0]))
assert.Contains(mocksender.batches, "tag3")
assert.Equal(3, len(mocksender.batches["tag3"])) // Three batches
assert.Equal(2, len(mocksender.batches["tag3"][0])) // with two items
assert.Equal("tag3", string(mocksender.batches["tag3"][0][0]))
assert.Equal("tag3", string(mocksender.batches["tag3"][0][1]))
assert.Equal("tag3", string(mocksender.batches["tag3"][1][0]))
assert.Equal("tag3", string(mocksender.batches["tag3"][1][1]))
assert.Equal("tag3", string(mocksender.batches["tag3"][2][0]))
assert.Equal("tag3", string(mocksender.batches["tag3"][2][1]))
}