From 17cfe98efa3c2ddc8b55378b6865c8d1ca3d7271 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Fri, 19 May 2017 22:01:57 +0000 Subject: [PATCH 1/5] Users of this library can now cache the checkpointer object --- cmd/consumer/main.go | 18 ++++++++++-------- kcl/kcl.go | 36 ++++++++++++++++++------------------ 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index 4602c13..860a9ca 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -10,6 +10,7 @@ import ( ) type SampleRecordProcessor struct { + checkpointer *kcl.Checkpointer sleepDuration time.Duration checkpointRetries int checkpointFreq time.Duration @@ -26,14 +27,15 @@ func New() *SampleRecordProcessor { } } -func (srp *SampleRecordProcessor) Initialize(shardID string) error { +func (srp *SampleRecordProcessor) Initialize(shardID string, checkpointer *kcl.Checkpointer) error { srp.lastCheckpoint = time.Now() + srp.checkpointer = checkpointer return nil } -func (srp *SampleRecordProcessor) checkpoint(checkpointer kcl.Checkpointer, sequenceNumber *string, subSequenceNumber *int) { - for n := -1; n < srp.checkpointRetries; n++ { - err := checkpointer.Checkpoint(sequenceNumber, subSequenceNumber) +func (srp *SampleRecordProcessor) checkpoint(sequenceNumber *string, subSequenceNumber *int) { + for n := 0; n < srp.checkpointRetries+1; n++ { + err := srp.checkpointer.Checkpoint(sequenceNumber, subSequenceNumber) if err == nil { return } @@ -66,7 +68,7 @@ func (srp *SampleRecordProcessor) shouldUpdateSequence(sequenceNumber *big.Int, (sequenceNumber.Cmp(srp.largestSeq) == 0 && subSequenceNumber > srp.largestSubSeq) } -func (srp *SampleRecordProcessor) ProcessRecords(records []kcl.Record, checkpointer kcl.Checkpointer) error { +func (srp *SampleRecordProcessor) ProcessRecords(records []kcl.Record) error { for _, record := range records { seqNumber := new(big.Int) if _, ok := seqNumber.SetString(record.SequenceNumber, 10); !ok { @@ -80,16 +82,16 @@ func (srp *SampleRecordProcessor) ProcessRecords(records []kcl.Record, checkpoin } if time.Now().Sub(srp.lastCheckpoint) > srp.checkpointFreq { largestSeq := srp.largestSeq.String() - srp.checkpoint(checkpointer, &largestSeq, &srp.largestSubSeq) + srp.checkpoint(&largestSeq, &srp.largestSubSeq) srp.lastCheckpoint = time.Now() } return nil } -func (srp *SampleRecordProcessor) Shutdown(checkpointer kcl.Checkpointer, reason string) error { +func (srp *SampleRecordProcessor) Shutdown(reason string) error { if reason == "TERMINATE" { fmt.Fprintf(os.Stderr, "Was told to terminate, will attempt to checkpoint.\n") - srp.checkpoint(checkpointer, nil, nil) + srp.checkpoint(nil, nil) } else { fmt.Fprintf(os.Stderr, "Shutting down due to failover. Will not checkpoint.\n") } diff --git a/kcl/kcl.go b/kcl/kcl.go index a9f4d9c..1933a68 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -9,16 +9,24 @@ import ( ) type RecordProcessor interface { - Initialize(shardID string) error - ProcessRecords(records []Record, checkpointer Checkpointer) error - Shutdown(checkpointer Checkpointer, reason string) error + Initialize(shardID string, checkpointer *Checkpointer) error + ProcessRecords(records []Record) error + Shutdown(reason string) error +} + +type CheckpointError struct { + e string +} + +func (ce CheckpointError) Error() string { + return ce.e } type Checkpointer struct { ioHandler ioHandler } -func (c Checkpointer) getAction() (interface{}, error) { +func (c *Checkpointer) getAction() (interface{}, error) { line, err := c.ioHandler.readLine() if err != nil { return nil, err @@ -30,15 +38,7 @@ func (c Checkpointer) getAction() (interface{}, error) { return action, nil } -type CheckpointError struct { - e string -} - -func (ce CheckpointError) Error() string { - return ce.e -} - -func (c Checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error { +func (c *Checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error { c.ioHandler.writeAction(ActionCheckpoint{ Action: "checkpoint", SequenceNumber: sequenceNumber, @@ -178,7 +178,7 @@ func New(inputFile io.Reader, outputFile, errorFile io.Writer, recordProcessor R } return &KCLProcess{ ioHandler: i, - checkpointer: Checkpointer{ + checkpointer: &Checkpointer{ ioHandler: i, }, recordProcessor: recordProcessor, @@ -187,7 +187,7 @@ func New(inputFile io.Reader, outputFile, errorFile io.Writer, recordProcessor R type KCLProcess struct { ioHandler ioHandler - checkpointer Checkpointer + checkpointer *Checkpointer recordProcessor RecordProcessor } @@ -204,11 +204,11 @@ func (kclp *KCLProcess) reportDone(responseFor string) error { func (kclp *KCLProcess) performAction(a interface{}) (string, error) { switch action := a.(type) { case ActionInitialize: - return action.Action, kclp.recordProcessor.Initialize(action.ShardID) + return action.Action, kclp.recordProcessor.Initialize(action.ShardID, kclp.checkpointer) case ActionProcessRecords: - return action.Action, kclp.recordProcessor.ProcessRecords(action.Records, kclp.checkpointer) + return action.Action, kclp.recordProcessor.ProcessRecords(action.Records) case ActionShutdown: - return action.Action, kclp.recordProcessor.Shutdown(kclp.checkpointer, action.Reason) + return action.Action, kclp.recordProcessor.Shutdown(action.Reason) default: return "", fmt.Errorf("unknown action to dispatch: %s", action) } From 3c56b57e6b7fc8f1d9db5ab8b293c437f869367f Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Fri, 19 May 2017 22:30:11 +0000 Subject: [PATCH 2/5] Added mutex to Checkpoint function to avoid racey conditions --- kcl/kcl.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/kcl/kcl.go b/kcl/kcl.go index 1933a68..8d25f24 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "io" + "sync" ) type RecordProcessor interface { @@ -23,6 +24,8 @@ func (ce CheckpointError) Error() string { } type Checkpointer struct { + mux sync.Mutex + ioHandler ioHandler } @@ -39,6 +42,9 @@ func (c *Checkpointer) getAction() (interface{}, error) { } func (c *Checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error { + c.mux.Lock() + defer c.mux.Unlock() + c.ioHandler.writeAction(ActionCheckpoint{ Action: "checkpoint", SequenceNumber: sequenceNumber, @@ -62,7 +68,6 @@ func (c *Checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int } } return nil - } type ioHandler struct { From 02b053c0ee3764bb9ea26faf8288f9a7d764c026 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Mon, 22 May 2017 22:57:34 +0000 Subject: [PATCH 3/5] Added vendor to .gitignore --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index f2a12a4..4df48e8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +vendor + # osx / sshfs ._* .DS_Store From 5f507ab116813c9f1c99026aef1376202d3d5493 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Mon, 22 May 2017 23:05:34 +0000 Subject: [PATCH 4/5] Moved retry and error handling logic to Checkpointer class --- cmd/consumer/main.go | 34 +--------------------------------- kcl/kcl.go | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 33 deletions(-) diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index 860a9ca..2d78381 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -11,7 +11,6 @@ import ( type SampleRecordProcessor struct { checkpointer *kcl.Checkpointer - sleepDuration time.Duration checkpointRetries int checkpointFreq time.Duration largestSeq *big.Int @@ -21,7 +20,6 @@ type SampleRecordProcessor struct { func New() *SampleRecordProcessor { return &SampleRecordProcessor{ - sleepDuration: 5 * time.Second, checkpointRetries: 5, checkpointFreq: 60 * time.Second, } @@ -33,36 +31,6 @@ func (srp *SampleRecordProcessor) Initialize(shardID string, checkpointer *kcl.C return nil } -func (srp *SampleRecordProcessor) checkpoint(sequenceNumber *string, subSequenceNumber *int) { - for n := 0; n < srp.checkpointRetries+1; n++ { - err := srp.checkpointer.Checkpoint(sequenceNumber, subSequenceNumber) - if err == nil { - return - } - - if cperr, ok := err.(kcl.CheckpointError); ok { - switch cperr.Error() { - case "ShutdownException": - fmt.Fprintf(os.Stderr, "Encountered shutdown exception, skipping checkpoint\n") - return - case "ThrottlingException": - fmt.Fprintf(os.Stderr, "Was throttled while checkpointing, will attempt again in %s", srp.sleepDuration) - case "InvalidStateException": - fmt.Fprintf(os.Stderr, "MultiLangDaemon reported an invalid state while checkpointing\n") - default: - fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err) - } - } - - if n == srp.checkpointRetries { - fmt.Fprintf(os.Stderr, "Failed to checkpoint after %d attempts, giving up.\n", srp.checkpointRetries) - return - } - - time.Sleep(srp.sleepDuration) - } -} - func (srp *SampleRecordProcessor) shouldUpdateSequence(sequenceNumber *big.Int, subSequenceNumber int) bool { return srp.largestSeq == nil || sequenceNumber.Cmp(srp.largestSeq) == 1 || (sequenceNumber.Cmp(srp.largestSeq) == 0 && subSequenceNumber > srp.largestSubSeq) @@ -82,7 +50,7 @@ func (srp *SampleRecordProcessor) ProcessRecords(records []kcl.Record) error { } if time.Now().Sub(srp.lastCheckpoint) > srp.checkpointFreq { largestSeq := srp.largestSeq.String() - srp.checkpoint(&largestSeq, &srp.largestSubSeq) + srp.checkpointer.CheckpointWithRetry(&largestSeq, &srp.largestSubSeq, srp.checkpointRetries) srp.lastCheckpoint = time.Now() } return nil diff --git a/kcl/kcl.go b/kcl/kcl.go index 8d25f24..92abb3f 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -6,7 +6,9 @@ import ( "encoding/json" "fmt" "io" + "os" "sync" + "time" ) type RecordProcessor interface { @@ -70,6 +72,42 @@ func (c *Checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int return nil } +// CheckpointWithRetry tries to save a checkPoint up to `retryCount` + 1 times. +// `retryCount` should be >= 0 +func (c *Checkpointer) CheckpointWithRetry( + sequenceNumber *string, subSequenceNumber *int, retryCount int, +) error { + sleepDuration := 5 * time.Second + + for n := 0; n <= retryCount; n++ { + err := c.Checkpoint(sequenceNumber, subSequenceNumber) + if err == nil { + return nil + } + + if cperr, ok := err.(CheckpointError); ok { + switch cperr.Error() { + case "ShutdownException": + return fmt.Errorf("Encountered shutdown exception, skipping checkpoint") + case "ThrottlingException": + fmt.Fprintf(os.Stderr, "Was throttled while checkpointing, will attempt again in %s\n", sleepDuration) + case "InvalidStateException": + fmt.Fprintf(os.Stderr, "MultiLangDaemon reported an invalid state while checkpointing\n") + default: + fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err) + } + } + + if n == retryCount { + return fmt.Errorf("Failed to checkpoint after %d attempts, giving up.", retryCount) + } + + time.Sleep(sleepDuration) + } + + return nil +} + type ioHandler struct { inputFile io.Reader outputFile io.Writer From 458e66e3214ccfc27f0c21667fceef81da6a8af6 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Mon, 22 May 2017 23:06:56 +0000 Subject: [PATCH 5/5] Added shutdown method to *Checkerpointer struct --- cmd/consumer/main.go | 2 +- kcl/kcl.go | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index 2d78381..22f8ccc 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -59,7 +59,7 @@ func (srp *SampleRecordProcessor) ProcessRecords(records []kcl.Record) error { func (srp *SampleRecordProcessor) Shutdown(reason string) error { if reason == "TERMINATE" { fmt.Fprintf(os.Stderr, "Was told to terminate, will attempt to checkpoint.\n") - srp.checkpoint(nil, nil) + srp.checkpointer.Shutdown() } else { fmt.Fprintf(os.Stderr, "Shutting down due to failover. Will not checkpoint.\n") } diff --git a/kcl/kcl.go b/kcl/kcl.go index 92abb3f..1ccd333 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -108,6 +108,10 @@ func (c *Checkpointer) CheckpointWithRetry( return nil } +func (c *Checkpointer) Shutdown() { + c.CheckpointWithRetry(nil, nil, 5) +} + type ioHandler struct { inputFile io.Reader outputFile io.Writer