From 5f507ab116813c9f1c99026aef1376202d3d5493 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Mon, 22 May 2017 23:05:34 +0000 Subject: [PATCH] Moved retry and error handling logic to Checkpointer class --- cmd/consumer/main.go | 34 +--------------------------------- kcl/kcl.go | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 33 deletions(-) diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index 860a9ca..2d78381 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -11,7 +11,6 @@ import ( type SampleRecordProcessor struct { checkpointer *kcl.Checkpointer - sleepDuration time.Duration checkpointRetries int checkpointFreq time.Duration largestSeq *big.Int @@ -21,7 +20,6 @@ type SampleRecordProcessor struct { func New() *SampleRecordProcessor { return &SampleRecordProcessor{ - sleepDuration: 5 * time.Second, checkpointRetries: 5, checkpointFreq: 60 * time.Second, } @@ -33,36 +31,6 @@ func (srp *SampleRecordProcessor) Initialize(shardID string, checkpointer *kcl.C return nil } -func (srp *SampleRecordProcessor) checkpoint(sequenceNumber *string, subSequenceNumber *int) { - for n := 0; n < srp.checkpointRetries+1; n++ { - err := srp.checkpointer.Checkpoint(sequenceNumber, subSequenceNumber) - if err == nil { - return - } - - if cperr, ok := err.(kcl.CheckpointError); ok { - switch cperr.Error() { - case "ShutdownException": - fmt.Fprintf(os.Stderr, "Encountered shutdown exception, skipping checkpoint\n") - return - case "ThrottlingException": - fmt.Fprintf(os.Stderr, "Was throttled while checkpointing, will attempt again in %s", srp.sleepDuration) - case "InvalidStateException": - fmt.Fprintf(os.Stderr, "MultiLangDaemon reported an invalid state while checkpointing\n") - default: - fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err) - } - } - - if n == srp.checkpointRetries { - fmt.Fprintf(os.Stderr, "Failed to checkpoint after %d attempts, giving up.\n", srp.checkpointRetries) - return - } - - time.Sleep(srp.sleepDuration) - } -} - func (srp *SampleRecordProcessor) shouldUpdateSequence(sequenceNumber *big.Int, subSequenceNumber int) bool { return srp.largestSeq == nil || sequenceNumber.Cmp(srp.largestSeq) == 1 || (sequenceNumber.Cmp(srp.largestSeq) == 0 && subSequenceNumber > srp.largestSubSeq) @@ -82,7 +50,7 @@ func (srp *SampleRecordProcessor) ProcessRecords(records []kcl.Record) error { } if time.Now().Sub(srp.lastCheckpoint) > srp.checkpointFreq { largestSeq := srp.largestSeq.String() - srp.checkpoint(&largestSeq, &srp.largestSubSeq) + srp.checkpointer.CheckpointWithRetry(&largestSeq, &srp.largestSubSeq, srp.checkpointRetries) srp.lastCheckpoint = time.Now() } return nil diff --git a/kcl/kcl.go b/kcl/kcl.go index 8d25f24..92abb3f 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -6,7 +6,9 @@ import ( "encoding/json" "fmt" "io" + "os" "sync" + "time" ) type RecordProcessor interface { @@ -70,6 +72,42 @@ func (c *Checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int return nil } +// CheckpointWithRetry tries to save a checkPoint up to `retryCount` + 1 times. +// `retryCount` should be >= 0 +func (c *Checkpointer) CheckpointWithRetry( + sequenceNumber *string, subSequenceNumber *int, retryCount int, +) error { + sleepDuration := 5 * time.Second + + for n := 0; n <= retryCount; n++ { + err := c.Checkpoint(sequenceNumber, subSequenceNumber) + if err == nil { + return nil + } + + if cperr, ok := err.(CheckpointError); ok { + switch cperr.Error() { + case "ShutdownException": + return fmt.Errorf("Encountered shutdown exception, skipping checkpoint") + case "ThrottlingException": + fmt.Fprintf(os.Stderr, "Was throttled while checkpointing, will attempt again in %s\n", sleepDuration) + case "InvalidStateException": + fmt.Fprintf(os.Stderr, "MultiLangDaemon reported an invalid state while checkpointing\n") + default: + fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err) + } + } + + if n == retryCount { + return fmt.Errorf("Failed to checkpoint after %d attempts, giving up.", retryCount) + } + + time.Sleep(sleepDuration) + } + + return nil +} + type ioHandler struct { inputFile io.Reader outputFile io.Writer