call checkpointer nil in order to let workers know shard is expired

This commit is contained in:
magiusdarrigo 2022-11-25 17:07:01 -06:00
parent 09815b6898
commit 75831cfec5

View file

@ -154,11 +154,12 @@ func (sc *PollingShardConsumer) getRecords() error {
sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer)
// The shard has been closed, so no new records can be read from it
// The shard has no more records. We can stop polling it. It should expire soon.
if getResp.NextShardIterator == nil {
log.Infof("Shard %s closed", sc.shard.ID)
log.Infof("Shard %s has no more records", sc.shard.ID)
shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.TERMINATE, Checkpointer: recordCheckpointer}
sc.recordProcessor.Shutdown(shutdownInput)
recordCheckpointer.Checkpoint(nil)
return nil
}
shardIterator = getResp.NextShardIterator