fix: add missing ClaimRequest logic

This commit is contained in:
Shiva Pentakota 2023-01-27 17:23:18 -08:00
parent 42881449ce
commit 15ecd714f8
4 changed files with 68 additions and 8 deletions

View file

@ -79,6 +79,9 @@ type Checkpointer interface {
// RemoveLeaseOwner to remove lease owner for the shard entry to make the shard available for reassignment
RemoveLeaseOwner(string) error
// RemoveClaimRequest to remove expired claim request for shard
RemoveClaimRequest(string) error
// ListActiveWorkers returns active workers and their shards (New Lease Stealing Methods)
ListActiveWorkers(map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error)

View file

@ -235,7 +235,8 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
}
return err
}
// if there was a claimRequest by this worker for this shard, it can now be removed since the lease has been stolen
// NOTE(review): RemoveClaimRequest expects a shardID but is passed claimRequest here, and its error is ignored — confirm both are intended.
checkpointer.RemoveClaimRequest(claimRequest)
shard.Mux.Lock()
shard.AssignedTo = newAssignTo
shard.LeaseTimeout = newLeaseTimeout
@ -288,6 +289,10 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er
shard.SetLeaseOwner(assignedTo.(*types.AttributeValueMemberS).Value)
}
if claimRequest, ok := checkpoint[ClaimRequestKey]; ok {
shard.SetClaimRequest(claimRequest.(*types.AttributeValueMemberS).Value)
}
// Use up-to-date leaseTimeout to avoid ConditionalCheckFailedException when claiming
if leaseTimeout, ok := checkpoint[LeaseTimeoutKey]; ok && leaseTimeout.(*types.AttributeValueMemberS).Value != "" {
currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout.(*types.AttributeValueMemberS).Value)
@ -313,6 +318,29 @@ func (checkpointer *DynamoCheckpoint) RemoveLeaseInfo(shardID string) error {
return err
}
// RemoveClaimRequest removes this worker's claim request for the given shard,
// making the shard available again for claiming/reassignment. The update is
// conditional on the claim request still belonging to this worker, so a
// competing claim written by another worker is not clobbered; in that case
// DynamoDB returns a ConditionalCheckFailedException, which is surfaced to the
// caller as the returned error.
func (checkpointer *DynamoCheckpoint) RemoveClaimRequest(shardID string) error {
	input := &dynamodb.UpdateItemInput{
		TableName: aws.String(checkpointer.TableName),
		Key: map[string]types.AttributeValue{
			LeaseKeyKey: &types.AttributeValueMemberS{
				Value: shardID,
			},
		},
		UpdateExpression: aws.String("remove " + ClaimRequestKey),
		ExpressionAttributeValues: map[string]types.AttributeValue{
			":claim_request": &types.AttributeValueMemberS{
				Value: checkpointer.kclConfig.WorkerID,
			},
		},
		// Build the condition from the ClaimRequestKey constant (instead of a
		// hard-coded attribute name) so it cannot drift out of sync with the
		// UpdateExpression above.
		ConditionExpression: aws.String(ClaimRequestKey + " = :claim_request"),
	}
	_, err := checkpointer.svc.UpdateItem(context.TODO(), input)
	return err
}
// RemoveLeaseOwner to remove lease owner for the shard entry
func (checkpointer *DynamoCheckpoint) RemoveLeaseOwner(shardID string) error {
input := &dynamodb.UpdateItemInput{
@ -435,7 +463,7 @@ func (checkpointer *DynamoCheckpoint) syncLeases(shardStatus map[string]*par.Sha
checkpointer.lastLeaseSync = time.Now()
input := &dynamodb.ScanInput{
ProjectionExpression: aws.String(fmt.Sprintf("%s,%s,%s", LeaseKeyKey, LeaseOwnerKey, SequenceNumberKey)),
ProjectionExpression: aws.String(fmt.Sprintf("%s,%s,%s,%s", LeaseKeyKey, LeaseOwnerKey, SequenceNumberKey, ClaimRequestKey)),
Select: "SPECIFIC_ATTRIBUTES",
TableName: aws.String(checkpointer.kclConfig.TableName),
}
@ -452,14 +480,23 @@ func (checkpointer *DynamoCheckpoint) syncLeases(shardStatus map[string]*par.Sha
shardId, foundShardId := result[LeaseKeyKey]
assignedTo, foundAssignedTo := result[LeaseOwnerKey]
checkpoint, foundCheckpoint := result[SequenceNumberKey]
if !foundShardId || !foundAssignedTo || !foundCheckpoint {
claimRequest, foundClaimRequest := result[ClaimRequestKey]
if !foundShardId {
continue
}
if shard, ok := shardStatus[shardId.(*types.AttributeValueMemberS).Value]; ok {
shard, ok := shardStatus[shardId.(*types.AttributeValueMemberS).Value]
if !ok {
continue
}
if foundAssignedTo {
shard.SetLeaseOwner(assignedTo.(*types.AttributeValueMemberS).Value)
}
if foundCheckpoint {
shard.SetCheckpoint(checkpoint.(*types.AttributeValueMemberS).Value)
}
if foundClaimRequest {
shard.SetClaimRequest(claimRequest.(*types.AttributeValueMemberS).Value)
}
}
if err != nil {

View file

@ -86,6 +86,18 @@ func (ss *ShardStatus) SetLeaseTimeout(timeout time.Time) {
ss.LeaseTimeout = timeout
}
// GetClaimRequest returns the worker ID currently recorded as requesting to
// claim this shard, reading it under the shard's mutex.
func (ss *ShardStatus) GetClaimRequest() string {
	ss.Mux.Lock()
	claim := ss.ClaimRequest
	ss.Mux.Unlock()
	return claim
}
// SetClaimRequest records, under the shard's mutex, the worker ID that is
// requesting to claim this shard.
func (ss *ShardStatus) SetClaimRequest(claimRequest string) {
	ss.Mux.Lock()
	ss.ClaimRequest = claimRequest
	ss.Mux.Unlock()
}
func (ss *ShardStatus) IsClaimRequestExpired(kclConfig *config.KinesisClientLibConfiguration) bool {
if leaseTimeout := ss.GetLeaseTimeout(); leaseTimeout.IsZero() {
return false

View file

@ -316,10 +316,13 @@ func (w *Worker) eventLoop() {
}
var stealShard bool
if w.kclConfig.EnableLeaseStealing && shard.ClaimRequest != "" {
if w.kclConfig.EnableLeaseStealing && shard.GetClaimRequest() != "" {
upcomingStealingInterval := time.Now().UTC().Add(time.Duration(w.kclConfig.LeaseStealingIntervalMillis) * time.Millisecond)
// the lease timeout has to be before the upcoming stealing interval (now + LeaseStealingIntervalMillis);
// if this is also how expired leases are meant to be handled, then we should not pick the shard to steal at random;
// instead we should choose the shard with the lease closest to expiry.
if shard.GetLeaseTimeout().Before(upcomingStealingInterval) && !shard.IsClaimRequestExpired(w.kclConfig) {
if shard.ClaimRequest == w.workerID {
if shard.GetClaimRequest() == w.workerID {
stealShard = true
log.Debugf("Stealing shard: %s", shard.ID)
} else {
@ -329,6 +332,11 @@ func (w *Worker) eventLoop() {
}
}
// need to have logic like this somewhere to get rid of expired claim requests
// NOTE(review): RemoveClaimRequest takes a shardID, but the call below passes w.workerID — confirm this is intended.
if shard.GetClaimRequest() == w.workerID && shard.IsClaimRequestExpired(w.kclConfig) {
w.checkpointer.RemoveClaimRequest(w.workerID)
}
err = w.checkpointer.GetLease(shard, w.workerID)
if err != nil {
// cannot get lease on the shard
@ -391,7 +399,7 @@ func (w *Worker) rebalance() error {
return err
}
for _, shard := range w.shardStatus {
if shard.ClaimRequest != "" && shard.ClaimRequest == w.workerID {
if shard.GetClaimRequest() != "" && shard.GetClaimRequest() == w.workerID {
log.Debugf("Steal in progress. workerID: %s", w.workerID)
return nil
}