diff --git a/batchconsumer/sender.go b/batchconsumer/sender.go index 03e0421..ed90ad6 100644 --- a/batchconsumer/sender.go +++ b/batchconsumer/sender.go @@ -11,6 +11,8 @@ var ErrMessageIgnored = errors.New("Message intentionally skipped by sender") // Sender an interface needed for batch consumer implementations type Sender interface { + // Initialize called once before ProcessMessage and SendBatch + Initialize(shardID string) // ProcessMessage receives a raw message and is expected to return an appropriately formatted // message as well as a list of tags for that log line. A tag corresponds to a batch that // it'll be put into. Typically tags are series names. diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index cf5eab4..4fe8c9b 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -49,6 +49,8 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer BatchInterval: b.config.BatchInterval, } + b.sender.Initialize(shardID) + b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq) b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.failedLogsFile) diff --git a/batchconsumer/writer_test.go b/batchconsumer/writer_test.go index 7bc7e33..8de24a9 100644 --- a/batchconsumer/writer_test.go +++ b/batchconsumer/writer_test.go @@ -14,6 +14,8 @@ import ( type ignoringSender struct{} +func (i ignoringSender) Initialize(shardID string) {} + func (i ignoringSender) ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error) { return nil, nil, ErrMessageIgnored } @@ -27,6 +29,7 @@ type tagBatch struct { batch [][]byte } type msgAsTagSender struct { + shardID string batches map[string][][][]byte saveBatch chan tagBatch shutdown chan struct{} @@ -61,6 +64,10 @@ func (i *msgAsTagSender) startBatchSaver(saveBatch <-chan tagBatch, shutdown <-c }() } +func (i *msgAsTagSender) Initialize(shardID string) { + i.shardID = shardID +} + func (i *msgAsTagSender) ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error) { if "ignore" == string(rawmsg) { return nil, nil, ErrMessageIgnored @@ -176,6 +183,8 @@ func TestProcessRecordsSingleBatchBasic(t *testing.T) { wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile) wrt.Initialize("test-shard", mockcheckpointer) + assert.Equal("test-shard", mocksender.shardID) + err := wrt.ProcessRecords([]kcl.Record{ kcl.Record{SequenceNumber: "1", Data: encode("tag1")}, kcl.Record{SequenceNumber: "2", Data: encode("tag1")}, diff --git a/cmd/batchconsumer/main.go b/cmd/batchconsumer/main.go index 004ec36..ff7c1d4 100644 --- a/cmd/batchconsumer/main.go +++ b/cmd/batchconsumer/main.go @@ -23,7 +23,12 @@ func main() { } type exampleSender struct { - output logger.KayveeLogger + shardID string + output logger.KayveeLogger +} + +func (e *exampleSender) Initialize(shardID string) { + e.shardID = shardID } func (e *exampleSender) ProcessMessage(rawmsg []byte) ([]byte, []string, error) {