Better coordinated checkpoints and readlines. Checkpoint function now returns all errors.

This commit is contained in:
Xavi Ramirez 2017-08-06 05:02:55 +00:00
parent b19c9297d8
commit 734b46274f
3 changed files with 50 additions and 45 deletions

View file

@ -11,8 +11,7 @@ import (
type checkpointManager struct { type checkpointManager struct {
log kv.KayveeLogger log kv.KayveeLogger
checkpointRetries int checkpointFreq time.Duration
checkpointFreq time.Duration
checkpoint chan kcl.SequencePair checkpoint chan kcl.SequencePair
shutdown chan struct{} shutdown chan struct{}
@ -24,8 +23,7 @@ func NewCheckpointManager(
cm := &checkpointManager{ cm := &checkpointManager{
log: log, log: log,
checkpointRetries: config.CheckpointRetries, checkpointFreq: config.CheckpointFreq,
checkpointFreq: config.CheckpointFreq,
checkpoint: make(chan kcl.SequencePair), checkpoint: make(chan kcl.SequencePair),
shutdown: make(chan struct{}), shutdown: make(chan struct{}),
@ -72,12 +70,8 @@ func (cm *checkpointManager) startCheckpointHandler(
} }
if !pair.IsEmpty() { if !pair.IsEmpty() {
err := checkpointer.Checkpoint(pair, cm.checkpointRetries) checkpointer.Checkpoint(pair)
if err != nil { lastCheckpoint = time.Now()
cm.log.ErrorD("checkpoint-err", kv.M{"msg": err.Error()})
} else {
lastCheckpoint = time.Now()
}
} }
if isShuttingDown { if isShuttingDown {

View file

@ -33,10 +33,6 @@ type Config struct {
// CheckpointFreq the frequency in which a checkpoint is saved // CheckpointFreq the frequency in which a checkpoint is saved
CheckpointFreq time.Duration CheckpointFreq time.Duration
// CheckpointRetries the number of times the consumer will try to save a checkpoint
CheckpointRetries int
// CheckpointRetrySleep the amount of time between checkpoint save attempts
CheckpointRetrySleep time.Duration
} }
// BatchConsumer is responsible for marshalling // BatchConsumer is responsible for marshalling
@ -75,12 +71,6 @@ func withDefaults(config Config) Config {
if config.CheckpointFreq == 0 { if config.CheckpointFreq == 0 {
config.CheckpointFreq = 60 * time.Second config.CheckpointFreq = 60 * time.Second
} }
if config.CheckpointRetries == 0 {
config.CheckpointRetries = 5
}
if config.CheckpointRetrySleep == 0 {
config.CheckpointRetrySleep = 5 * time.Second
}
return config return config
} }

View file

@ -17,7 +17,7 @@ type RecordProcessor interface {
} }
type Checkpointer interface { type Checkpointer interface {
Checkpoint(pair SequencePair, retryCount int) error Checkpoint(pair SequencePair)
Shutdown() Shutdown()
} }
@ -148,23 +148,23 @@ func New(
ioHandler: i, ioHandler: i,
recordProcessor: recordProcessor, recordProcessor: recordProcessor,
checkpoint: make(chan SequencePair), isShuttingDown: false,
checkpointErr: make(chan error), nextCheckpointPair: SequencePair{},
} }
} }
type KCLProcess struct { type KCLProcess struct {
ckpmux sync.Mutex ckpmux sync.Mutex
readmux sync.Mutex
ioHandler ioHandler ioHandler ioHandler
recordProcessor RecordProcessor recordProcessor RecordProcessor
checkpoint chan SequencePair isShuttingDown bool
checkpointErr chan error nextCheckpointPair SequencePair
} }
func (kclp *KCLProcess) reportDone(responseFor string) error { func (kclp *KCLProcess) reportDone(responseFor string) error {
fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Writing report done line")
return kclp.ioHandler.writeAction(struct { return kclp.ioHandler.writeAction(struct {
Action string `json:"action"` Action string `json:"action"`
ResponseFor string `json:"responseFor"` ResponseFor string `json:"responseFor"`
@ -187,22 +187,29 @@ func (kclp *KCLProcess) performAction(a interface{}) (string, error) {
} }
} }
func (kclp *KCLProcess) handleLine(line string) error { func (kclp *KCLProcess) handleLine(line string) (string, error) {
action, err := kclp.ioHandler.loadAction(line) action, err := kclp.ioHandler.loadAction(line)
if err != nil { if err != nil {
return err return "", err
} }
responseFor, err := kclp.performAction(action) return kclp.performAction(action)
if err != nil {
return err
}
return kclp.reportDone(responseFor)
} }
func (kclp *KCLProcess) Checkpoint(pair SequencePair, retryCount int) error { func (kclp *KCLProcess) Checkpoint(pair SequencePair) {
kclp.ckpmux.Lock() kclp.ckpmux.Lock()
defer kclp.ckpmux.Unlock() defer kclp.ckpmux.Unlock()
if kclp.nextCheckpointPair.IsEmpty() || kclp.nextCheckpointPair.IsLessThan(pair) {
kclp.nextCheckpointPair = pair
}
}
func (kclp *KCLProcess) Shutdown() {
kclp.isShuttingDown = true
}
func (kclp *KCLProcess) sendCheckpoint(pair SequencePair, retryCount int) error {
sleepDuration := 5 * time.Second sleepDuration := 5 * time.Second
for n := 0; n <= retryCount; n++ { for n := 0; n <= retryCount; n++ {
@ -223,6 +230,8 @@ func (kclp *KCLProcess) Checkpoint(pair SequencePair, retryCount int) error {
default: default:
fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err) fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err)
} }
} else {
return err
} }
if n == retryCount { if n == retryCount {
@ -235,14 +244,7 @@ func (kclp *KCLProcess) Checkpoint(pair SequencePair, retryCount int) error {
return nil return nil
} }
func (kclp *KCLProcess) Shutdown() {
kclp.Checkpoint(SequencePair{}, 5)
}
func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error { func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error {
kclp.readmux.Lock()
defer kclp.readmux.Unlock()
var seq *string var seq *string
var subSeq *int var subSeq *int
if !pair.IsEmpty() { // an empty pair is a signal to shutdown if !pair.IsEmpty() { // an empty pair is a signal to shutdown
@ -250,11 +252,13 @@ func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error {
seq = &tmp seq = &tmp
subSeq = &pair.SubSequence subSeq = &pair.SubSequence
} }
fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Writing checkpoint")
kclp.ioHandler.writeAction(ActionCheckpoint{ kclp.ioHandler.writeAction(ActionCheckpoint{
Action: "checkpoint", Action: "checkpoint",
SequenceNumber: seq, SequenceNumber: seq,
SubSequenceNumber: subSeq, SubSequenceNumber: subSeq,
}) })
fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Reading checkpoint line")
line, err := kclp.ioHandler.readLine() line, err := kclp.ioHandler.readLine()
if err != nil { if err != nil {
return err return err
@ -276,7 +280,7 @@ func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error {
func (kclp *KCLProcess) Run() { func (kclp *KCLProcess) Run() {
for { for {
kclp.readmux.Lock() fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Reading process line")
line, err := kclp.ioHandler.readLine() line, err := kclp.ioHandler.readLine()
if err == io.EOF { if err == io.EOF {
kclp.ioHandler.writeError("IO stream closed") kclp.ioHandler.writeError("IO stream closed")
@ -289,11 +293,28 @@ func (kclp *KCLProcess) Run() {
return return
} }
err = kclp.handleLine(line) action, err := kclp.handleLine(line)
if err != nil { if err != nil {
kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err)) kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err))
return return
} }
kclp.readmux.Unlock()
if !kclp.nextCheckpointPair.IsEmpty() {
err := kclp.sendCheckpoint(kclp.nextCheckpointPair, 5)
if err != nil {
kclp.ioHandler.writeError(fmt.Sprintf("ERR checkpoint: %+#v", err))
}
kclp.nextCheckpointPair = SequencePair{}
}
err = kclp.reportDone(action)
if err != nil {
kclp.ioHandler.writeError(fmt.Sprintf("ERR report done: %s, %+#v", action, err))
return
}
if kclp.isShuttingDown {
kclp.sendCheckpoint(SequencePair{}, 5) // Empty SequencePair is signal to shutdown
}
} }
} }