diff --git a/batchconsumer/checkpointmanager.go b/batchconsumer/checkpointmanager.go index 8ff4c45..f526e27 100644 --- a/batchconsumer/checkpointmanager.go +++ b/batchconsumer/checkpointmanager.go @@ -20,12 +20,12 @@ type checkpointManager struct { } func newCheckpointManager( - checkpointer kcl.Checkpointer, config Config, log kv.KayveeLogger, + checkpointer kcl.Checkpointer, checkpointFreq time.Duration, log kv.KayveeLogger, ) *checkpointManager { cm := &checkpointManager{ log: log, - checkpointFreq: config.CheckpointFreq, + checkpointFreq: checkpointFreq, checkpoint: make(chan kcl.SequencePair), shutdown: make(chan chan<- struct{}), diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index a4e2d7d..e230e00 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -43,7 +43,7 @@ func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batche func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error { b.shardID = shardID - b.chkpntManager = newCheckpointManager(checkpointer, b.config, b.log) + b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq, b.log) b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, b.config, b.log) return nil