From f3eb53a703da553d5e4ca443f82b1716a9db3b29 Mon Sep 17 00:00:00 2001 From: kperry Date: Mon, 20 May 2019 14:47:55 -0500 Subject: [PATCH] Added condition expressions to ddb for updates and puts. rearranged some of the code in consumergroup to accomodate conditional expressions. Moved the LeaseUpdate from consumergroup to ddb.go - it is only needed by ddb.go, and it is ddb.go specific. Added some comments. --- consumergroup.go | 75 +++++++++++++++++++++++--------------------- consumergroup/ddb.go | 67 ++++++++++++++++++++++++++++++++------- kinesis.go | 2 ++ 3 files changed, 98 insertions(+), 46 deletions(-) diff --git a/consumergroup.go b/consumergroup.go index b6a7027..f03956c 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -9,13 +9,27 @@ import ( "github.com/twinj/uuid" ) +// TODO change logging to actual logger + // Lease is data for handling a lease/lock on a particular shard type Lease struct { - LeaseKey string `json:"leaseKey"` // This is the partitionKey in dynamo - Checkpoint string `json:"checkpoint"` // the most updated sequenceNumber from kinesis - LeaseCounter int `json:"leaseCounter"` - LeaseOwner string `json:"leaseOwner"` - HeartbeatID string `json:"heartbeatID"` + // LeaseKey is the partition/primaryKey in storage and is the shardID + LeaseKey string `json:"leaseKey"` + + // Checkpoint the most updated sequenceNumber from kinesis + Checkpoint string `json:"checkpoint"` + + // LeaseCounter will be updated any time a lease changes owners + LeaseCounter int `json:"leaseCounter"` + + // LeaseOwner is the client id (defaulted to a guid) + LeaseOwner string `json:"leaseOwner"` + + // HeartbeatID is a guid that gets updated on every heartbeat. It is used to help determine if a lease is expired. + // If a lease's heartbeatID hasn't been updated within the lease duration, then we assume the lease is expired + HeartbeatID string `json:"heartbeatID"` + + // LastUpdateTime is the last time the lease has changed. Purposely not stored in storage. It is used with LastUpdateTime time.Time `json:"-"` // purposely left out of json so it doesn't get stored in dynamo } @@ -27,23 +41,13 @@ func (lease Lease) IsExpired(maxLeaseDuration time.Duration) bool { return true } } - return false } -// LeaseUpdate is a single entry from either journal - subscription or entitlement - with some state information -type LeaseUpdate struct { - Checkpoint string `json:":cp"` - LeaseCounter int `json:":lc"` - LeaseOwner string `json:":lo"` - HeartbeatID string `json:":hb"` - LastUpdateTime time.Time `json:"-"` -} - // CheckpointStorage is a simple interface for abstracting away the storage functions type CheckpointStorage interface { CreateLease(lease Lease) error - UpdateLease(leaseKey string, leaseUpdate LeaseUpdate) error + UpdateLease(originalLease, updatedLease Lease) error GetLease(leaseKey string) (*Lease, error) GetAllLeases() (map[string]Lease, error) } @@ -91,7 +95,7 @@ func NewConsumerGroupCheckpoint( } // Start is a blocking call that will attempt to acquire a lease on every tick of leaseDuration -// If a lease is successfully acquired it will be returned otherwise it will continue to retry +// If a lease is successfully acquired it will be added to the channel otherwise it will continue to retry func (cgc ConsumerGroupCheckpoint) Start(ctx context.Context, shardc chan string) { fmt.Printf("Starting ConsumerGroupCheckpoint for Consumer %s \n", cgc.OwnerID) @@ -166,21 +170,22 @@ func (cgc ConsumerGroupCheckpoint) CreateOrGetExpiredLease(currentLeases map[str for _, lease := range currentLeases { // TODO add some nil checking if currentLeases[lease.LeaseKey].HeartbeatID == previousLeases[lease.LeaseKey].HeartbeatID { //we assume the lease was not updated during the amount of time - lease.LeaseCounter = lease.LeaseCounter + 1 //update lease counter - if err := cgc.Storage.UpdateLease(lease.LeaseKey, LeaseUpdate{ + updatedLease := Lease{ + LeaseKey: lease.LeaseKey, Checkpoint: lease.Checkpoint, - LeaseCounter: lease.LeaseCounter, + LeaseCounter: lease.LeaseCounter + 1, LeaseOwner: cgc.OwnerID, HeartbeatID: uuid.NewV4().String(), LastUpdateTime: time.Now(), - }); err != nil { + } + if err := cgc.Storage.UpdateLease(lease, updatedLease); err != nil { fmt.Printf("Error is happening updating the lease") } else { if isLeaseInvalidOrChanged(cgc, lease) { return nil //should not be a valid lease at this point } fmt.Printf("Successfully Acquired Expired lease %v\n", lease) - currentLease = &lease //successfully acquired the lease + currentLease = &updatedLease //successfully acquired the lease break } } @@ -189,8 +194,10 @@ func (cgc ConsumerGroupCheckpoint) CreateOrGetExpiredLease(currentLeases map[str return currentLease } -// heartbeatLoop - this should constantly update the lease that is provided +// heartbeatLoop should constantly update the lease that is provided func (cgc ConsumerGroupCheckpoint) heartbeatLoop(lease *Lease) { + cgc.Mutex.Lock() + defer cgc.Mutex.Unlock() fmt.Println("Starting heartbeat loop") ticker := time.NewTicker(cgc.HeartBeatDuration) defer ticker.Stop() @@ -198,23 +205,21 @@ func (cgc ConsumerGroupCheckpoint) heartbeatLoop(lease *Lease) { for { select { case <-ticker.C: - //TODO also check to see if the lease is expired - if !isLeaseInvalidOrChanged(cgc, *lease) { - //TODO remove the lease from the consumer group checklist + if isLeaseInvalidOrChanged(cgc, *lease) || lease.IsExpired(cgc.LeaseDuration) { + delete(cgc.currentLeases, lease.LeaseKey) } - // TODO handle error - heartbeatID := uuid.NewV4().String() - updateTime := time.Now() - cgc.Storage.UpdateLease(lease.LeaseKey, LeaseUpdate{ + updatedLease := Lease{ + LeaseKey: lease.LeaseKey, Checkpoint: lease.Checkpoint, LeaseCounter: lease.LeaseCounter, LeaseOwner: lease.LeaseOwner, - HeartbeatID: heartbeatID, - LastUpdateTime: updateTime, - }) - lease.HeartbeatID = heartbeatID - lease.LastUpdateTime = updateTime + HeartbeatID: uuid.NewV4().String(), + LastUpdateTime: time.Now(), + } + // TODO handle error + cgc.Storage.UpdateLease(*lease, updatedLease) + lease = &updatedLease fmt.Printf("Sucessfully updated lease %v\n", lease) case <-cgc.done: return diff --git a/consumergroup/ddb.go b/consumergroup/ddb.go index 84633de..593ef96 100644 --- a/consumergroup/ddb.go +++ b/consumergroup/ddb.go @@ -1,9 +1,12 @@ package consumergroup import ( + "time" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" + "github.com/aws/aws-sdk-go/service/dynamodb/expression" consumer "github.com/harlow/kinesis-consumer" ) @@ -22,32 +25,62 @@ type DynamoStorage struct { tableName string } +// LeaseUpdate is a simple structure for mapping a lease to an "UpdateItem" +type LeaseUpdate struct { + Checkpoint string `json:":cp"` + LeaseCounter int `json:":lc"` + LeaseOwner string `json:":lo"` + HeartbeatID string `json:":hb"` + LastUpdateTime time.Time `json:"-"` +} + // CreateLease - stores the lease in dynamo func (dynamoClient DynamoStorage) CreateLease(lease consumer.Lease) error { + condition := expression.AttributeNotExists( + expression.Name("leaseKey"), + ) + expr, err := expression.NewBuilder().WithCondition(condition).Build() + if err != nil { + return err + } + av, err := dynamodbattribute.MarshalMap(lease) if err != nil { return err } - //TODO add conditional expression input := &dynamodb.PutItemInput{ - Item: av, - TableName: aws.String(dynamoClient.tableName), + Item: av, + TableName: aws.String(dynamoClient.tableName), + ConditionExpression: expr.Condition(), } if _, err := dynamoClient.Db.PutItem(input); err != nil { + // TODO need to handle ErrCodeConditionalCheckFailedException and repackage error as known error to client return err } return nil } -// TODO add conditional expressions -// UpdateLease - updates the lease in dynamo -func (dynamoClient DynamoStorage) UpdateLease(leaseKey string, leaseUpdate consumer.LeaseUpdate) error { +// UpdateLease updates the lease in dynamo +func (dynamoClient DynamoStorage) UpdateLease(originalLease, updatedLease consumer.Lease) error { - key := mapShardIdToKey(leaseKey) + condition := expression.And( + expression.Equal(expression.Name("leaseKey"), expression.Value(originalLease.LeaseKey)), + expression.Equal(expression.Name("checkpoint"), expression.Value(originalLease.Checkpoint)), + expression.Equal(expression.Name("leaseCounter"), expression.Value(originalLease.LeaseCounter)), + expression.Equal(expression.Name("leaseOwner"), expression.Value(originalLease.LeaseOwner)), + expression.Equal(expression.Name("heartbeatID"), expression.Value(originalLease.HeartbeatID)), + ) + expr, err := expression.NewBuilder().WithCondition(condition).Build() + if err != nil { + return err + } + + key := mapLeaseKeyToDdbKey(updatedLease.LeaseKey) + leaseUpdate := mapLeaseToLeaseUpdate(updatedLease) update, err := dynamodbattribute.MarshalMap(leaseUpdate) if err != nil { return err @@ -59,21 +92,33 @@ func (dynamoClient DynamoStorage) UpdateLease(leaseKey string, leaseUpdate consu ReturnValues: aws.String("UPDATED_NEW"), TableName: aws.String(dynamoClient.tableName), UpdateExpression: aws.String("set checkpoint = :cp, leaseCounter= :lc, leaseOwner= :lo, heartbeatID= :hb"), + ConditionExpression: expr.Condition(), } if _, err := dynamoClient.Db.UpdateItem(input); err != nil { + // TODO need to handle ErrCodeConditionalCheckFailedException and repackage error as known error to client return err } return nil } +func mapLeaseToLeaseUpdate(lease consumer.Lease) LeaseUpdate { + return LeaseUpdate{ + Checkpoint: lease.Checkpoint, + LeaseCounter: lease.LeaseCounter, + LeaseOwner: lease.LeaseOwner, + HeartbeatID: lease.HeartbeatID, + LastUpdateTime: lease.LastUpdateTime, + } +} + // GetLease returns the latest stored records sorted by clockID in descending order // It is assumed that we won't be keeping many records per ID otherwise, this may need to be optimized // later (possibly to use a map) -func (dynamoClient DynamoStorage) GetLease(shardID string) (*consumer.Lease, error) { +func (dynamoClient DynamoStorage) GetLease(leaseKey string) (*consumer.Lease, error) { - key := mapShardIdToKey(shardID) + key := mapLeaseKeyToDdbKey(leaseKey) input := &dynamodb.GetItemInput{ Key: key, TableName: aws.String(dynamoClient.tableName), @@ -92,9 +137,9 @@ func (dynamoClient DynamoStorage) GetLease(shardID string) (*consumer.Lease, err return &lease, nil } -func mapShardIdToKey(shardID string) map[string]*dynamodb.AttributeValue { +func mapLeaseKeyToDdbKey(leaseKey string) map[string]*dynamodb.AttributeValue { return map[string]*dynamodb.AttributeValue{ - "leaseKey": {S: aws.String(shardID)}, + "leaseKey": {S: aws.String(leaseKey)}, } } diff --git a/kinesis.go b/kinesis.go index 5e141ae..0dcf0d8 100644 --- a/kinesis.go +++ b/kinesis.go @@ -13,12 +13,14 @@ type KinesisClient interface { ListShards(*kinesis.ListShardsInput) (*kinesis.ListShardsOutput, error) } +// Kinesis is a convenience struct that includes streamname and client type Kinesis struct { client KinesisClient streamName string } // ListAllShards pulls a list of shard IDs from the kinesis api +// this could also be used by broker.go or any other future "group" implementation that needs to get the shards. func (k Kinesis) ListAllShards() ([]string, error) { var ss []string var listShardsInput = &kinesis.ListShardsInput{