Merge pull request #1 from magiusdarrigo/fix-shard-shutdown-lease-release
[MTCES-1062] call checkpointer nil in order to let workers know shard is expired
This commit is contained in:
commit
72d3b6f328
1 changed files with 3 additions and 2 deletions
|
|
@ -154,11 +154,12 @@ func (sc *PollingShardConsumer) getRecords() error {
|
||||||
|
|
||||||
sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer)
|
sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer)
|
||||||
|
|
||||||
// The shard has been closed, so no new records can be read from it
|
// The shard has no more records. We can stop polling it. It should expire soon.
|
||||||
if getResp.NextShardIterator == nil {
|
if getResp.NextShardIterator == nil {
|
||||||
log.Infof("Shard %s closed", sc.shard.ID)
|
log.Infof("Shard %s has no more records", sc.shard.ID)
|
||||||
shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.TERMINATE, Checkpointer: recordCheckpointer}
|
shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.TERMINATE, Checkpointer: recordCheckpointer}
|
||||||
sc.recordProcessor.Shutdown(shutdownInput)
|
sc.recordProcessor.Shutdown(shutdownInput)
|
||||||
|
recordCheckpointer.Checkpoint(nil)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
shardIterator = getResp.NextShardIterator
|
shardIterator = getResp.NextShardIterator
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue