Merge pull request #35 from wwwjfy/fix-checkpoint
fix periodic checkpoint not triggered
This commit is contained in:
commit
7351346041
1 changed files with 1 additions and 1 deletions
|
|
@ -29,7 +29,7 @@ func (srp *sampleRecordProcessor) Initialize(shardID string, checkpointer kcl.Ch
|
|||
}
|
||||
|
||||
func (srp *sampleRecordProcessor) shouldUpdateSequence(pair kcl.SequencePair) bool {
|
||||
return srp.largestPair.IsLessThan(pair)
|
||||
return srp.largestPair.IsNil() || srp.largestPair.IsLessThan(pair)
|
||||
}
|
||||
|
||||
func (srp *sampleRecordProcessor) ProcessRecords(records []kcl.Record) error {
|
||||
|
|
|
|||
Loading…
Reference in a new issue