From b7743c9ea7f1312064a8c24da560ff8f5c447273 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Tue, 8 Aug 2017 19:09:31 +0000 Subject: [PATCH] Added a more coherent shutdown pathway --- batchconsumer/batchermanager.go | 21 ++++++++++++++------- batchconsumer/checkpointmanager.go | 27 +++++++++++++++------------ batchconsumer/writer.go | 3 ++- kcl/kcl.go | 26 ++++++++++++++------------ 4 files changed, 45 insertions(+), 32 deletions(-) diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index 22ca8cd..8b469c3 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -28,7 +28,7 @@ type batcherManager struct { batchMsg chan tagMsgPair lastIgnored chan kcl.SequencePair lastProcessed chan kcl.SequencePair - shutdown chan struct{} + shutdown chan chan<- struct{} } func NewBatcherManager( @@ -46,7 +46,7 @@ func NewBatcherManager( batchMsg: make(chan tagMsgPair), lastIgnored: make(chan kcl.SequencePair), lastProcessed: make(chan kcl.SequencePair), - shutdown: make(chan struct{}), + shutdown: make(chan chan<- struct{}), } bm.startMessageHandler(bm.batchMsg, bm.lastIgnored, bm.lastProcessed, bm.shutdown) @@ -66,8 +66,11 @@ func (b *batcherManager) LatestProcessed(pair kcl.SequencePair) { b.lastProcessed <- pair } -func (b *batcherManager) Shutdown() { - b.shutdown <- struct{}{} +func (b *batcherManager) Shutdown() <-chan struct{} { + done := make(chan struct{}) + b.shutdown <- done + + return done } func (b *batcherManager) createBatcher() *batcher { @@ -132,7 +135,7 @@ func (b *batcherManager) sendCheckpoint( // go routine to avoid racey conditions. 
func (b *batcherManager) startMessageHandler( batchMsg <-chan tagMsgPair, lastIgnored, lastProcessed <-chan kcl.SequencePair, - shutdown <-chan struct{}, + shutdown <-chan chan<- struct{}, ) { go func() { var lastProcessedPair kcl.SequencePair @@ -189,12 +192,16 @@ func (b *batcherManager) startMessageHandler( } case pair := <-lastProcessed: lastProcessedPair = pair - case <-shutdown: + case done := <-shutdown: for tag, batcher := range batchers { b.sendBatch(batcher, tag) } b.chkpntManager.Checkpoint(lastProcessedPair) - b.chkpntManager.Shutdown() + chkDone := b.chkpntManager.Shutdown() + <-chkDone + + done <- struct{}{} + return } } }() diff --git a/batchconsumer/checkpointmanager.go b/batchconsumer/checkpointmanager.go index ec8a9dd..752fe85 100644 --- a/batchconsumer/checkpointmanager.go +++ b/batchconsumer/checkpointmanager.go @@ -15,7 +15,7 @@ type checkpointManager struct { checkpointFreq time.Duration checkpoint chan kcl.SequencePair - shutdown chan struct{} + shutdown chan chan<- struct{} } func NewCheckpointManager( @@ -27,7 +27,7 @@ func NewCheckpointManager( checkpointFreq: config.CheckpointFreq, checkpoint: make(chan kcl.SequencePair), - shutdown: make(chan struct{}), + shutdown: make(chan chan<- struct{}), } cm.startCheckpointHandler(checkpointer, cm.checkpoint, cm.shutdown) @@ -39,33 +39,35 @@ func (cm *checkpointManager) Checkpoint(pair kcl.SequencePair) { cm.checkpoint <- pair } -func (cm *checkpointManager) Shutdown() { - cm.shutdown <- struct{}{} +func (cm *checkpointManager) Shutdown() <-chan struct{} { + done := make(chan struct{}) + cm.shutdown <- done + + return done } func (cm *checkpointManager) startCheckpointHandler( - checkpointer kcl.Checkpointer, checkpoint <-chan kcl.SequencePair, shutdown <-chan struct{}, + checkpointer kcl.Checkpointer, checkpoint <-chan kcl.SequencePair, + shutdown <-chan chan<- struct{}, ) { go func() { lastCheckpoint := time.Now() for { + var doneShutdown chan<- struct{} pair := kcl.SequencePair{} - 
isShuttingDown := false select { case pair = <-checkpoint: - case <-shutdown: - isShuttingDown = true + case doneShutdown = <-shutdown: } // This is a write throttle to ensure we don't checkpoint faster than cm.checkpointFreq. // The latest pair number is always used. - for !isShuttingDown && time.Now().Sub(lastCheckpoint) < cm.checkpointFreq { + for doneShutdown == nil && time.Now().Sub(lastCheckpoint) < cm.checkpointFreq { select { case pair = <-checkpoint: // Keep updating checkpoint pair while waiting - case <-shutdown: - isShuttingDown = true + case doneShutdown = <-shutdown: case <-time.NewTimer(cm.checkpointFreq - time.Now().Sub(lastCheckpoint)).C: } } @@ -76,8 +78,9 @@ func (cm *checkpointManager) startCheckpointHandler( stats.Counter("checkpoints-sent", 1) } - if isShuttingDown { + if doneShutdown != nil { checkpointer.Shutdown() + doneShutdown <- struct{}{} return } } diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index 8035b9f..b0fd89e 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -143,7 +143,8 @@ func (b *batchedWriter) Shutdown(reason string) error { b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason}) } - b.batcherManager.Shutdown() + done := b.batcherManager.Shutdown() + <-done return nil } diff --git a/kcl/kcl.go b/kcl/kcl.go index 399a7c2..3ff33bc 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -13,6 +13,7 @@ import ( type RecordProcessor interface { Initialize(shardID string, checkpointer Checkpointer) error ProcessRecords(records []Record) error + // Shutdown should block until it's safe to shut down the process Shutdown(reason string) error } @@ -138,7 +139,6 @@ func New( ioHandler: i, recordProcessor: recordProcessor, - isShuttingDown: false, nextCheckpointPair: SequencePair{}, } } @@ -149,7 +149,6 @@ type KCLProcess struct { ioHandler ioHandler recordProcessor RecordProcessor - isShuttingDown bool nextCheckpointPair SequencePair } @@ -163,7 +162,8 @@ func (kclp 
*KCLProcess) Checkpoint(pair SequencePair) { } func (kclp *KCLProcess) Shutdown() { - kclp.isShuttingDown = true + kclp.ioHandler.writeError("Checkpoint shutdown") + kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown } func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error { @@ -230,6 +230,17 @@ func (kclp *KCLProcess) handleLine(line string) error { switch action := action.(type) { case ActionCheckpoint: err = kclp.handleCheckpointAction(action) + case ActionShutdown: + kclp.ioHandler.writeError("Received shutdown action...") + + // Shutdown should block until it's safe to shut down the process + err = kclp.recordProcessor.Shutdown(action.Reason) + if err != nil { // Log error and continue shutting down + kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown: %+#v", err)) + } + + kclp.ioHandler.writeError("Reporting shutdown done") + return kclp.reportDone("shutdown") case ActionInitialize: err = kclp.recordProcessor.Initialize(action.ShardID, kclp) if err == nil { @@ -240,11 +251,6 @@ func (kclp *KCLProcess) handleLine(line string) error { if err == nil { err = kclp.reportDone(action.Action) } - case ActionShutdown: - err = kclp.recordProcessor.Shutdown(action.Reason) - if err == nil { - err = kclp.reportDone(action.Action) - } default: err = fmt.Errorf("unknown action to dispatch: %+#v", action) } @@ -285,9 +291,5 @@ func (kclp *KCLProcess) Run() { } } kclp.ckpmux.Unlock() - - if kclp.isShuttingDown { - kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown - } } }