diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index b8b26f6..620d222 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -46,7 +46,8 @@ func newBatcherManager( batchMsg: make(chan tagMsgPair), lastIgnored: make(chan kcl.SequencePair), lastProcessed: make(chan kcl.SequencePair), - shutdown: make(chan chan<- struct{}), + // shutdown chan takes "done" channel to signal when batchermanager is done shutting down + shutdown: make(chan chan<- struct{}), } bm.startMessageHandler(bm.batchMsg, bm.lastIgnored, bm.lastProcessed, bm.shutdown) diff --git a/batchconsumer/checkpointmanager.go b/batchconsumer/checkpointmanager.go index a76fd0c..8ff4c45 100644 --- a/batchconsumer/checkpointmanager.go +++ b/batchconsumer/checkpointmanager.go @@ -15,7 +15,8 @@ type checkpointManager struct { checkpointFreq time.Duration checkpoint chan kcl.SequencePair - shutdown chan chan<- struct{} + // shutdown chan takes "done" channel to signal when checkpointManager is done shutting down + shutdown chan chan<- struct{} } func newCheckpointManager(