diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index 620d222..7c0d3ad 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -16,6 +16,12 @@ type tagMsgPair struct { pair kcl.SequencePair } +type batcherManagerConfig struct { + BatchCount int + BatchSize int + BatchInterval time.Duration +} + type batcherManager struct { log kv.KayveeLogger sender Sender @@ -32,16 +38,16 @@ type batcherManager struct { } func newBatcherManager( - sender Sender, chkpntManager *checkpointManager, config Config, log kv.KayveeLogger, + sender Sender, chkpntManager *checkpointManager, cfg batcherManagerConfig, log kv.KayveeLogger, ) *batcherManager { bm := &batcherManager{ log: log, sender: sender, chkpntManager: chkpntManager, - batchCount: config.BatchCount, - batchSize: config.BatchSize, - batchInterval: config.BatchInterval, + batchCount: cfg.BatchCount, + batchSize: cfg.BatchSize, + batchInterval: cfg.BatchInterval, batchMsg: make(chan tagMsgPair), lastIgnored: make(chan kcl.SequencePair), diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index e230e00..095dd73 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -43,8 +43,14 @@ func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batche func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error { b.shardID = shardID + bmConfig := batcherManagerConfig{ + BatchCount: b.config.BatchCount, + BatchSize: b.config.BatchSize, + BatchInterval: b.config.BatchInterval, + } + b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq, b.log) - b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, b.config, b.log) + b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.log) return nil }