diff --git a/batchconsumer/checkpointmanager.go b/batchconsumer/checkpointmanager.go index 9a877fd..eb755b6 100644 --- a/batchconsumer/checkpointmanager.go +++ b/batchconsumer/checkpointmanager.go @@ -11,8 +11,7 @@ import ( type checkpointManager struct { log kv.KayveeLogger - checkpointRetries int - checkpointFreq time.Duration + checkpointFreq time.Duration checkpoint chan kcl.SequencePair shutdown chan struct{} @@ -24,8 +23,7 @@ func NewCheckpointManager( cm := &checkpointManager{ log: log, - checkpointRetries: config.CheckpointRetries, - checkpointFreq: config.CheckpointFreq, + checkpointFreq: config.CheckpointFreq, checkpoint: make(chan kcl.SequencePair), shutdown: make(chan struct{}), @@ -72,12 +70,8 @@ func (cm *checkpointManager) startCheckpointHandler( } if !pair.IsEmpty() { - err := checkpointer.Checkpoint(pair, cm.checkpointRetries) - if err != nil { - cm.log.ErrorD("checkpoint-err", kv.M{"msg": err.Error()}) - } else { - lastCheckpoint = time.Now() - } + checkpointer.Checkpoint(pair) + lastCheckpoint = time.Now() } if isShuttingDown { diff --git a/batchconsumer/consumer.go b/batchconsumer/consumer.go index 2b6f023..9b16e0b 100644 --- a/batchconsumer/consumer.go +++ b/batchconsumer/consumer.go @@ -33,10 +33,6 @@ type Config struct { // CheckpointFreq the frequency in which a checkpoint is saved CheckpointFreq time.Duration - // CheckpointRetries the number of times the consumer will try to save a checkpoint - CheckpointRetries int - // CheckpointRetrySleep the amount of time between checkpoint save attempts - CheckpointRetrySleep time.Duration } // BatchConsumer is responsible for marshalling @@ -75,12 +71,6 @@ func withDefaults(config Config) Config { if config.CheckpointFreq == 0 { config.CheckpointFreq = 60 * time.Second } - if config.CheckpointRetries == 0 { - config.CheckpointRetries = 5 - } - if config.CheckpointRetrySleep == 0 { - config.CheckpointRetrySleep = 5 * time.Second - } return config } diff --git a/kcl/kcl.go b/kcl/kcl.go index c470637..5dbc3a7 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -17,7 +17,7 @@ type RecordProcessor interface { } type Checkpointer interface { - Checkpoint(pair SequencePair, retryCount int) error + Checkpoint(pair SequencePair) Shutdown() } @@ -148,23 +148,23 @@ func New( ioHandler: i, recordProcessor: recordProcessor, - checkpoint: make(chan SequencePair), - checkpointErr: make(chan error), + isShuttingDown: false, + nextCheckpointPair: SequencePair{}, } } type KCLProcess struct { - ckpmux sync.Mutex - readmux sync.Mutex + ckpmux sync.Mutex ioHandler ioHandler recordProcessor RecordProcessor - checkpoint chan SequencePair - checkpointErr chan error + isShuttingDown bool + nextCheckpointPair SequencePair } func (kclp *KCLProcess) reportDone(responseFor string) error { + fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Writing report done line") return kclp.ioHandler.writeAction(struct { Action string `json:"action"` ResponseFor string `json:"responseFor"` @@ -187,22 +187,29 @@ func (kclp *KCLProcess) performAction(a interface{}) (string, error) { } } -func (kclp *KCLProcess) handleLine(line string) error { +func (kclp *KCLProcess) handleLine(line string) (string, error) { action, err := kclp.ioHandler.loadAction(line) if err != nil { - return err + return "", err } - responseFor, err := kclp.performAction(action) - if err != nil { - return err - } - return kclp.reportDone(responseFor) + return kclp.performAction(action) } -func (kclp *KCLProcess) Checkpoint(pair SequencePair, retryCount int) error { +func (kclp *KCLProcess) Checkpoint(pair SequencePair) { kclp.ckpmux.Lock() defer kclp.ckpmux.Unlock() + + if kclp.nextCheckpointPair.IsEmpty() || kclp.nextCheckpointPair.IsLessThan(pair) { + kclp.nextCheckpointPair = pair + } +} + +func (kclp *KCLProcess) Shutdown() { + kclp.isShuttingDown = true +} + +func (kclp *KCLProcess) sendCheckpoint(pair SequencePair, retryCount int) error { sleepDuration := 5 * time.Second for n := 0; n <= retryCount; n++ { @@ -223,6 +230,8 @@ func (kclp *KCLProcess) Checkpoint(pair SequencePair, retryCount int) error { default: fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err) } + } else { + return err } if n == retryCount { @@ -235,14 +244,7 @@ func (kclp *KCLProcess) Checkpoint(pair SequencePair, retryCount int) error { return nil } -func (kclp *KCLProcess) Shutdown() { - kclp.Checkpoint(SequencePair{}, 5) -} - func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error { - kclp.readmux.Lock() - defer kclp.readmux.Unlock() - var seq *string var subSeq *int if !pair.IsEmpty() { // an empty pair is a signal to shutdown @@ -250,11 +252,13 @@ func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error { seq = &tmp subSeq = &pair.SubSequence } + fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Writing checkpoint") kclp.ioHandler.writeAction(ActionCheckpoint{ Action: "checkpoint", SequenceNumber: seq, SubSequenceNumber: subSeq, }) + fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Reading checkpoint line") line, err := kclp.ioHandler.readLine() if err != nil { return err @@ -276,7 +280,7 @@ func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error { func (kclp *KCLProcess) Run() { for { - kclp.readmux.Lock() + fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Reading process line") line, err := kclp.ioHandler.readLine() if err == io.EOF { kclp.ioHandler.writeError("IO stream closed") @@ -289,11 +293,28 @@ func (kclp *KCLProcess) Run() { return } - err = kclp.handleLine(line) + action, err := kclp.handleLine(line) if err != nil { kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err)) return } - kclp.readmux.Unlock() + + if !kclp.nextCheckpointPair.IsEmpty() { + err := kclp.sendCheckpoint(kclp.nextCheckpointPair, 5) + if err != nil { + kclp.ioHandler.writeError(fmt.Sprintf("ERR checkpoint: %+#v", err)) + } + kclp.nextCheckpointPair = SequencePair{} + } + + err = kclp.reportDone(action) + if err != nil { + kclp.ioHandler.writeError(fmt.Sprintf("ERR report done: %s, %+#v", action, err)) + return + } + + if kclp.isShuttingDown { + kclp.sendCheckpoint(SequencePair{}, 5) // Empty SequencePair is signal to shutdown + } } }