Added Initialize method to sender
This commit is contained in:
parent
df698f34aa
commit
e5850f1464
4 changed files with 19 additions and 1 deletions
|
|
@ -11,6 +11,8 @@ var ErrMessageIgnored = errors.New("Message intentionally skipped by sender")
|
||||||
|
|
||||||
// Sender an interface needed for batch consumer implementations
|
// Sender an interface needed for batch consumer implementations
|
||||||
type Sender interface {
|
type Sender interface {
|
||||||
|
// Initialize called once before ProcessMessage and SendBatch
|
||||||
|
Initialize(shardID string)
|
||||||
// ProcessMessage receives a raw message and is expected to return an appropriately formatted
|
// ProcessMessage receives a raw message and is expected to return an appropriately formatted
|
||||||
// message as well as a list of tags for that log line. A tag corresponds to a batch that
|
// message as well as a list of tags for that log line. A tag corresponds to a batch that
|
||||||
// it'll be put into. Typically tags are series names.
|
// it'll be put into. Typically tags are series names.
|
||||||
|
|
|
||||||
|
|
@ -49,6 +49,8 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer
|
||||||
BatchInterval: b.config.BatchInterval,
|
BatchInterval: b.config.BatchInterval,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
b.sender.Initialize(shardID)
|
||||||
|
|
||||||
b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq)
|
b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq)
|
||||||
b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.failedLogsFile)
|
b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.failedLogsFile)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,8 @@ import (
|
||||||
|
|
||||||
type ignoringSender struct{}
|
type ignoringSender struct{}
|
||||||
|
|
||||||
|
func (i ignoringSender) Initialize(shardID string) {}
|
||||||
|
|
||||||
func (i ignoringSender) ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error) {
|
func (i ignoringSender) ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error) {
|
||||||
return nil, nil, ErrMessageIgnored
|
return nil, nil, ErrMessageIgnored
|
||||||
}
|
}
|
||||||
|
|
@ -27,6 +29,7 @@ type tagBatch struct {
|
||||||
batch [][]byte
|
batch [][]byte
|
||||||
}
|
}
|
||||||
type msgAsTagSender struct {
|
type msgAsTagSender struct {
|
||||||
|
shardID string
|
||||||
batches map[string][][][]byte
|
batches map[string][][][]byte
|
||||||
saveBatch chan tagBatch
|
saveBatch chan tagBatch
|
||||||
shutdown chan struct{}
|
shutdown chan struct{}
|
||||||
|
|
@ -61,6 +64,10 @@ func (i *msgAsTagSender) startBatchSaver(saveBatch <-chan tagBatch, shutdown <-c
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (i *msgAsTagSender) Initialize(shardID string) {
|
||||||
|
i.shardID = shardID
|
||||||
|
}
|
||||||
|
|
||||||
func (i *msgAsTagSender) ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error) {
|
func (i *msgAsTagSender) ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error) {
|
||||||
if "ignore" == string(rawmsg) {
|
if "ignore" == string(rawmsg) {
|
||||||
return nil, nil, ErrMessageIgnored
|
return nil, nil, ErrMessageIgnored
|
||||||
|
|
@ -176,6 +183,8 @@ func TestProcessRecordsSingleBatchBasic(t *testing.T) {
|
||||||
wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile)
|
wrt := NewBatchedWriter(mockconfig, mocksender, mockFailedLogsFile)
|
||||||
wrt.Initialize("test-shard", mockcheckpointer)
|
wrt.Initialize("test-shard", mockcheckpointer)
|
||||||
|
|
||||||
|
assert.Equal("test-shard", mocksender.shardID)
|
||||||
|
|
||||||
err := wrt.ProcessRecords([]kcl.Record{
|
err := wrt.ProcessRecords([]kcl.Record{
|
||||||
kcl.Record{SequenceNumber: "1", Data: encode("tag1")},
|
kcl.Record{SequenceNumber: "1", Data: encode("tag1")},
|
||||||
kcl.Record{SequenceNumber: "2", Data: encode("tag1")},
|
kcl.Record{SequenceNumber: "2", Data: encode("tag1")},
|
||||||
|
|
|
||||||
|
|
@ -23,9 +23,14 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
type exampleSender struct {
|
type exampleSender struct {
|
||||||
|
shardID string
|
||||||
output logger.KayveeLogger
|
output logger.KayveeLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *exampleSender) Initialize(shardID string) {
|
||||||
|
e.shardID = shardID
|
||||||
|
}
|
||||||
|
|
||||||
func (e *exampleSender) ProcessMessage(rawmsg []byte) ([]byte, []string, error) {
|
func (e *exampleSender) ProcessMessage(rawmsg []byte) ([]byte, []string, error) {
|
||||||
if len(rawmsg)%5 == 2 {
|
if len(rawmsg)%5 == 2 {
|
||||||
return nil, nil, kbc.ErrMessageIgnored
|
return nil, nil, kbc.ErrMessageIgnored
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue