From 45fad863d0e356eb5987493d15bc78937e427626 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Thu, 10 Aug 2017 20:11:24 +0000 Subject: [PATCH] Made config object that's specific to BatcherManager --- batchconsumer/batchermanager.go | 14 ++++++++++---- batchconsumer/writer.go | 8 +++++++- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index 620d222..7c0d3ad 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -16,6 +16,12 @@ type tagMsgPair struct { pair kcl.SequencePair } +type batcherManagerConfig struct { + BatchCount int + BatchSize int + BatchInterval time.Duration +} + type batcherManager struct { log kv.KayveeLogger sender Sender @@ -32,16 +38,16 @@ type batcherManager struct { } func newBatcherManager( - sender Sender, chkpntManager *checkpointManager, config Config, log kv.KayveeLogger, + sender Sender, chkpntManager *checkpointManager, cfg batcherManagerConfig, log kv.KayveeLogger, ) *batcherManager { bm := &batcherManager{ log: log, sender: sender, chkpntManager: chkpntManager, - batchCount: config.BatchCount, - batchSize: config.BatchSize, - batchInterval: config.BatchInterval, + batchCount: cfg.BatchCount, + batchSize: cfg.BatchSize, + batchInterval: cfg.BatchInterval, batchMsg: make(chan tagMsgPair), lastIgnored: make(chan kcl.SequencePair), diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index e230e00..095dd73 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -43,8 +43,14 @@ func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batche func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error { b.shardID = shardID + bmConfig := batcherManagerConfig{ + BatchCount: b.config.BatchCount, + BatchSize: b.config.BatchSize, + BatchInterval: b.config.BatchInterval, + } + b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq, b.log) - b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, b.config, b.log) + b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.log) return nil }