Moved retry and error handling logic to Checkpointer class

This commit is contained in:
Xavi Ramirez 2017-05-22 23:05:34 +00:00
parent 02b053c0ee
commit 5f507ab116
2 changed files with 39 additions and 33 deletions

View file

@ -11,7 +11,6 @@ import (
type SampleRecordProcessor struct {
checkpointer *kcl.Checkpointer
sleepDuration time.Duration
checkpointRetries int
checkpointFreq time.Duration
largestSeq *big.Int
@ -21,7 +20,6 @@ type SampleRecordProcessor struct {
func New() *SampleRecordProcessor {
return &SampleRecordProcessor{
sleepDuration: 5 * time.Second,
checkpointRetries: 5,
checkpointFreq: 60 * time.Second,
}
@ -33,36 +31,6 @@ func (srp *SampleRecordProcessor) Initialize(shardID string, checkpointer *kcl.C
return nil
}
func (srp *SampleRecordProcessor) checkpoint(sequenceNumber *string, subSequenceNumber *int) {
for n := 0; n < srp.checkpointRetries+1; n++ {
err := srp.checkpointer.Checkpoint(sequenceNumber, subSequenceNumber)
if err == nil {
return
}
if cperr, ok := err.(kcl.CheckpointError); ok {
switch cperr.Error() {
case "ShutdownException":
fmt.Fprintf(os.Stderr, "Encountered shutdown exception, skipping checkpoint\n")
return
case "ThrottlingException":
fmt.Fprintf(os.Stderr, "Was throttled while checkpointing, will attempt again in %s", srp.sleepDuration)
case "InvalidStateException":
fmt.Fprintf(os.Stderr, "MultiLangDaemon reported an invalid state while checkpointing\n")
default:
fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err)
}
}
if n == srp.checkpointRetries {
fmt.Fprintf(os.Stderr, "Failed to checkpoint after %d attempts, giving up.\n", srp.checkpointRetries)
return
}
time.Sleep(srp.sleepDuration)
}
}
func (srp *SampleRecordProcessor) shouldUpdateSequence(sequenceNumber *big.Int, subSequenceNumber int) bool {
return srp.largestSeq == nil || sequenceNumber.Cmp(srp.largestSeq) == 1 ||
(sequenceNumber.Cmp(srp.largestSeq) == 0 && subSequenceNumber > srp.largestSubSeq)
@ -82,7 +50,7 @@ func (srp *SampleRecordProcessor) ProcessRecords(records []kcl.Record) error {
}
if time.Now().Sub(srp.lastCheckpoint) > srp.checkpointFreq {
largestSeq := srp.largestSeq.String()
srp.checkpoint(&largestSeq, &srp.largestSubSeq)
srp.checkpointer.CheckpointWithRetry(&largestSeq, &srp.largestSubSeq, srp.checkpointRetries)
srp.lastCheckpoint = time.Now()
}
return nil

View file

@ -6,7 +6,9 @@ import (
"encoding/json"
"fmt"
"io"
"os"
"sync"
"time"
)
type RecordProcessor interface {
@ -70,6 +72,42 @@ func (c *Checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int
return nil
}
// CheckpointWithRetry tries to save a checkPoint up to `retryCount` + 1 times.
// `retryCount` should be >= 0
func (c *Checkpointer) CheckpointWithRetry(
sequenceNumber *string, subSequenceNumber *int, retryCount int,
) error {
sleepDuration := 5 * time.Second
for n := 0; n <= retryCount; n++ {
err := c.Checkpoint(sequenceNumber, subSequenceNumber)
if err == nil {
return nil
}
if cperr, ok := err.(CheckpointError); ok {
switch cperr.Error() {
case "ShutdownException":
return fmt.Errorf("Encountered shutdown exception, skipping checkpoint")
case "ThrottlingException":
fmt.Fprintf(os.Stderr, "Was throttled while checkpointing, will attempt again in %s\n", sleepDuration)
case "InvalidStateException":
fmt.Fprintf(os.Stderr, "MultiLangDaemon reported an invalid state while checkpointing\n")
default:
fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err)
}
}
if n == retryCount {
return fmt.Errorf("Failed to checkpoint after %d attempts, giving up.", retryCount)
}
time.Sleep(sleepDuration)
}
return nil
}
type ioHandler struct {
inputFile io.Reader
outputFile io.Writer