diff --git a/batchconsumer/batcher/message_batcher.go b/batchconsumer/batcher/message_batcher.go index fbf25ba..82245e8 100644 --- a/batchconsumer/batcher/message_batcher.go +++ b/batchconsumer/batcher/message_batcher.go @@ -159,8 +159,9 @@ func (b *batcher) batchSize(batch [][]byte) int { func (b *batcher) flush(batch [][]byte) [][]byte { if len(batch) > 0 { - b.mux.Lock() b.sync.SendBatch(batch) + + b.mux.Lock() b.smallestSeq = SequencePair{nil, 0} b.mux.Unlock() } diff --git a/batchconsumer/consumer.go b/batchconsumer/consumer.go index 7d6eae5..f43ef05 100644 --- a/batchconsumer/consumer.go +++ b/batchconsumer/consumer.go @@ -98,11 +98,7 @@ func NewBatchConsumerFromFiles( kvlog := logger.New("amazon-kinesis-client-go") kvlog.SetOutput(file) - wrt := &batchedWriter{ - config: config, - log: kvlog, - sender: sender, - } + wrt := NewBatchedWriter(config, sender, kvlog) kclProcess := kcl.New(input, output, errFile, wrt) return &BatchConsumer{ diff --git a/batchconsumer/sync.go b/batchconsumer/sync.go index 517a943..f50a703 100644 --- a/batchconsumer/sync.go +++ b/batchconsumer/sync.go @@ -7,5 +7,4 @@ type batcherSync struct { func (b *batcherSync) SendBatch(batch [][]byte) { b.writer.SendBatch(batch, b.tag) - b.writer.CheckPointBatch(b.tag) } diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index a3100cd..f641f62 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -16,28 +16,51 @@ import ( "github.com/Clever/amazon-kinesis-client-go/splitter" ) +type tagMsgPair struct { + tag string + msg []byte + pair batcher.SequencePair +} + type batchedWriter struct { config Config sender Sender log kv.KayveeLogger - shardID string - checkpointChan chan batcher.SequencePair + shardID string + + checkpointMsg chan batcher.SequencePair + checkpointTag chan string + lastProcessedPair chan batcher.SequencePair + batchMsg chan tagMsgPair + flushBatches chan struct{} // Limits the number of records read from the stream rateLimiter *rate.Limiter - batchers map[string]batcher.Batcher lastProcessedSeq batcher.SequencePair } -func (b *batchedWriter) Initialize(shardID string, checkpointer *kcl.Checkpointer) error { - b.batchers = map[string]batcher.Batcher{} - b.shardID = shardID - b.checkpointChan = make(chan batcher.SequencePair) - b.rateLimiter = rate.NewLimiter(rate.Limit(b.config.ReadRateLimit), b.config.ReadBurstLimit) +func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batchedWriter { + return &batchedWriter{ + config: config, + sender: sender, + log: log, - b.startCheckpointListener(checkpointer, b.checkpointChan) + rateLimiter: rate.NewLimiter(rate.Limit(config.ReadRateLimit), config.ReadBurstLimit), + } +} + +func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error { + b.shardID = shardID + b.checkpointMsg = make(chan batcher.SequencePair) + b.startCheckpointListener(checkpointer, b.checkpointMsg) + + b.checkpointTag = make(chan string) + b.batchMsg = make(chan tagMsgPair) + b.flushBatches = make(chan struct{}) + b.lastProcessedPair = make(chan batcher.SequencePair) + b.startMessageHandler(b.batchMsg, b.checkpointTag, b.lastProcessedPair, b.flushBatches) return nil } @@ -72,19 +95,19 @@ func (b *batchedWriter) handleCheckpointError(err error) bool { } func (b *batchedWriter) startCheckpointListener( - checkpointer *kcl.Checkpointer, checkpointChan <-chan batcher.SequencePair, + checkpointer kcl.Checkpointer, checkpointMsg <-chan batcher.SequencePair, ) { - lastCheckpoint := time.Now() - go func() { + lastCheckpoint := time.Now() + for { - seq := <-checkpointChan + seq := <-checkpointMsg // This is a write throttle to ensure we don't checkpoint faster than // b.config.CheckpointFreq. The latest seq number is always used. for time.Now().Sub(lastCheckpoint) < b.config.CheckpointFreq { select { - case seq = <-checkpointChan: // Keep updating checkpoint seq while waiting + case seq = <-checkpointMsg: // Keep updating checkpoint seq while waiting case <-time.NewTimer(b.config.CheckpointFreq - time.Now().Sub(lastCheckpoint)).C: } } @@ -126,6 +149,74 @@ func (b *batchedWriter) createBatcher(tag string) batcher.Batcher { return batch } +// startMessageDistributer starts a go-routine that routes messages to batches. It's in uses a +// go routine to avoid racey conditions. +func (b *batchedWriter) startMessageHandler( + batchMsg <-chan tagMsgPair, checkpointTag <-chan string, lastPair <-chan batcher.SequencePair, + flushBatches <-chan struct{}, +) { + go func() { + var lastProcessedPair batcher.SequencePair + batchers := map[string]batcher.Batcher{} + areBatchersEmpty := true + + for { + select { + case tmp := <-batchMsg: + batcher, ok := batchers[tmp.tag] + if !ok { + batcher = b.createBatcher(tmp.tag) + batchers[tmp.tag] = batcher + } + + err := batcher.AddMessage(tmp.msg, tmp.pair) + if err != nil { + b.log.ErrorD("add-message", kv.M{ + "err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag, + }) + } + areBatchersEmpty = false + case tag := <-checkpointTag: + smallest := lastProcessedPair + isAllEmpty := true + + for name, batch := range batchers { + if tag == name { + continue + } + + pair := batch.SmallestSequencePair() + if pair.IsEmpty() { // Occurs when batch has no items + continue + } + + if pair.IsLessThan(smallest) { + smallest = pair + } + + isAllEmpty = false + } + + if !smallest.IsEmpty() { + b.checkpointMsg <- smallest + } + areBatchersEmpty = isAllEmpty + case pair := <-lastPair: + if areBatchersEmpty { + b.checkpointMsg <- pair + } + lastProcessedPair = pair + case <-flushBatches: + for _, batch := range batchers { + batch.Flush() + } + b.checkpointMsg <- lastProcessedPair + areBatchersEmpty = true + } + } + }() +} + func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) { // We handle two types of records: // - records emitted from CWLogs Subscription @@ -140,7 +231,8 @@ func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) } func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { - curSequence := b.lastProcessedSeq + var pair batcher.SequencePair + prevPair := b.lastProcessedSeq for _, record := range records { // Wait until rate limiter permits one more record to be processed @@ -151,77 +243,58 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { return fmt.Errorf("could not parse sequence number '%s'", record.SequenceNumber) } - b.lastProcessedSeq = curSequence // Updated with the value from the previous iteration - curSequence = batcher.SequencePair{seq, record.SubSequenceNumber} + pair = batcher.SequencePair{seq, record.SubSequenceNumber} + if prevPair.IsEmpty() { // Handles on-start edge case where b.lastProcessSeq is empty + prevPair = pair + } data, err := base64.StdEncoding.DecodeString(record.Data) if err != nil { return err } - rawlogs, err := b.splitMessageIfNecessary(data) + messages, err := b.splitMessageIfNecessary(data) if err != nil { return err } - for _, rawlog := range rawlogs { - log, tags, err := b.sender.ProcessMessage(rawlog) + for _, rawmsg := range messages { + msg, tags, err := b.sender.ProcessMessage(rawmsg) + if err == ErrMessageIgnored { continue // Skip message } else if err != nil { - return err + b.log.ErrorD("process-message", kv.M{"msg": err.Error(), "rawmsg": string(rawmsg)}) + continue // Don't stop processing messages because of one bad message } if len(tags) == 0 { - return fmt.Errorf("No tags provided by consumer for log: %s", string(rawlog)) + b.log.ErrorD("no-tags", kv.M{"rawmsg": string(rawmsg)}) + return fmt.Errorf("No tags provided by consumer for log: %s", string(rawmsg)) } for _, tag := range tags { - batcher, ok := b.batchers[tag] - if !ok { - batcher = b.createBatcher(tag) - b.batchers[tag] = batcher + if tag == "" { + b.log.ErrorD("blank-tag", kv.M{"rawmsg": string(rawmsg)}) + return fmt.Errorf("Blank tag provided by consumer for log: %s", string(rawmsg)) } // Use second to last sequence number to ensure we don't checkpoint a message before // it's been sent. When batches are sent, conceptually we first find the smallest // sequence number amount all the batch (let's call it A). We then checkpoint at // the A-1 sequence number. - err = batcher.AddMessage(log, b.lastProcessedSeq) - if err != nil { - return err - } + b.batchMsg <- tagMsgPair{tag, msg, prevPair} } } - } - b.lastProcessedSeq = curSequence + prevPair = pair + b.lastProcessedPair <- pair + } + b.lastProcessedSeq = pair return nil } -func (b *batchedWriter) CheckPointBatch(tag string) { - smallest := b.lastProcessedSeq - - for name, batch := range b.batchers { - if tag == name { - continue - } - - pair := batch.SmallestSequencePair() - if pair.Sequence == nil { // Occurs when batch has no items - continue - } - - if pair.IsLessThan(smallest) { - smallest = pair - } - } - - b.checkpointChan <- smallest -} - func (b *batchedWriter) SendBatch(batch [][]byte, tag string) { - b.log.Info("sent-batch") err := b.sender.SendBatch(batch, tag) switch e := err.(type) { case nil: // Do nothing @@ -237,14 +310,14 @@ func (b *batchedWriter) SendBatch(batch [][]byte, tag string) { b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) os.Exit(1) } + + b.checkpointTag <- tag } func (b *batchedWriter) Shutdown(reason string) error { if reason == "TERMINATE" { b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID}) - for _, batch := range b.batchers { - batch.Flush() - } + b.flushBatches <- struct{}{} } else { b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason}) } diff --git a/batchconsumer/writer_test.go b/batchconsumer/writer_test.go new file mode 100644 index 0000000..6622937 --- /dev/null +++ b/batchconsumer/writer_test.go @@ -0,0 +1,372 @@ +package batchconsumer + +import ( + "encoding/base64" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "gopkg.in/Clever/kayvee-go.v6/logger" + + "github.com/Clever/amazon-kinesis-client-go/kcl" +) + +type ignoringSender struct{} + +func (i ignoringSender) ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error) { + return nil, nil, ErrMessageIgnored +} + +func (i ignoringSender) SendBatch(batch [][]byte, tag string) error { + panic("SendBatch Should never be called. ProcessMessage ignores all messasges.") +} + +type tagBatch struct { + tag string + batch [][]byte +} +type msgAsTagSender struct { + batches map[string][][][]byte + saveBatch chan tagBatch + shutdown chan struct{} +} + +func NewMsgAsTagSender() *msgAsTagSender { + sender := &msgAsTagSender{ + batches: map[string][][][]byte{}, + saveBatch: make(chan tagBatch), + shutdown: make(chan struct{}), + } + + sender.startBatchSaver(sender.saveBatch, sender.shutdown) + + return sender +} + +func (i *msgAsTagSender) startBatchSaver(saveBatch <-chan tagBatch, shutdown <-chan struct{}) { + go func() { + for { + select { + case tb := <-saveBatch: + batches, ok := i.batches[tb.tag] + if !ok { + batches = [][][]byte{} + } + i.batches[tb.tag] = append(batches, tb.batch) + case <-shutdown: + return + } + } + }() +} + +func (i *msgAsTagSender) ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error) { + if "ignore" == string(rawmsg) { + return nil, nil, ErrMessageIgnored + } + + return rawmsg, []string{string(rawmsg)}, nil +} + +func (i *msgAsTagSender) SendBatch(batch [][]byte, tag string) error { + i.saveBatch <- tagBatch{tag, batch} + return nil +} + +func (i *msgAsTagSender) Shutdown() { + i.shutdown <- struct{}{} +} + +type mockCheckpointer struct { + recievedSequences []string + checkpoint chan string + done chan struct{} + timeout chan struct{} + shutdown chan struct{} +} + +func NewMockCheckpointer(maxSeq string, timeout time.Duration) *mockCheckpointer { + mcp := &mockCheckpointer{ + checkpoint: make(chan string), + done: make(chan struct{}, 1), + timeout: make(chan struct{}, 1), + shutdown: make(chan struct{}), + } + mcp.startWaiter(maxSeq, timeout) + + return mcp +} + +func (m *mockCheckpointer) startWaiter(maxSeq string, timeout time.Duration) { + go func() { + for { + select { + case seq := <-m.checkpoint: + m.recievedSequences = append(m.recievedSequences, seq) + if seq == maxSeq { + m.done <- struct{}{} + } + case <-time.NewTimer(timeout).C: + m.timeout <- struct{}{} + case <-m.shutdown: + return + } + } + }() +} +func (m *mockCheckpointer) wait() error { + select { + case <-m.done: + return nil + case <-m.timeout: + return fmt.Errorf("timeout waiting for checkpoints") + } +} +func (m *mockCheckpointer) Shutdown() { + m.shutdown <- struct{}{} +} +func (m *mockCheckpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error { + m.checkpoint <- *sequenceNumber + return nil +} +func (m *mockCheckpointer) CheckpointWithRetry( + sequenceNumber *string, subSequenceNumber *int, retryCount int, +) error { + return m.Checkpoint(sequenceNumber, subSequenceNumber) +} + +func encode(str string) string { + return base64.StdEncoding.EncodeToString([]byte(str)) +} + +func TestProcessRecordsIgnoredMessages(t *testing.T) { + assert := assert.New(t) + + mocklog := logger.New("testing") + mockconfig := withDefaults(Config{ + BatchInterval: 10 * time.Millisecond, + CheckpointFreq: 20 * time.Millisecond, + }) + mockcheckpointer := NewMockCheckpointer("4", 5*time.Second) + + wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mocklog) + wrt.Initialize("test-shard", mockcheckpointer) + + err := wrt.ProcessRecords([]kcl.Record{ + kcl.Record{SequenceNumber: "1", Data: encode("hi")}, + kcl.Record{SequenceNumber: "2", Data: encode("hi")}, + kcl.Record{SequenceNumber: "3", Data: encode("hi")}, + kcl.Record{SequenceNumber: "4", Data: encode("hi")}, + }) + assert.NoError(err) + + err = mockcheckpointer.wait() + assert.NoError(err) +} + +func TestProcessRecordsMutliBatchBasic(t *testing.T) { + assert := assert.New(t) + + mocklog := logger.New("testing") + mockconfig := withDefaults(Config{ + BatchInterval: 100 * time.Millisecond, + CheckpointFreq: 200 * time.Millisecond, + }) + mockcheckpointer := NewMockCheckpointer("8", 5*time.Second) + mocksender := NewMsgAsTagSender() + + wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) + wrt.Initialize("test-shard", mockcheckpointer) + + err := wrt.ProcessRecords([]kcl.Record{ + kcl.Record{SequenceNumber: "1", Data: encode("tag1")}, + kcl.Record{SequenceNumber: "2", Data: encode("tag2")}, + kcl.Record{SequenceNumber: "3", Data: encode("tag3")}, + kcl.Record{SequenceNumber: "4", Data: encode("tag2")}, + }) + assert.NoError(err) + err = wrt.ProcessRecords([]kcl.Record{ + kcl.Record{SequenceNumber: "5", Data: encode("tag3")}, + kcl.Record{SequenceNumber: "6", Data: encode("tag2")}, + kcl.Record{SequenceNumber: "7", Data: encode("tag3")}, + kcl.Record{SequenceNumber: "8", Data: encode("tag1")}, + }) + assert.NoError(err) + + time.Sleep(200 * time.Millisecond) // Sleep to ensure checkpoint get flushed at least once + + err = wrt.Shutdown("TERMINATE") + assert.NoError(err) + + err = mockcheckpointer.wait() + assert.NoError(err) + + mocksender.Shutdown() + + assert.Contains(mocksender.batches, "tag1") + assert.Equal(1, len(mocksender.batches["tag1"])) // One batch + assert.Equal(2, len(mocksender.batches["tag1"][0])) // with two items + assert.Equal("tag1", string(mocksender.batches["tag1"][0][0])) + assert.Equal("tag1", string(mocksender.batches["tag1"][0][1])) + + assert.Contains(mocksender.batches, "tag2") + assert.Equal(1, len(mocksender.batches["tag2"])) // One batch + assert.Equal(3, len(mocksender.batches["tag2"][0])) // with three items + assert.Equal("tag2", string(mocksender.batches["tag2"][0][0])) + assert.Equal("tag2", string(mocksender.batches["tag2"][0][1])) + assert.Equal("tag2", string(mocksender.batches["tag2"][0][2])) + + assert.Contains(mocksender.batches, "tag3") + assert.Equal(1, len(mocksender.batches["tag3"])) // One batch + assert.Equal(3, len(mocksender.batches["tag3"][0])) // with three items + assert.Equal("tag3", string(mocksender.batches["tag3"][0][0])) + assert.Equal("tag3", string(mocksender.batches["tag3"][0][1])) + assert.Equal("tag3", string(mocksender.batches["tag3"][0][2])) +} + +func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) { + assert := assert.New(t) + + mocklog := logger.New("testing") + mockconfig := withDefaults(Config{ + BatchInterval: 100 * time.Millisecond, + CheckpointFreq: 200 * time.Millisecond, + }) + mockcheckpointer := NewMockCheckpointer("26", 5*time.Second) + mocksender := NewMsgAsTagSender() + + wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) + wrt.Initialize("test-shard", mockcheckpointer) + + err := wrt.ProcessRecords([]kcl.Record{ + kcl.Record{SequenceNumber: "1", Data: encode("ignore")}, + kcl.Record{SequenceNumber: "2", Data: encode("ignore")}, + kcl.Record{SequenceNumber: "3", Data: encode("ignore")}, + kcl.Record{SequenceNumber: "4", Data: encode("ignore")}, + kcl.Record{SequenceNumber: "5", Data: encode("tag1")}, + kcl.Record{SequenceNumber: "6", Data: encode("ignore")}, + kcl.Record{SequenceNumber: "7", Data: encode("tag2")}, + kcl.Record{SequenceNumber: "8", Data: encode("tag3")}, + kcl.Record{SequenceNumber: "9", Data: encode("ignore")}, + kcl.Record{SequenceNumber: "10", Data: encode("tag2")}, + kcl.Record{SequenceNumber: "11", Data: encode("ignore")}, + kcl.Record{SequenceNumber: "12", Data: encode("ignore")}, + kcl.Record{SequenceNumber: "13", Data: encode("ignore")}, + }) + assert.NoError(err) + err = wrt.ProcessRecords([]kcl.Record{ + kcl.Record{SequenceNumber: "14", Data: encode("ignore")}, + kcl.Record{SequenceNumber: "15", Data: encode("ignore")}, + kcl.Record{SequenceNumber: "16", Data: encode("ignore")}, + kcl.Record{SequenceNumber: "17", Data: encode("ignore")}, + kcl.Record{SequenceNumber: "18", Data: encode("tag3")}, + kcl.Record{SequenceNumber: "19", Data: encode("tag2")}, + kcl.Record{SequenceNumber: "20", Data: encode("ignore")}, + kcl.Record{SequenceNumber: "21", Data: encode("tag3")}, + kcl.Record{SequenceNumber: "22", Data: encode("ignore")}, + kcl.Record{SequenceNumber: "23", Data: encode("ignore")}, + kcl.Record{SequenceNumber: "24", Data: encode("ignore")}, + kcl.Record{SequenceNumber: "25", Data: encode("tag1")}, + kcl.Record{SequenceNumber: "26", Data: encode("ignore")}, + }) + assert.NoError(err) + + time.Sleep(200 * time.Millisecond) // Sleep to ensure checkpoint get flushed at least once + + err = wrt.Shutdown("TERMINATE") + assert.NoError(err) + + err = mockcheckpointer.wait() + assert.NoError(err) + + mocksender.Shutdown() + + assert.Contains(mocksender.batches, "tag1") + assert.Equal(1, len(mocksender.batches["tag1"])) // One batch + assert.Equal(2, len(mocksender.batches["tag1"][0])) // with two items + assert.Equal("tag1", string(mocksender.batches["tag1"][0][0])) + assert.Equal("tag1", string(mocksender.batches["tag1"][0][1])) + + assert.Contains(mocksender.batches, "tag2") + assert.Equal(1, len(mocksender.batches["tag2"])) // One batch + assert.Equal(3, len(mocksender.batches["tag2"][0])) // with three items + assert.Equal("tag2", string(mocksender.batches["tag2"][0][0])) + assert.Equal("tag2", string(mocksender.batches["tag2"][0][1])) + assert.Equal("tag2", string(mocksender.batches["tag2"][0][2])) + + assert.Contains(mocksender.batches, "tag3") + assert.Equal(1, len(mocksender.batches["tag3"])) // One batch + assert.Equal(3, len(mocksender.batches["tag3"][0])) // with three items + assert.Equal("tag3", string(mocksender.batches["tag3"][0][0])) + assert.Equal("tag3", string(mocksender.batches["tag3"][0][1])) + assert.Equal("tag3", string(mocksender.batches["tag3"][0][2])) +} + +func TestStaggeredCheckpionting(t *testing.T) { + assert := assert.New(t) + + mocklog := logger.New("testing") + mockconfig := withDefaults(Config{ + BatchCount: 2, + BatchInterval: 100 * time.Millisecond, + CheckpointFreq: 200 * time.Nanosecond, + }) + mockcheckpointer := NewMockCheckpointer("9", 5*time.Second) + mocksender := NewMsgAsTagSender() + + wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) + wrt.Initialize("test-shard", mockcheckpointer) + + err := wrt.ProcessRecords([]kcl.Record{ + kcl.Record{SequenceNumber: "1", Data: encode("tag1")}, + kcl.Record{SequenceNumber: "2", Data: encode("tag3")}, + kcl.Record{SequenceNumber: "3", Data: encode("tag1")}, + kcl.Record{SequenceNumber: "4", Data: encode("tag3")}, + }) + assert.NoError(err) + err = wrt.ProcessRecords([]kcl.Record{ + kcl.Record{SequenceNumber: "5", Data: encode("tag1")}, + kcl.Record{SequenceNumber: "6", Data: encode("tag3")}, + kcl.Record{SequenceNumber: "7", Data: encode("tag3")}, + kcl.Record{SequenceNumber: "8", Data: encode("tag3")}, + kcl.Record{SequenceNumber: "9", Data: encode("tag3")}, + }) + assert.NoError(err) + + time.Sleep(200 * time.Millisecond) // Sleep to ensure checkpoint get flushed at least once + + err = wrt.Shutdown("TERMINATE") + assert.NoError(err) + + err = mockcheckpointer.wait() + assert.NoError(err) + + mocksender.Shutdown() + mockcheckpointer.Shutdown() + + // Test to make sure writer doesn't prematurely checkpoint messages + // Checkpoints 5,6,7,8 will never be submitted because the 3rd "tag1" is in a batch + // Checkpoint 9 is submitted on shutdown when everything is being flushed + assert.NotContains(mockcheckpointer.recievedSequences, "5") + assert.NotContains(mockcheckpointer.recievedSequences, "6") + assert.NotContains(mockcheckpointer.recievedSequences, "7") + assert.NotContains(mockcheckpointer.recievedSequences, "8") + + assert.Contains(mocksender.batches, "tag1") + assert.Equal(2, len(mocksender.batches["tag1"])) // One batch + assert.Equal(2, len(mocksender.batches["tag1"][0])) // with two items + assert.Equal("tag1", string(mocksender.batches["tag1"][0][0])) + assert.Equal("tag1", string(mocksender.batches["tag1"][0][1])) + assert.Equal("tag1", string(mocksender.batches["tag1"][1][0])) + + assert.Contains(mocksender.batches, "tag3") + assert.Equal(3, len(mocksender.batches["tag3"])) // One batch + assert.Equal(2, len(mocksender.batches["tag3"][0])) // with three items + assert.Equal("tag3", string(mocksender.batches["tag3"][0][0])) + assert.Equal("tag3", string(mocksender.batches["tag3"][0][1])) + assert.Equal("tag3", string(mocksender.batches["tag3"][1][0])) + assert.Equal("tag3", string(mocksender.batches["tag3"][1][1])) + assert.Equal("tag3", string(mocksender.batches["tag3"][2][0])) + assert.Equal("tag3", string(mocksender.batches["tag3"][2][1])) +}