Better propogated shutdown signal

This commit is contained in:
Xavi Ramirez 2017-08-03 07:55:57 +00:00
parent 873544ae78
commit 6e9457cbcf

View file

@ -29,11 +29,12 @@ type batchedWriter struct {
shardID string
checkpointMsg chan kcl.SequencePair
checkpointTag chan string
lastProcessedPair chan kcl.SequencePair
batchMsg chan tagMsgPair
flushBatches chan struct{}
checkpointMsg chan kcl.SequencePair
checkpointShutdown chan struct{}
checkpointTag chan string
lastProcessedPair chan kcl.SequencePair
batchMsg chan tagMsgPair
shutdown chan struct{}
// Limits the number of records read from the stream
rateLimiter *rate.Limiter
@ -54,13 +55,14 @@ func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batche
func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error {
b.shardID = shardID
b.checkpointMsg = make(chan kcl.SequencePair)
b.startCheckpointListener(checkpointer, b.checkpointMsg)
b.checkpointShutdown = make(chan struct{})
b.startCheckpointListener(checkpointer, b.checkpointMsg, b.checkpointShutdown)
b.checkpointTag = make(chan string)
b.batchMsg = make(chan tagMsgPair)
b.flushBatches = make(chan struct{})
b.shutdown = make(chan struct{})
b.lastProcessedPair = make(chan kcl.SequencePair)
b.startMessageHandler(b.batchMsg, b.checkpointTag, b.lastProcessedPair, b.flushBatches)
b.startMessageHandler(b.batchMsg, b.checkpointTag, b.lastProcessedPair, b.shutdown)
return nil
}
@ -96,26 +98,35 @@ func (b *batchedWriter) handleCheckpointError(err error) bool {
func (b *batchedWriter) startCheckpointListener(
checkpointer kcl.Checkpointer, checkpointMsg <-chan kcl.SequencePair,
shutdown <-chan struct{},
) {
go func() {
lastCheckpoint := time.Now()
for {
seq := <-checkpointMsg
seq := kcl.SequencePair{}
isShuttingDown := false
select {
case seq = <-checkpointMsg:
case <-shutdown:
isShuttingDown = true
}
// This is a write throttle to ensure we don't checkpoint faster than
// b.config.CheckpointFreq. The latest seq number is always used.
for time.Now().Sub(lastCheckpoint) < b.config.CheckpointFreq {
for !isShuttingDown && time.Now().Sub(lastCheckpoint) < b.config.CheckpointFreq {
select {
case seq = <-checkpointMsg: // Keep updating checkpoint seq while waiting
case <-shutdown:
isShuttingDown = true
case <-time.NewTimer(b.config.CheckpointFreq - time.Now().Sub(lastCheckpoint)).C:
}
}
retry := true
for n := 0; retry && n < b.config.CheckpointRetries+1; n++ {
str := seq.Sequence.String()
err := checkpointer.Checkpoint(&str, &seq.SubSequence)
for n := 0; retry && !seq.IsEmpty() && n < b.config.CheckpointRetries+1; n++ {
err := checkpointer.Checkpoint(seq, 5)
if err == nil { // Successfully checkpointed!
lastCheckpoint = time.Now()
break
@ -132,6 +143,11 @@ func (b *batchedWriter) startCheckpointListener(
time.Sleep(b.config.CheckpointRetrySleep)
}
}
if isShuttingDown {
checkpointer.Shutdown()
return
}
}
}()
}
@ -153,7 +169,7 @@ func (b *batchedWriter) createBatcher(tag string) batcher.Batcher {
// go routine to avoid racey conditions.
func (b *batchedWriter) startMessageHandler(
batchMsg <-chan tagMsgPair, checkpointTag <-chan string, lastPair <-chan kcl.SequencePair,
flushBatches <-chan struct{},
shutdown <-chan struct{},
) {
go func() {
var lastProcessedPair kcl.SequencePair
@ -206,11 +222,12 @@ func (b *batchedWriter) startMessageHandler(
b.checkpointMsg <- pair
}
lastProcessedPair = pair
case <-flushBatches:
case <-shutdown:
for _, batch := range batchers {
batch.Flush()
}
b.checkpointMsg <- lastProcessedPair
b.checkpointShutdown <- struct{}{}
areBatchersEmpty = true
}
}
@ -317,9 +334,9 @@ func (b *batchedWriter) SendBatch(batch [][]byte, tag string) {
func (b *batchedWriter) Shutdown(reason string) error {
if reason == "TERMINATE" {
b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID})
b.flushBatches <- struct{}{}
} else {
b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason})
}
b.shutdown <- struct{}{}
return nil
}