Remove shard info in checkpointer (#29)
Currently, only the locally cached shard info is removed when a worker loses its lease. The info inside the checkpointer (DynamoDB) is not removed. As a result, the lease is held until it expires, and it can take too long before the shard is ready for another worker to grab. This change releases the lease in the checkpointer immediately. The user needs to ensure appropriate checkpointing before returning from the Shutdown callback.

Signed-off-by: Tao Jiang <taoj@vmware.com>
parent fa0bbc42fe
commit 8369884952
1 changed file with 10 additions and 1 deletion
@@ -281,9 +281,18 @@ func (sc *ShardConsumer) waitOnParentShard(shard *par.ShardStatus) error {
 	}
 }
 
-// Cleanup the internal lease cache
+// releaseLease releases the lease for the specific shard
 func (sc *ShardConsumer) releaseLease(shard *par.ShardStatus) {
 	log.Infof("Release lease for shard %s", shard.ID)
+
+	// remove the shard entry in dynamoDB as well
+	// Note: The worker has been terminated anyway and we don't need to do anything in case of error here.
+	// The shard information for checkpointer has been removed and it will be recreated during syncShard.
+	if err := sc.checkpointer.RemoveLeaseInfo(shard.ID); err != nil {
+		log.Errorf("Failed to remove shard lease info: %s Error: %+v", shard.ID, err)
+	}
+
+	// remove the shard owner from local status cache
 	shard.SetLeaseOwner("")
 	// reporting lease lose metrics
 	sc.mService.LeaseLost(shard.ID)
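
To illustrate the effect of this change, here is a minimal, self-contained sketch. It does not use the real checkpointer: `leaseStore`, `shardStatus`, `GetLease`, and the worker names are hypothetical in-memory stand-ins, and only the method name `RemoveLeaseInfo` is taken from the diff. It assumes that a lease entry blocks other workers for as long as it exists, and that removing the entry makes the shard available at once.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// leaseStore is a hypothetical in-memory stand-in for the DynamoDB-backed
// checkpointer. It maps a shard ID to the worker that holds its lease.
type leaseStore struct {
	mu     sync.Mutex
	owners map[string]string
}

func newLeaseStore() *leaseStore {
	return &leaseStore{owners: make(map[string]string)}
}

// GetLease (hypothetical) assigns the shard to worker unless another
// worker already holds an entry for it.
func (s *leaseStore) GetLease(shardID, worker string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if owner, held := s.owners[shardID]; held && owner != worker {
		return errors.New("lease for " + shardID + " is held by " + owner)
	}
	s.owners[shardID] = worker
	return nil
}

// RemoveLeaseInfo deletes the stored lease entry. Only the name matches
// the method called in the diff; the behavior here is an assumption.
func (s *leaseStore) RemoveLeaseInfo(shardID string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.owners, shardID)
	return nil
}

// shardStatus models the worker's local cache entry for one shard.
type shardStatus struct {
	ID         string
	leaseOwner string
}

// releaseLease follows the order used in the diff: remove the remote
// entry first (logging but otherwise ignoring errors), then clear the
// owner in the local cache.
func releaseLease(store *leaseStore, shard *shardStatus) {
	if err := store.RemoveLeaseInfo(shard.ID); err != nil {
		fmt.Printf("failed to remove shard lease info: %s error: %v\n", shard.ID, err)
	}
	shard.leaseOwner = ""
}

func main() {
	store := newLeaseStore()
	shard := &shardStatus{ID: "shard-0001", leaseOwner: "worker-a"}
	_ = store.GetLease(shard.ID, "worker-a")

	// While worker-a's entry exists, worker-b is refused.
	fmt.Println("before release:", store.GetLease(shard.ID, "worker-b"))

	releaseLease(store, shard)

	// With the entry removed, worker-b can take the shard immediately
	// instead of waiting for the lease to expire.
	fmt.Println("after release:", store.GetLease(shard.ID, "worker-b"))
}
```

In this model, clearing only `shard.leaseOwner` (the behavior before this commit) would leave the entry in `leaseStore`, so the second `GetLease` call would still fail until the entry expired.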