diff --git a/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/checkpointer.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/checkpointer.go index ec279b2..3171fc6 100644 --- a/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/checkpointer.go +++ b/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/checkpointer.go @@ -36,6 +36,7 @@ type Checkpointer interface { GetLease(*shardStatus, string) error CheckpointSequence(*shardStatus) error FetchCheckpoint(*shardStatus) error + RemoveLeaseInfo(string) error } // ErrSequenceIDNotFound is returned by FetchCheckpoint when no SequenceID is found @@ -203,6 +204,19 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *shardStatus) error return nil } +// RemoveLeaseInfo to remove lease info for shard entry in dynamoDB because the shard no longer exists in Kinesis +func (checkpointer *DynamoCheckpoint) RemoveLeaseInfo(shardID string) error { + err := checkpointer.removeItem(shardID) + + if err != nil { + log.Errorf("Error in removing lease info for shard: %s, Error: %+v", shardID, err) + } else { + log.Infof("Lease info for shard: %s has been removed.", shardID) + } + + return err +} + func (checkpointer *DynamoCheckpoint) createTable() error { input := &dynamodb.CreateTableInput{ AttributeDefinitions: []*dynamodb.AttributeDefinition{ @@ -292,3 +306,29 @@ func (checkpointer *DynamoCheckpoint) getItem(shardID string) (map[string]*dynam }) return item.Item, err } + +func (checkpointer *DynamoCheckpoint) removeItem(shardID string) error { + var item *dynamodb.DeleteItemOutput + err := try.Do(func(attempt int) (bool, error) { + var err error + item, err = checkpointer.svc.DeleteItem(&dynamodb.DeleteItemInput{ + TableName: aws.String(checkpointer.TableName), + Key: map[string]*dynamodb.AttributeValue{ + LEASE_KEY_KEY: { + S: aws.String(shardID), + }, + }, + }) + if awsErr, ok := err.(awserr.Error); ok { + if awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException || + awsErr.Code() == dynamodb.ErrCodeInternalServerError && + attempt < checkpointer.Retries { + // Backoff time as recommended by https://docs.aws.amazon.com/general/latest/gr/api-retries.html + time.Sleep(time.Duration(2^attempt*100) * time.Millisecond) + return true, err + } + } + return false, err + }) + return err +} diff --git a/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/worker.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/worker.go index a97573c..d4f9a13 100644 --- a/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/worker.go +++ b/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/worker.go @@ -320,7 +320,11 @@ func (w *Worker) syncShard() error { for _, shard := range w.shardStatus { // The cached shard no longer existed, remove it. if _, ok := shardInfo[shard.ID]; !ok { + // remove the shard from local status cache delete(w.shardStatus, shard.ID) + // remove the shard entry in dynamoDB as well + // Note: syncShard runs periodically. we don't need to do anything in case of error here. + w.checkpointer.RemoveLeaseInfo(shard.ID) } }