amazon-kinesis-client-go/batchconsumer/checkpointmanager.go
2017-11-03 17:48:50 +00:00

81 lines
1.9 KiB
Go

package batchconsumer
import (
"time"
"github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats"
"github.com/Clever/amazon-kinesis-client-go/kcl"
)
type checkpointManager struct {
checkpointFreq time.Duration
checkpoint chan kcl.SequencePair
// shutdown chan takes "done" channel to signal when checkpointManager is done shutting down
shutdown chan chan<- struct{}
}
func newCheckpointManager(checkpointer kcl.Checkpointer, checkpointFreq time.Duration) *checkpointManager {
cm := &checkpointManager{
checkpointFreq: checkpointFreq,
checkpoint: make(chan kcl.SequencePair),
shutdown: make(chan chan<- struct{}),
}
cm.startCheckpointHandler(checkpointer, cm.checkpoint, cm.shutdown)
return cm
}
func (cm *checkpointManager) Checkpoint(pair kcl.SequencePair) {
cm.checkpoint <- pair
}
func (cm *checkpointManager) Shutdown() <-chan struct{} {
done := make(chan struct{})
cm.shutdown <- done
return done
}
func (cm *checkpointManager) startCheckpointHandler(
checkpointer kcl.Checkpointer, checkpoint <-chan kcl.SequencePair,
shutdown <-chan chan<- struct{},
) {
go func() {
lastCheckpoint := time.Now()
for {
var doneShutdown chan<- struct{}
pair := kcl.SequencePair{}
select {
case pair = <-checkpoint:
case doneShutdown = <-shutdown:
}
// This is a write throttle to ensure we don't checkpoint faster than cm.checkpointFreq.
// The latest pair number is always used.
for doneShutdown == nil && time.Now().Sub(lastCheckpoint) < cm.checkpointFreq {
select {
case pair = <-checkpoint: // Keep updating checkpoint pair while waiting
case doneShutdown = <-shutdown:
case <-time.NewTimer(cm.checkpointFreq - time.Now().Sub(lastCheckpoint)).C:
}
}
if !pair.IsNil() {
checkpointer.Checkpoint(pair)
lastCheckpoint = time.Now()
stats.Counter("checkpoints-sent", 1)
}
if doneShutdown != nil {
checkpointer.Shutdown()
doneShutdown <- struct{}{}
return
}
}
}()
}