commit
695f85d99d
1 changed files with 25 additions and 21 deletions
|
|
@ -32,27 +32,31 @@ func (srp *SampleRecordProcessor) Initialize(shardID string) error {
|
|||
}
|
||||
|
||||
func (srp *SampleRecordProcessor) checkpoint(checkpointer kcl.Checkpointer, sequenceNumber string, subSequenceNumber int) {
|
||||
for n := 0; n < srp.checkpointRetries; n++ {
|
||||
if err := checkpointer.Checkpoint(sequenceNumber, subSequenceNumber); err == nil {
|
||||
break
|
||||
} else {
|
||||
if cperr, ok := err.(kcl.CheckpointError); ok {
|
||||
switch cperr.Error() {
|
||||
case "ShutdownException":
|
||||
fmt.Fprintf(os.Stderr, "Encountered shutdown exception, skipping checkpoint\n")
|
||||
return
|
||||
case "ThrottlingException":
|
||||
if srp.checkpointRetries-1 == n {
|
||||
fmt.Fprintf(os.Stderr, "Failed to checkpoint after %d attempts, giving up.\n", n)
|
||||
return
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, "Was throttled while checkpointing, will attempt again in %s", srp.sleepDuration)
|
||||
case "InvalidStateException":
|
||||
fmt.Fprintf(os.Stderr, "MultiLangDaemon reported an invalid state while checkpointing\n")
|
||||
}
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err)
|
||||
for n := -1; n < srp.checkpointRetries; n++ {
|
||||
err := checkpointer.Checkpoint(sequenceNumber, subSequenceNumber)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if cperr, ok := err.(kcl.CheckpointError); ok {
|
||||
switch cperr.Error() {
|
||||
case "ShutdownException":
|
||||
fmt.Fprintf(os.Stderr, "Encountered shutdown exception, skipping checkpoint\n")
|
||||
return
|
||||
case "ThrottlingException":
|
||||
fmt.Fprintf(os.Stderr, "Was throttled while checkpointing, will attempt again in %s", srp.sleepDuration)
|
||||
case "InvalidStateException":
|
||||
fmt.Fprintf(os.Stderr, "MultiLangDaemon reported an invalid state while checkpointing\n")
|
||||
default:
|
||||
fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
if n == srp.checkpointRetries {
|
||||
fmt.Fprintf(os.Stderr, "Failed to checkpoint after %d attempts, giving up.\n", srp.checkpointRetries)
|
||||
return
|
||||
}
|
||||
|
||||
time.Sleep(srp.sleepDuration)
|
||||
}
|
||||
}
|
||||
|
|
@ -97,6 +101,6 @@ func main() {
|
|||
panic(err)
|
||||
}
|
||||
defer f.Close()
|
||||
kclProcess := kcl.New(os.Stdin, os.Stdout, os.Stderr, &SampleRecordProcessor{})
|
||||
kclProcess := kcl.New(os.Stdin, os.Stdout, os.Stderr, New())
|
||||
kclProcess.Run()
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue