diff --git a/clientlibrary/worker/shard-consumer.go b/clientlibrary/worker/shard-consumer.go index 7899a67..7cd8c00 100644 --- a/clientlibrary/worker/shard-consumer.go +++ b/clientlibrary/worker/shard-consumer.go @@ -281,9 +281,18 @@ func (sc *ShardConsumer) waitOnParentShard(shard *par.ShardStatus) error { } } -// Cleanup the internal lease cache +// releaseLease releases the lease for the specific shard func (sc *ShardConsumer) releaseLease(shard *par.ShardStatus) { log.Infof("Release lease for shard %s", shard.ID) + + // remove the shard entry in dynamoDB as well + // Note: The worker has been terminated anyway and we don't need to do anything in case of error here. + // The shard information for checkpointer has been removed and it will be recreated during syncShard. + if err := sc.checkpointer.RemoveLeaseInfo(shard.ID); err != nil { + log.Errorf("Failed to remove shard lease info: %s Error: %+v", shard.ID, err) + } + + // remove the shard owner from local status cache shard.SetLeaseOwner("") // reporting lease lose metrics sc.mService.LeaseLost(shard.ID)