From 83698849525d5fe96ae7711af2d7224fbe44771d Mon Sep 17 00:00:00 2001 From: Tao Jiang Date: Mon, 8 Jul 2019 18:04:37 -0500 Subject: [PATCH] Remove shard info in checkpointer (#29) Currently, only local cached shard info has been removed when worker losts the lease. The info inside checkpointer (dynamoDB) is not removed. This causes lease has been hold until the lease expiration and it might take too long for shard is ready for other worker to grab. This change release the lease in checkpointer immediately. The user need to ensure appropriate checkpointing before return from Shutdown callback. Signed-off-by: Tao Jiang --- clientlibrary/worker/shard-consumer.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/clientlibrary/worker/shard-consumer.go b/clientlibrary/worker/shard-consumer.go index 7899a67..7cd8c00 100644 --- a/clientlibrary/worker/shard-consumer.go +++ b/clientlibrary/worker/shard-consumer.go @@ -281,9 +281,18 @@ func (sc *ShardConsumer) waitOnParentShard(shard *par.ShardStatus) error { } } -// Cleanup the internal lease cache +// releaseLease releases the lease for the specific shard func (sc *ShardConsumer) releaseLease(shard *par.ShardStatus) { log.Infof("Release lease for shard %s", shard.ID) + + // remove the shard entry in dynamoDB as well + // Note: The worker has been terminated anyway and we don't need to do anything in case of error here. + // The shard information for checkpointer has been removed and it will be recreated during syncShard. + if err := sc.checkpointer.RemoveLeaseInfo(shard.ID); err != nil { + log.Errorf("Failed to remove shard lease info: %s Error: %+v", shard.ID, err) + } + + // remove the shard owner from local status cache shard.SetLeaseOwner("") // reporting lease lose metrics sc.mService.LeaseLost(shard.ID)