diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index 8e51ee9..6f2f51a 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -10,17 +10,15 @@ import ( ) type sampleRecordProcessor struct { - checkpointer kcl.Checkpointer - checkpointRetries int - checkpointFreq time.Duration - largestPair kcl.SequencePair - lastCheckpoint time.Time + checkpointer kcl.Checkpointer + checkpointFreq time.Duration + largestPair kcl.SequencePair + lastCheckpoint time.Time } func newSampleRecordProcessor() *sampleRecordProcessor { return &sampleRecordProcessor{ - checkpointRetries: 5, - checkpointFreq: 60 * time.Second, + checkpointFreq: 60 * time.Second, } } @@ -47,7 +45,7 @@ func (srp *sampleRecordProcessor) ProcessRecords(records []kcl.Record) error { } } if time.Now().Sub(srp.lastCheckpoint) > srp.checkpointFreq { - srp.checkpointer.Checkpoint(srp.largestPair, srp.checkpointRetries) + srp.checkpointer.Checkpoint(srp.largestPair) srp.lastCheckpoint = time.Now() } return nil