2017-08-04 09:36:42 +00:00
|
|
|
package batchconsumer
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
kv "gopkg.in/Clever/kayvee-go.v6/logger"
|
|
|
|
|
|
2017-08-07 03:05:41 +00:00
|
|
|
"github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats"
|
2017-08-04 09:36:42 +00:00
|
|
|
"github.com/Clever/amazon-kinesis-client-go/kcl"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type checkpointManager struct {
|
|
|
|
|
log kv.KayveeLogger
|
|
|
|
|
|
2017-08-06 05:02:55 +00:00
|
|
|
checkpointFreq time.Duration
|
2017-08-04 09:36:42 +00:00
|
|
|
|
|
|
|
|
checkpoint chan kcl.SequencePair
|
2017-08-10 19:56:17 +00:00
|
|
|
// shutdown chan takes "done" channel to signal when checkpointManager is done shutting down
|
|
|
|
|
shutdown chan chan<- struct{}
|
2017-08-04 09:36:42 +00:00
|
|
|
}
|
|
|
|
|
|
2017-08-10 19:21:46 +00:00
|
|
|
func newCheckpointManager(
|
2017-08-04 09:36:42 +00:00
|
|
|
checkpointer kcl.Checkpointer, config Config, log kv.KayveeLogger,
|
|
|
|
|
) *checkpointManager {
|
|
|
|
|
cm := &checkpointManager{
|
|
|
|
|
log: log,
|
|
|
|
|
|
2017-08-06 05:02:55 +00:00
|
|
|
checkpointFreq: config.CheckpointFreq,
|
2017-08-04 09:36:42 +00:00
|
|
|
|
|
|
|
|
checkpoint: make(chan kcl.SequencePair),
|
2017-08-08 19:09:31 +00:00
|
|
|
shutdown: make(chan chan<- struct{}),
|
2017-08-04 09:36:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cm.startCheckpointHandler(checkpointer, cm.checkpoint, cm.shutdown)
|
|
|
|
|
|
|
|
|
|
return cm
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (cm *checkpointManager) Checkpoint(pair kcl.SequencePair) {
|
|
|
|
|
cm.checkpoint <- pair
|
|
|
|
|
}
|
|
|
|
|
|
2017-08-08 19:09:31 +00:00
|
|
|
func (cm *checkpointManager) Shutdown() <-chan struct{} {
|
|
|
|
|
done := make(chan struct{})
|
|
|
|
|
cm.shutdown <- done
|
|
|
|
|
|
|
|
|
|
return done
|
2017-08-04 09:36:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (cm *checkpointManager) startCheckpointHandler(
|
2017-08-08 19:09:31 +00:00
|
|
|
checkpointer kcl.Checkpointer, checkpoint <-chan kcl.SequencePair,
|
|
|
|
|
shutdown <-chan chan<- struct{},
|
2017-08-04 09:36:42 +00:00
|
|
|
) {
|
|
|
|
|
go func() {
|
|
|
|
|
lastCheckpoint := time.Now()
|
|
|
|
|
|
|
|
|
|
for {
|
2017-08-08 19:09:31 +00:00
|
|
|
var doneShutdown chan<- struct{}
|
2017-08-04 09:36:42 +00:00
|
|
|
pair := kcl.SequencePair{}
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case pair = <-checkpoint:
|
2017-08-08 19:09:31 +00:00
|
|
|
case doneShutdown = <-shutdown:
|
2017-08-04 09:36:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// This is a write throttle to ensure we don't checkpoint faster than cm.checkpointFreq.
|
|
|
|
|
// The latest pair number is always used.
|
2017-08-08 19:09:31 +00:00
|
|
|
for doneShutdown == nil && time.Now().Sub(lastCheckpoint) < cm.checkpointFreq {
|
2017-08-04 09:36:42 +00:00
|
|
|
select {
|
|
|
|
|
case pair = <-checkpoint: // Keep updating checkpoint pair while waiting
|
2017-08-08 19:09:31 +00:00
|
|
|
case doneShutdown = <-shutdown:
|
2017-08-04 09:36:42 +00:00
|
|
|
case <-time.NewTimer(cm.checkpointFreq - time.Now().Sub(lastCheckpoint)).C:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !pair.IsEmpty() {
|
2017-08-06 05:02:55 +00:00
|
|
|
checkpointer.Checkpoint(pair)
|
|
|
|
|
lastCheckpoint = time.Now()
|
2017-08-07 03:05:41 +00:00
|
|
|
stats.Counter("checkpoints-sent", 1)
|
2017-08-04 09:36:42 +00:00
|
|
|
}
|
|
|
|
|
|
2017-08-08 19:09:31 +00:00
|
|
|
if doneShutdown != nil {
|
2017-08-04 09:36:42 +00:00
|
|
|
checkpointer.Shutdown()
|
2017-08-08 19:09:31 +00:00
|
|
|
doneShutdown <- struct{}{}
|
2017-08-04 09:36:42 +00:00
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}
|