From 2542ea141612ecb6df893a6b11b44962fdadc346 Mon Sep 17 00:00:00 2001
From: Tao Jiang
Date: Mon, 23 Apr 2018 12:40:39 -0700
Subject: [PATCH] KCL: Remove lease entry in dynamoDB table when shard no longer exists

Need to remove the lease entry in the DynamoDB table when a shard has been
removed by Kinesis. This happens after shard splitting: the parent shard
will be removed by Kinesis after its retention period (normally after 24
hours).

Change-Id: I70a5836436ac0698110085d46d9438fcaf539cd2
---
 .../clientlibrary/worker/checkpointer.go      | 40 +++++++++++++++++++
 .../clientlibrary/worker/worker.go            |  4 ++
 2 files changed, 44 insertions(+)

diff --git a/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/checkpointer.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/checkpointer.go
index ec279b2..3171fc6 100644
--- a/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/checkpointer.go
+++ b/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/checkpointer.go
@@ -36,6 +36,7 @@ type Checkpointer interface {
 	GetLease(*shardStatus, string) error
 	CheckpointSequence(*shardStatus) error
 	FetchCheckpoint(*shardStatus) error
+	RemoveLeaseInfo(string) error
 }
 
 // ErrSequenceIDNotFound is returned by FetchCheckpoint when no SequenceID is found
@@ -203,6 +204,19 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *shardStatus) error
 	return nil
 }
 
+// RemoveLeaseInfo to remove lease info for shard entry in dynamoDB because the shard no longer exists in Kinesis
+func (checkpointer *DynamoCheckpoint) RemoveLeaseInfo(shardID string) error {
+	err := checkpointer.removeItem(shardID)
+
+	if err != nil {
+		log.Errorf("Error in removing lease info for shard: %s, Error: %+v", shardID, err)
+	} else {
+		log.Infof("Lease info for shard: %s has been removed.", shardID)
+	}
+
+	return err
+}
+
 func (checkpointer *DynamoCheckpoint) createTable() error {
 	input := &dynamodb.CreateTableInput{
 		AttributeDefinitions: []*dynamodb.AttributeDefinition{
@@ -292,3 +306,29 @@ 
func (checkpointer *DynamoCheckpoint) getItem(shardID string) (map[string]*dynam
 	})
 	return item.Item, err
 }
+
+// removeItem deletes the item keyed by shardID from checkpointer.TableName and returns the
+// final error; throttling and internal-server errors are retried while attempt < Retries.
+func (checkpointer *DynamoCheckpoint) removeItem(shardID string) error {
+	return try.Do(func(attempt int) (bool, error) {
+		_, err := checkpointer.svc.DeleteItem(&dynamodb.DeleteItemInput{
+			TableName: aws.String(checkpointer.TableName),
+			Key: map[string]*dynamodb.AttributeValue{
+				LEASE_KEY_KEY: {
+					S: aws.String(shardID),
+				},
+			},
+		})
+		if awsErr, ok := err.(awserr.Error); ok {
+			retryable := awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException ||
+				awsErr.Code() == dynamodb.ErrCodeInternalServerError
+			if retryable && attempt < checkpointer.Retries {
+				// Exponential backoff (2^attempt * 100ms) per https://docs.aws.amazon.com/general/latest/gr/api-retries.html
+				// Go's ^ operator is XOR, not exponentiation, so the power of two is computed with a shift.
+				time.Sleep(time.Duration((1<<uint(attempt))*100) * time.Millisecond)
+				return true, err
+			}
+		}
+		return false, err
+	})
+}
diff --git a/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/worker.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/worker.go
index a97573c..d4f9a13 100644
--- a/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/worker.go
+++ b/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/worker.go
@@ -320,7 +320,11 @@ func (w *Worker) syncShard() error {
 	for _, shard := range w.shardStatus {
 		// The cached shard no longer existed, remove it.
 		if _, ok := shardInfo[shard.ID]; !ok {
+			// remove the shard from local status cache
 			delete(w.shardStatus, shard.ID)
+			// remove the shard entry in dynamoDB as well
+			// Note: syncShard runs periodically. we don't need to do anything in case of error here.
+			w.checkpointer.RemoveLeaseInfo(shard.ID)
 		}
 	}