This reverts commit 7e382e90d5d9eb30ed38cc1ab452336860f48b57.
This commit is contained in:
parent
8369884952
commit
ac8d341cb1
1 changed files with 1 additions and 10 deletions
|
|
@ -281,18 +281,9 @@ func (sc *ShardConsumer) waitOnParentShard(shard *par.ShardStatus) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// releaseLease releases the lease for the specific shard
|
// Cleanup the internal lease cache
|
||||||
func (sc *ShardConsumer) releaseLease(shard *par.ShardStatus) {
|
func (sc *ShardConsumer) releaseLease(shard *par.ShardStatus) {
|
||||||
log.Infof("Release lease for shard %s", shard.ID)
|
log.Infof("Release lease for shard %s", shard.ID)
|
||||||
|
|
||||||
// remove the shard entry in dynamoDB as well
|
|
||||||
// Note: The worker has been terminated anyway and we don't need to do anything in case of error here.
|
|
||||||
// The shard information for checkpointer has been removed and it will be recreated during syncShard.
|
|
||||||
if err := sc.checkpointer.RemoveLeaseInfo(shard.ID); err != nil {
|
|
||||||
log.Errorf("Failed to remove shard lease info: %s Error: %+v", shard.ID, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// remove the shard owner from local status cache
|
|
||||||
shard.SetLeaseOwner("")
|
shard.SetLeaseOwner("")
|
||||||
// reporting lease lose metrics
|
// reporting lease lose metrics
|
||||||
sc.mService.LeaseLost(shard.ID)
|
sc.mService.LeaseLost(shard.ID)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue