diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index acb6de8..4602c13 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -31,7 +31,7 @@ func (srp *SampleRecordProcessor) Initialize(shardID string) error { return nil } -func (srp *SampleRecordProcessor) checkpoint(checkpointer kcl.Checkpointer, sequenceNumber string, subSequenceNumber int) { +func (srp *SampleRecordProcessor) checkpoint(checkpointer kcl.Checkpointer, sequenceNumber *string, subSequenceNumber *int) { for n := -1; n < srp.checkpointRetries; n++ { err := checkpointer.Checkpoint(sequenceNumber, subSequenceNumber) if err == nil { @@ -79,7 +79,8 @@ func (srp *SampleRecordProcessor) ProcessRecords(records []kcl.Record, checkpoin } } if time.Now().Sub(srp.lastCheckpoint) > srp.checkpointFreq { - srp.checkpoint(checkpointer, srp.largestSeq.String(), srp.largestSubSeq) + largestSeq := srp.largestSeq.String() + srp.checkpoint(checkpointer, &largestSeq, &srp.largestSubSeq) srp.lastCheckpoint = time.Now() } return nil @@ -88,7 +89,7 @@ func (srp *SampleRecordProcessor) ProcessRecords(records []kcl.Record, checkpoin func (srp *SampleRecordProcessor) Shutdown(checkpointer kcl.Checkpointer, reason string) error { if reason == "TERMINATE" { fmt.Fprintf(os.Stderr, "Was told to terminate, will attempt to checkpoint.\n") - srp.checkpoint(checkpointer, "", 0) + srp.checkpoint(checkpointer, nil, nil) } else { fmt.Fprintf(os.Stderr, "Shutting down due to failover. Will not checkpoint.\n") } diff --git a/kcl/kcl.go b/kcl/kcl.go index ca38f2e..a9f4d9c 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -38,7 +38,7 @@ func (ce CheckpointError) Error() string { return ce.e } -func (c Checkpointer) Checkpoint(sequenceNumber string, subSequenceNumber int) error { +func (c Checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error { c.ioHandler.writeAction(ActionCheckpoint{ Action: "checkpoint", SequenceNumber: sequenceNumber, @@ -56,9 +56,9 @@ func (c Checkpointer) Checkpoint(sequenceNumber string, subSequenceNumber int) e if !ok { return fmt.Errorf("expected checkpoint response, got '%s'", line.String()) } - if action.Error != "" { + if action.Error != nil && *action.Error != "" { return CheckpointError{ - e: action.Error, + e: *action.Error, } } return nil @@ -117,10 +117,10 @@ type ActionShutdown struct { } type ActionCheckpoint struct { - Action string `json:"action"` - SequenceNumber string `json:"sequenceNumber"` - SubSequenceNumber int `json:"subSequenceNumber"` - Error string `json:"error"` + Action string `json:"action"` + SequenceNumber *string `json:"sequenceNumber,omitempty"` + SubSequenceNumber *int `json:"subSequenceNumber,omitempty"` + Error *string `json:"error,omitempty"` } func (i ioHandler) loadAction(line string) (interface{}, error) {