From 15ecd714f828f9614d1a28aa7e943a0b9d7f3ba8 Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Fri, 27 Jan 2023 17:23:18 -0800 Subject: [PATCH] fix: add missing ClaimRequest logic --- clientlibrary/checkpoint/checkpointer.go | 3 ++ .../checkpoint/dynamodb-checkpointer.go | 47 +++++++++++++++++-- clientlibrary/partition/partition.go | 12 +++++ clientlibrary/worker/worker.go | 14 ++++-- 4 files changed, 68 insertions(+), 8 deletions(-) diff --git a/clientlibrary/checkpoint/checkpointer.go b/clientlibrary/checkpoint/checkpointer.go index 1af66ba..4e1ff98 100644 --- a/clientlibrary/checkpoint/checkpointer.go +++ b/clientlibrary/checkpoint/checkpointer.go @@ -79,6 +79,9 @@ type Checkpointer interface { // RemoveLeaseOwner to remove lease owner for the shard entry to make the shard available for reassignment RemoveLeaseOwner(string) error + // RemoveClaimRequest to remove expired claim request for shard + RemoveClaimRequest(string) error + // ListActiveWorkers returns active workers and their shards (New Lease Stealing Methods) ListActiveWorkers(map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error) diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go index 3a7e22e..c897d6d 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go @@ -235,7 +235,8 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign } return err } - + // if this worker had a claimRequest on this shard, it can be removed now that the shard has been stolen; RemoveClaimRequest is keyed by shard ID + checkpointer.RemoveClaimRequest(shard.ID) shard.Mux.Lock() shard.AssignedTo = newAssignTo shard.LeaseTimeout = newLeaseTimeout @@ -288,6 +289,10 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er shard.SetLeaseOwner(assignedTo.(*types.AttributeValueMemberS).Value) } + if claimRequest, ok := checkpoint[ClaimRequestKey]; ok { + 
shard.SetClaimRequest(claimRequest.(*types.AttributeValueMemberS).Value) + } + // Use up-to-date leaseTimeout to avoid ConditionalCheckFailedException when claiming if leaseTimeout, ok := checkpoint[LeaseTimeoutKey]; ok && leaseTimeout.(*types.AttributeValueMemberS).Value != "" { currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout.(*types.AttributeValueMemberS).Value) @@ -313,6 +318,29 @@ func (checkpointer *DynamoCheckpoint) RemoveLeaseInfo(shardID string) error { return err } +// RemoveClaimRequest removes the claim request from the lease entry of shardID, but only while that claim request equals this worker's ID. +func (checkpointer *DynamoCheckpoint) RemoveClaimRequest(shardID string) error { + input := &dynamodb.UpdateItemInput{ + TableName: aws.String(checkpointer.TableName), + Key: map[string]types.AttributeValue{ + LeaseKeyKey: &types.AttributeValueMemberS{ + Value: shardID, + }, + }, + UpdateExpression: aws.String("remove " + ClaimRequestKey), + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":claim_request": &types.AttributeValueMemberS{ + Value: checkpointer.kclConfig.WorkerID, + }, + }, + ConditionExpression: aws.String(ClaimRequestKey + " = :claim_request"), + } + + _, err := checkpointer.svc.UpdateItem(context.TODO(), input) + + return err +} + // RemoveLeaseOwner to remove lease owner for the shard entry func (checkpointer *DynamoCheckpoint) RemoveLeaseOwner(shardID string) error { input := &dynamodb.UpdateItemInput{ @@ -435,7 +463,7 @@ func (checkpointer *DynamoCheckpoint) syncLeases(shardStatus map[string]*par.Sha checkpointer.lastLeaseSync = time.Now() input := &dynamodb.ScanInput{ - ProjectionExpression: aws.String(fmt.Sprintf("%s,%s,%s", LeaseKeyKey, LeaseOwnerKey, SequenceNumberKey)), + ProjectionExpression: aws.String(fmt.Sprintf("%s,%s,%s,%s", LeaseKeyKey, LeaseOwnerKey, SequenceNumberKey, ClaimRequestKey)), Select: "SPECIFIC_ATTRIBUTES", TableName: aws.String(checkpointer.kclConfig.TableName), } @@ -452,14 +480,23 @@ func (checkpointer *DynamoCheckpoint) syncLeases(shardStatus 
map[string]*par.Sha shardId, foundShardId := result[LeaseKeyKey] assignedTo, foundAssignedTo := result[LeaseOwnerKey] checkpoint, foundCheckpoint := result[SequenceNumberKey] - if !foundShardId || !foundAssignedTo || !foundCheckpoint { + claimRequest, foundClaimRequest := result[ClaimRequestKey] + if !foundShardId { continue } - - if shard, ok := shardStatus[shardId.(*types.AttributeValueMemberS).Value]; ok { + shard, ok := shardStatus[shardId.(*types.AttributeValueMemberS).Value] + if !ok { + continue + } + if foundAssignedTo { shard.SetLeaseOwner(assignedTo.(*types.AttributeValueMemberS).Value) + } + if foundCheckpoint { shard.SetCheckpoint(checkpoint.(*types.AttributeValueMemberS).Value) } + if foundClaimRequest { + shard.SetClaimRequest(claimRequest.(*types.AttributeValueMemberS).Value) + } } if err != nil { diff --git a/clientlibrary/partition/partition.go b/clientlibrary/partition/partition.go index ffd3aff..20e2a46 100644 --- a/clientlibrary/partition/partition.go +++ b/clientlibrary/partition/partition.go @@ -86,6 +86,18 @@ func (ss *ShardStatus) SetLeaseTimeout(timeout time.Time) { ss.LeaseTimeout = timeout } +func (ss *ShardStatus) GetClaimRequest() string { + ss.Mux.Lock() + defer ss.Mux.Unlock() + return ss.ClaimRequest +} + +func (ss *ShardStatus) SetClaimRequest(claimRequest string) { + ss.Mux.Lock() + defer ss.Mux.Unlock() + ss.ClaimRequest = claimRequest +} + func (ss *ShardStatus) IsClaimRequestExpired(kclConfig *config.KinesisClientLibConfiguration) bool { if leaseTimeout := ss.GetLeaseTimeout(); leaseTimeout.IsZero() { return false diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index 7001cd3..87a5919 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -316,10 +316,13 @@ func (w *Worker) eventLoop() { } var stealShard bool - if w.kclConfig.EnableLeaseStealing && shard.ClaimRequest != "" { + if w.kclConfig.EnableLeaseStealing && shard.GetClaimRequest() != "" { upcomingStealingInterval := 
time.Now().UTC().Add(time.Duration(w.kclConfig.LeaseStealingIntervalMillis) * time.Millisecond) + // the lease must time out before the next stealing interval (now + LeaseStealingIntervalMillis); + // NOTE(review): if this is also meant to handle expired leases, then we should not pick the shard to steal at random; + // instead, should we choose the shard whose lease is closest to expiry? if shard.GetLeaseTimeout().Before(upcomingStealingInterval) && !shard.IsClaimRequestExpired(w.kclConfig) { - if shard.ClaimRequest == w.workerID { + if shard.GetClaimRequest() == w.workerID { stealShard = true log.Debugf("Stealing shard: %s", shard.ID) } else { @@ -329,6 +332,11 @@ func (w *Worker) eventLoop() { } } + // drop this worker's own claim request once it has expired; RemoveClaimRequest is keyed by shard ID, not worker ID + if shard.GetClaimRequest() == w.workerID && shard.IsClaimRequestExpired(w.kclConfig) { + w.checkpointer.RemoveClaimRequest(shard.ID) + } + err = w.checkpointer.GetLease(shard, w.workerID) if err != nil { // cannot get lease on the shard @@ -391,7 +399,7 @@ func (w *Worker) rebalance() error { return err } for _, shard := range w.shardStatus { - if shard.ClaimRequest != "" && shard.ClaimRequest == w.workerID { + if shard.GetClaimRequest() != "" && shard.GetClaimRequest() == w.workerID { log.Debugf("Steal in progress. workerID: %s", w.workerID) return nil }