From e6229ce8bb833a0a685a770b4f0a7bbc7af11e79 Mon Sep 17 00:00:00 2001 From: Nathan Leiby Date: Wed, 5 Apr 2017 13:51:44 -0700 Subject: [PATCH] checkpointing fix - always try checkpoint at least once (previously, a value of 0 retries meant it would never checkpoint) - setup record processor using New(), so we get default checkpointing configuration --- cmd/consumer/main.go | 46 ++++++++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index cb311b1..acb6de8 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -32,27 +32,31 @@ func (srp *SampleRecordProcessor) Initialize(shardID string) error { } func (srp *SampleRecordProcessor) checkpoint(checkpointer kcl.Checkpointer, sequenceNumber string, subSequenceNumber int) { - for n := 0; n < srp.checkpointRetries; n++ { - if err := checkpointer.Checkpoint(sequenceNumber, subSequenceNumber); err == nil { - break - } else { - if cperr, ok := err.(kcl.CheckpointError); ok { - switch cperr.Error() { - case "ShutdownException": - fmt.Fprintf(os.Stderr, "Encountered shutdown exception, skipping checkpoint\n") - return - case "ThrottlingException": - if srp.checkpointRetries-1 == n { - fmt.Fprintf(os.Stderr, "Failed to checkpoint after %d attempts, giving up.\n", n) - return - } - fmt.Fprintf(os.Stderr, "Was throttled while checkpointing, will attempt again in %s", srp.sleepDuration) - case "InvalidStateException": - fmt.Fprintf(os.Stderr, "MultiLangDaemon reported an invalid state while checkpointing\n") - } - } - fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err) + for n := -1; n < srp.checkpointRetries; n++ { + err := checkpointer.Checkpoint(sequenceNumber, subSequenceNumber) + if err == nil { + return } + + if cperr, ok := err.(kcl.CheckpointError); ok { + switch cperr.Error() { + case "ShutdownException": + fmt.Fprintf(os.Stderr, "Encountered shutdown exception, skipping checkpoint\n") + return + case "ThrottlingException": + fmt.Fprintf(os.Stderr, "Was throttled while checkpointing, will attempt again in %s", srp.sleepDuration) + case "InvalidStateException": + fmt.Fprintf(os.Stderr, "MultiLangDaemon reported an invalid state while checkpointing\n") + default: + fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err) + } + } + + if n == srp.checkpointRetries { + fmt.Fprintf(os.Stderr, "Failed to checkpoint after %d attempts, giving up.\n", srp.checkpointRetries) + return + } + time.Sleep(srp.sleepDuration) } } @@ -97,6 +101,6 @@ func main() { panic(err) } defer f.Close() - kclProcess := kcl.New(os.Stdin, os.Stdout, os.Stderr, &SampleRecordProcessor{}) + kclProcess := kcl.New(os.Stdin, os.Stdout, os.Stderr, New()) kclProcess.Run() }