KCL: Remove lease entry in dynamoDB table when shard no longer exists
Remove the lease entry from the DynamoDB table when a shard has been removed by Kinesis. This happens after shard splitting: the parent shard is removed by Kinesis once its retention period has passed (normally after 24 hours). Change-Id: I70a5836436ac0698110085d46d9438fcaf539cd2
This commit is contained in:
parent
6384d89748
commit
2542ea1416
2 changed files with 44 additions and 0 deletions
|
|
@ -36,6 +36,7 @@ type Checkpointer interface {
|
|||
GetLease(*shardStatus, string) error
|
||||
CheckpointSequence(*shardStatus) error
|
||||
FetchCheckpoint(*shardStatus) error
|
||||
RemoveLeaseInfo(string) error
|
||||
}
|
||||
|
||||
// ErrSequenceIDNotFound is returned by FetchCheckpoint when no SequenceID is found
|
||||
|
|
@ -203,6 +204,19 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *shardStatus) error
|
|||
return nil
|
||||
}
|
||||
|
||||
// RemoveLeaseInfo to remove lease info for shard entry in dynamoDB because the shard no longer exists in Kinesis
|
||||
func (checkpointer *DynamoCheckpoint) RemoveLeaseInfo(shardID string) error {
|
||||
err := checkpointer.removeItem(shardID)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("Error in removing lease info for shard: %s, Error: %+v", shardID, err)
|
||||
} else {
|
||||
log.Infof("Lease info for shard: %s has been removed.", shardID)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (checkpointer *DynamoCheckpoint) createTable() error {
|
||||
input := &dynamodb.CreateTableInput{
|
||||
AttributeDefinitions: []*dynamodb.AttributeDefinition{
|
||||
|
|
@ -292,3 +306,29 @@ func (checkpointer *DynamoCheckpoint) getItem(shardID string) (map[string]*dynam
|
|||
})
|
||||
return item.Item, err
|
||||
}
|
||||
|
||||
func (checkpointer *DynamoCheckpoint) removeItem(shardID string) error {
|
||||
var item *dynamodb.DeleteItemOutput
|
||||
err := try.Do(func(attempt int) (bool, error) {
|
||||
var err error
|
||||
item, err = checkpointer.svc.DeleteItem(&dynamodb.DeleteItemInput{
|
||||
TableName: aws.String(checkpointer.TableName),
|
||||
Key: map[string]*dynamodb.AttributeValue{
|
||||
LEASE_KEY_KEY: {
|
||||
S: aws.String(shardID),
|
||||
},
|
||||
},
|
||||
})
|
||||
if awsErr, ok := err.(awserr.Error); ok {
|
||||
if awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException ||
|
||||
awsErr.Code() == dynamodb.ErrCodeInternalServerError &&
|
||||
attempt < checkpointer.Retries {
|
||||
// Backoff time as recommended by https://docs.aws.amazon.com/general/latest/gr/api-retries.html
|
||||
time.Sleep(time.Duration(2^attempt*100) * time.Millisecond)
|
||||
return true, err
|
||||
}
|
||||
}
|
||||
return false, err
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -320,7 +320,11 @@ func (w *Worker) syncShard() error {
|
|||
for _, shard := range w.shardStatus {
|
||||
// The cached shard no longer existed, remove it.
|
||||
if _, ok := shardInfo[shard.ID]; !ok {
|
||||
// remove the shard from local status cache
|
||||
delete(w.shardStatus, shard.ID)
|
||||
// remove the shard entry in dynamoDB as well
|
||||
// Note: syncShard runs periodically. we don't need to do anything in case of error here.
|
||||
w.checkpointer.RemoveLeaseInfo(shard.ID)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue