Added a more coherent shutdown pathway
This commit is contained in:
parent
1be812a887
commit
b7743c9ea7
4 changed files with 45 additions and 32 deletions
|
|
@ -28,7 +28,7 @@ type batcherManager struct {
|
|||
batchMsg chan tagMsgPair
|
||||
lastIgnored chan kcl.SequencePair
|
||||
lastProcessed chan kcl.SequencePair
|
||||
shutdown chan struct{}
|
||||
shutdown chan chan<- struct{}
|
||||
}
|
||||
|
||||
func NewBatcherManager(
|
||||
|
|
@ -46,7 +46,7 @@ func NewBatcherManager(
|
|||
batchMsg: make(chan tagMsgPair),
|
||||
lastIgnored: make(chan kcl.SequencePair),
|
||||
lastProcessed: make(chan kcl.SequencePair),
|
||||
shutdown: make(chan struct{}),
|
||||
shutdown: make(chan chan<- struct{}),
|
||||
}
|
||||
|
||||
bm.startMessageHandler(bm.batchMsg, bm.lastIgnored, bm.lastProcessed, bm.shutdown)
|
||||
|
|
@ -66,8 +66,11 @@ func (b *batcherManager) LatestProcessed(pair kcl.SequencePair) {
|
|||
b.lastProcessed <- pair
|
||||
}
|
||||
|
||||
func (b *batcherManager) Shutdown() {
|
||||
b.shutdown <- struct{}{}
|
||||
func (b *batcherManager) Shutdown() <-chan struct{} {
|
||||
done := make(chan struct{})
|
||||
b.shutdown <- done
|
||||
|
||||
return done
|
||||
}
|
||||
|
||||
func (b *batcherManager) createBatcher() *batcher {
|
||||
|
|
@ -132,7 +135,7 @@ func (b *batcherManager) sendCheckpoint(
|
|||
// goroutine to avoid race conditions.
|
||||
func (b *batcherManager) startMessageHandler(
|
||||
batchMsg <-chan tagMsgPair, lastIgnored, lastProcessed <-chan kcl.SequencePair,
|
||||
shutdown <-chan struct{},
|
||||
shutdown <-chan chan<- struct{},
|
||||
) {
|
||||
go func() {
|
||||
var lastProcessedPair kcl.SequencePair
|
||||
|
|
@ -189,12 +192,16 @@ func (b *batcherManager) startMessageHandler(
|
|||
}
|
||||
case pair := <-lastProcessed:
|
||||
lastProcessedPair = pair
|
||||
case <-shutdown:
|
||||
case done := <-shutdown:
|
||||
for tag, batcher := range batchers {
|
||||
b.sendBatch(batcher, tag)
|
||||
}
|
||||
b.chkpntManager.Checkpoint(lastProcessedPair)
|
||||
b.chkpntManager.Shutdown()
|
||||
chkDone := b.chkpntManager.Shutdown()
|
||||
<-chkDone
|
||||
|
||||
done <- struct{}{}
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ type checkpointManager struct {
|
|||
checkpointFreq time.Duration
|
||||
|
||||
checkpoint chan kcl.SequencePair
|
||||
shutdown chan struct{}
|
||||
shutdown chan chan<- struct{}
|
||||
}
|
||||
|
||||
func NewCheckpointManager(
|
||||
|
|
@ -27,7 +27,7 @@ func NewCheckpointManager(
|
|||
checkpointFreq: config.CheckpointFreq,
|
||||
|
||||
checkpoint: make(chan kcl.SequencePair),
|
||||
shutdown: make(chan struct{}),
|
||||
shutdown: make(chan chan<- struct{}),
|
||||
}
|
||||
|
||||
cm.startCheckpointHandler(checkpointer, cm.checkpoint, cm.shutdown)
|
||||
|
|
@ -39,33 +39,35 @@ func (cm *checkpointManager) Checkpoint(pair kcl.SequencePair) {
|
|||
cm.checkpoint <- pair
|
||||
}
|
||||
|
||||
func (cm *checkpointManager) Shutdown() {
|
||||
cm.shutdown <- struct{}{}
|
||||
func (cm *checkpointManager) Shutdown() <-chan struct{} {
|
||||
done := make(chan struct{})
|
||||
cm.shutdown <- done
|
||||
|
||||
return done
|
||||
}
|
||||
|
||||
func (cm *checkpointManager) startCheckpointHandler(
|
||||
checkpointer kcl.Checkpointer, checkpoint <-chan kcl.SequencePair, shutdown <-chan struct{},
|
||||
checkpointer kcl.Checkpointer, checkpoint <-chan kcl.SequencePair,
|
||||
shutdown <-chan chan<- struct{},
|
||||
) {
|
||||
go func() {
|
||||
lastCheckpoint := time.Now()
|
||||
|
||||
for {
|
||||
var doneShutdown chan<- struct{}
|
||||
pair := kcl.SequencePair{}
|
||||
isShuttingDown := false
|
||||
|
||||
select {
|
||||
case pair = <-checkpoint:
|
||||
case <-shutdown:
|
||||
isShuttingDown = true
|
||||
case doneShutdown = <-shutdown:
|
||||
}
|
||||
|
||||
// This is a write throttle to ensure we don't checkpoint faster than cm.checkpointFreq.
|
||||
// The latest pair number is always used.
|
||||
for !isShuttingDown && time.Now().Sub(lastCheckpoint) < cm.checkpointFreq {
|
||||
for doneShutdown == nil && time.Now().Sub(lastCheckpoint) < cm.checkpointFreq {
|
||||
select {
|
||||
case pair = <-checkpoint: // Keep updating checkpoint pair while waiting
|
||||
case <-shutdown:
|
||||
isShuttingDown = true
|
||||
case doneShutdown = <-shutdown:
|
||||
case <-time.NewTimer(cm.checkpointFreq - time.Now().Sub(lastCheckpoint)).C:
|
||||
}
|
||||
}
|
||||
|
|
@ -76,8 +78,9 @@ func (cm *checkpointManager) startCheckpointHandler(
|
|||
stats.Counter("checkpoints-sent", 1)
|
||||
}
|
||||
|
||||
if isShuttingDown {
|
||||
if doneShutdown != nil {
|
||||
checkpointer.Shutdown()
|
||||
doneShutdown <- struct{}{}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -143,7 +143,8 @@ func (b *batchedWriter) Shutdown(reason string) error {
|
|||
b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason})
|
||||
}
|
||||
|
||||
b.batcherManager.Shutdown()
|
||||
done := b.batcherManager.Shutdown()
|
||||
<-done
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
26
kcl/kcl.go
26
kcl/kcl.go
|
|
@ -13,6 +13,7 @@ import (
|
|||
type RecordProcessor interface {
|
||||
Initialize(shardID string, checkpointer Checkpointer) error
|
||||
ProcessRecords(records []Record) error
|
||||
// Shutdown should block until it is safe to shut down the process.
|
||||
Shutdown(reason string) error
|
||||
}
|
||||
|
||||
|
|
@ -138,7 +139,6 @@ func New(
|
|||
ioHandler: i,
|
||||
recordProcessor: recordProcessor,
|
||||
|
||||
isShuttingDown: false,
|
||||
nextCheckpointPair: SequencePair{},
|
||||
}
|
||||
}
|
||||
|
|
@ -149,7 +149,6 @@ type KCLProcess struct {
|
|||
ioHandler ioHandler
|
||||
recordProcessor RecordProcessor
|
||||
|
||||
isShuttingDown bool
|
||||
nextCheckpointPair SequencePair
|
||||
}
|
||||
|
||||
|
|
@ -163,7 +162,8 @@ func (kclp *KCLProcess) Checkpoint(pair SequencePair) {
|
|||
}
|
||||
|
||||
func (kclp *KCLProcess) Shutdown() {
|
||||
kclp.isShuttingDown = true
|
||||
kclp.ioHandler.writeError("Checkpoint shutdown")
|
||||
kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown
|
||||
}
|
||||
|
||||
func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error {
|
||||
|
|
@ -230,6 +230,17 @@ func (kclp *KCLProcess) handleLine(line string) error {
|
|||
switch action := action.(type) {
|
||||
case ActionCheckpoint:
|
||||
err = kclp.handleCheckpointAction(action)
|
||||
case ActionShutdown:
|
||||
kclp.ioHandler.writeError("Received shutdown action...")
|
||||
|
||||
// Shutdown should block until it's safe to shut down the process
|
||||
err = kclp.recordProcessor.Shutdown(action.Reason)
|
||||
if err != nil { // Log error and continue shutting down
|
||||
kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown: %+#v", err))
|
||||
}
|
||||
|
||||
kclp.ioHandler.writeError("Reporting shutdown done")
|
||||
return kclp.reportDone("shutdown")
|
||||
case ActionInitialize:
|
||||
err = kclp.recordProcessor.Initialize(action.ShardID, kclp)
|
||||
if err == nil {
|
||||
|
|
@ -240,11 +251,6 @@ func (kclp *KCLProcess) handleLine(line string) error {
|
|||
if err == nil {
|
||||
err = kclp.reportDone(action.Action)
|
||||
}
|
||||
case ActionShutdown:
|
||||
err = kclp.recordProcessor.Shutdown(action.Reason)
|
||||
if err == nil {
|
||||
err = kclp.reportDone(action.Action)
|
||||
}
|
||||
default:
|
||||
err = fmt.Errorf("unknown action to dispatch: %+#v", action)
|
||||
}
|
||||
|
|
@ -285,9 +291,5 @@ func (kclp *KCLProcess) Run() {
|
|||
}
|
||||
}
|
||||
kclp.ckpmux.Unlock()
|
||||
|
||||
if kclp.isShuttingDown {
|
||||
kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue