amazon-kinesis-client-go/batchconsumer/checkpointmanager.go

89 lines
2 KiB
Go
Raw Normal View History

package batchconsumer
import (
"time"
kv "gopkg.in/Clever/kayvee-go.v6/logger"
2017-08-07 03:05:41 +00:00
"github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats"
"github.com/Clever/amazon-kinesis-client-go/kcl"
)
type checkpointManager struct {
log kv.KayveeLogger
checkpointFreq time.Duration
checkpoint chan kcl.SequencePair
2017-08-08 19:09:31 +00:00
shutdown chan chan<- struct{}
}
func NewCheckpointManager(
checkpointer kcl.Checkpointer, config Config, log kv.KayveeLogger,
) *checkpointManager {
cm := &checkpointManager{
log: log,
checkpointFreq: config.CheckpointFreq,
checkpoint: make(chan kcl.SequencePair),
2017-08-08 19:09:31 +00:00
shutdown: make(chan chan<- struct{}),
}
cm.startCheckpointHandler(checkpointer, cm.checkpoint, cm.shutdown)
return cm
}
func (cm *checkpointManager) Checkpoint(pair kcl.SequencePair) {
cm.checkpoint <- pair
}
2017-08-08 19:09:31 +00:00
func (cm *checkpointManager) Shutdown() <-chan struct{} {
done := make(chan struct{})
cm.shutdown <- done
return done
}
func (cm *checkpointManager) startCheckpointHandler(
2017-08-08 19:09:31 +00:00
checkpointer kcl.Checkpointer, checkpoint <-chan kcl.SequencePair,
shutdown <-chan chan<- struct{},
) {
go func() {
lastCheckpoint := time.Now()
for {
2017-08-08 19:09:31 +00:00
var doneShutdown chan<- struct{}
pair := kcl.SequencePair{}
select {
case pair = <-checkpoint:
2017-08-08 19:09:31 +00:00
case doneShutdown = <-shutdown:
}
// This is a write throttle to ensure we don't checkpoint faster than cm.checkpointFreq.
// The latest pair number is always used.
2017-08-08 19:09:31 +00:00
for doneShutdown == nil && time.Now().Sub(lastCheckpoint) < cm.checkpointFreq {
select {
case pair = <-checkpoint: // Keep updating checkpoint pair while waiting
2017-08-08 19:09:31 +00:00
case doneShutdown = <-shutdown:
case <-time.NewTimer(cm.checkpointFreq - time.Now().Sub(lastCheckpoint)).C:
}
}
if !pair.IsEmpty() {
checkpointer.Checkpoint(pair)
lastCheckpoint = time.Now()
2017-08-07 03:05:41 +00:00
stats.Counter("checkpoints-sent", 1)
}
2017-08-08 19:09:31 +00:00
if doneShutdown != nil {
checkpointer.Shutdown()
2017-08-08 19:09:31 +00:00
doneShutdown <- struct{}{}
return
}
}
}()
}