diff --git a/consumergroup.go b/consumergroup.go index 5c26221..a4673c4 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -2,98 +2,57 @@ package consumer import ( "context" - "errors" "fmt" "sync" "time" "github.com/twinj/uuid" + + "github.com/harlow/kinesis-consumer/storage" ) // TODO change logging to actual logger -// StorageCouldNotUpdateOrCreateLease is a simple error for handling races that are lost in storage -var StorageCouldNotUpdateOrCreateLease = errors.New("storage could not update or create lease") - -// Lease is data for handling a lease/lock on a particular shard -type Lease struct { - // LeaseKey is the partition/primaryKey in storage and is the shardID - LeaseKey string `json:"leaseKey"` - - // Checkpoint the most updated sequenceNumber from kinesis - Checkpoint string `json:"checkpoint"` - - // LeaseCounter will be updated any time a lease changes owners - LeaseCounter int `json:"leaseCounter"` - - // LeaseOwner is the client id (defaulted to a guid) - LeaseOwner string `json:"leaseOwner"` - - // HeartbeatID is a guid that gets updated on every heartbeat. It is used to help determine if a lease is expired. - // If a lease's heartbeatID hasn't been updated within the lease duration, then we assume the lease is expired - HeartbeatID string `json:"heartbeatID"` - - // LastUpdateTime is the last time the lease has changed. Purposely not stored in storage. It is used with - LastUpdateTime time.Time `json:"-"` // purposely left out of json so it doesn't get stored in dynamo -} - -// IsExpired is a function to check if the lease is expired, but is only expected to be used in the heartbeat loop -func (lease Lease) IsExpired(maxLeaseDuration time.Duration) bool { - if !lease.LastUpdateTime.IsZero() { - durationPast := time.Since(lease.LastUpdateTime) - if durationPast > maxLeaseDuration { - return true - } - } - return false -} - -// CheckpointStorage is a simple interface for abstracting away the storage functions -type CheckpointStorage interface { - CreateLease(lease Lease) error - UpdateLease(originalLease, updatedLease Lease) error - GetLease(leaseKey string) (*Lease, error) - GetAllLeases() (map[string]Lease, error) -} - -// ConsumerGroupCheckpoint is a simple struct for managing the +// ConsumerGroupCheckpoint is a simple struct for managing the consumergroup and heartbeat of updating leases type ConsumerGroupCheckpoint struct { - Storage CheckpointStorage - kinesis Kinesis - LeaseDuration time.Duration HeartBeatDuration time.Duration + LeaseDuration time.Duration OwnerID string + Storage Storage done chan struct{} - currentLeases map[string]*Lease //Initially, this will only be one - Mutex *sync.Mutex + kinesis Kinesis + + leasesMutex *sync.Mutex + leases map[string]*storage.Lease // Initially, this will only be one } func (cgc ConsumerGroupCheckpoint) Get(shardID string) (string, error) { - return cgc.currentLeases[shardID].Checkpoint, nil + return cgc.leases[shardID].Checkpoint, nil } func (cgc ConsumerGroupCheckpoint) Set(shardID, sequenceNumber string) error { - cgc.Mutex.Lock() - defer cgc.Mutex.Unlock() + cgc.leasesMutex.Lock() + defer cgc.leasesMutex.Unlock() - cgc.currentLeases[shardID].Checkpoint = sequenceNumber + cgc.leases[shardID].Checkpoint = sequenceNumber return nil } func NewConsumerGroupCheckpoint( - storage CheckpointStorage, + Storage Storage, kinesis Kinesis, leaseDuration time.Duration, heartBeatDuration time.Duration) *ConsumerGroupCheckpoint { return &ConsumerGroupCheckpoint{ - Storage: storage, - kinesis: kinesis, - LeaseDuration: leaseDuration, HeartBeatDuration: heartBeatDuration, + LeaseDuration: leaseDuration, OwnerID: uuid.NewV4().String(), // generated owner id + Storage: Storage, done: make(chan struct{}), - currentLeases: make(map[string]*Lease, 1), - Mutex: &sync.Mutex{}, + kinesis: kinesis, + + leasesMutex: &sync.Mutex{}, + leases: make(map[string]*storage.Lease, 1), } } @@ -105,37 +64,42 @@ func (cgc ConsumerGroupCheckpoint) Start(ctx context.Context, shardc chan string tick := time.NewTicker(cgc.LeaseDuration) defer tick.Stop() - var currentLeases map[string]Lease - var previousLeases map[string]Lease + var currentLeases map[string]storage.Lease + var previousLeases map[string]storage.Lease for { select { case <-tick.C: - if len(cgc.currentLeases) == 0 { // only do anything if there are no current leases - fmt.Printf("Attempting to acquire lease for OwnerID=%s\n", cgc.OwnerID) - var err error - currentLeases, err = cgc.Storage.GetAllLeases() - if err != nil { - // TODO log this error - } - - lease := cgc.CreateOrGetExpiredLease(currentLeases, previousLeases) - if lease != nil && lease.LeaseKey != "" { - cgc.currentLeases[lease.LeaseKey] = lease - go cgc.heartbeatLoop(lease) - shardc <- lease.LeaseKey - } - previousLeases = currentLeases + if len(cgc.leases) > 0 { // only do anything if there are no current leases + continue } + fmt.Printf("Attempting to acquire lease for OwnerID=%s\n", cgc.OwnerID) + var err error + currentLeases, err = cgc.Storage.GetAllLeases() + if err != nil { + // TODO log this error + } + + lease := cgc.CreateOrGetExpiredLease(currentLeases, previousLeases) + previousLeases = currentLeases + + if lease == nil || lease.LeaseKey == "" { + continue // lease wasn't acquired continue + } + // lease sucessfully acquired + // start the heartbeat and send back the shardID on the channel + cgc.leases[lease.LeaseKey] = lease + go cgc.heartbeatLoop(lease) + shardc <- lease.LeaseKey } } } // CreateOrGetExpiredLease is a helper function that tries checks to see if there are any leases available if not it tries to grab an "expired" lease where the heartbeat isn't updated. -func (cgc ConsumerGroupCheckpoint) CreateOrGetExpiredLease(currentLeases map[string]Lease, previousLeases map[string]Lease) *Lease { - cgc.Mutex.Lock() - defer cgc.Mutex.Unlock() +func (cgc ConsumerGroupCheckpoint) CreateOrGetExpiredLease(currentLeases map[string]storage.Lease, previousLeases map[string]storage.Lease) *storage.Lease { + cgc.leasesMutex.Lock() + defer cgc.leasesMutex.Unlock() listOfShards, err := cgc.kinesis.ListAllShards() if err != nil { @@ -144,11 +108,11 @@ func (cgc ConsumerGroupCheckpoint) CreateOrGetExpiredLease(currentLeases map[str } shardIDsNotYetTaken := getShardIDsNotLeased(listOfShards, currentLeases) - var currentLease *Lease + var currentLease *storage.Lease if len(shardIDsNotYetTaken) > 0 { fmt.Println("Grabbing lease from shardIDs not taken") shardId := shardIDsNotYetTaken[0] //grab the first one //TODO randomize - tempLease := Lease{ + tempLease := storage.Lease{ LeaseKey: shardId, Checkpoint: "0", // we don't have this yet LeaseCounter: 1, @@ -174,7 +138,7 @@ func (cgc ConsumerGroupCheckpoint) CreateOrGetExpiredLease(currentLeases map[str for _, lease := range currentLeases { // TODO add some nil checking if currentLeases[lease.LeaseKey].HeartbeatID == previousLeases[lease.LeaseKey].HeartbeatID { //we assume the lease was not updated during the amount of time - updatedLease := Lease{ + updatedLease := storage.Lease{ LeaseKey: lease.LeaseKey, Checkpoint: lease.Checkpoint, LeaseCounter: lease.LeaseCounter + 1, @@ -199,9 +163,9 @@ func (cgc ConsumerGroupCheckpoint) CreateOrGetExpiredLease(currentLeases map[str } // heartbeatLoop should constantly update the lease that is provided -func (cgc ConsumerGroupCheckpoint) heartbeatLoop(lease *Lease) { - cgc.Mutex.Lock() - defer cgc.Mutex.Unlock() +func (cgc ConsumerGroupCheckpoint) heartbeatLoop(lease *storage.Lease) { + cgc.leasesMutex.Lock() + defer cgc.leasesMutex.Unlock() fmt.Println("Starting heartbeat loop") ticker := time.NewTicker(cgc.HeartBeatDuration) defer ticker.Stop() @@ -211,9 +175,9 @@ func (cgc ConsumerGroupCheckpoint) heartbeatLoop(lease *Lease) { case <-ticker.C: if isLeaseInvalidOrChanged(cgc, *lease) || lease.IsExpired(cgc.LeaseDuration) { - delete(cgc.currentLeases, lease.LeaseKey) + delete(cgc.leases, lease.LeaseKey) } - updatedLease := Lease{ + updatedLease := storage.Lease{ LeaseKey: lease.LeaseKey, Checkpoint: lease.Checkpoint, LeaseCounter: lease.LeaseCounter, @@ -232,7 +196,7 @@ func (cgc ConsumerGroupCheckpoint) heartbeatLoop(lease *Lease) { } // isLeaseInvalidOrChanged checks to see if the lease changed -func isLeaseInvalidOrChanged(cgc ConsumerGroupCheckpoint, lease Lease) bool { +func isLeaseInvalidOrChanged(cgc ConsumerGroupCheckpoint, lease storage.Lease) bool { leaseCurrent, _ := cgc.Storage.GetLease(lease.LeaseKey) if lease.LeaseKey != leaseCurrent.LeaseKey || cgc.OwnerID != leaseCurrent.LeaseOwner || leaseCurrent.LeaseCounter != lease.LeaseCounter { fmt.Printf("The lease changed\n") @@ -242,7 +206,7 @@ func isLeaseInvalidOrChanged(cgc ConsumerGroupCheckpoint, lease Lease) bool { } // getShardIDsNotLeased finds any open shards where there are no leases yet created -func getShardIDsNotLeased(shardIDs []string, leases map[string]Lease) []string { +func getShardIDsNotLeased(shardIDs []string, leases map[string]storage.Lease) []string { var shardIDsNotUsed []string for _, shardID := range shardIDs { if _, ok := leases[shardID]; !ok { diff --git a/storage.go b/storage.go new file mode 100644 index 0000000..b041a84 --- /dev/null +++ b/storage.go @@ -0,0 +1,11 @@ +package consumer + +import "github.com/harlow/kinesis-consumer/storage" + +// Storage is a simple interface for abstracting away the storage functions +type Storage interface { + CreateLease(lease storage.Lease) error + UpdateLease(originalLease, updatedLease storage.Lease) error + GetLease(leaseKey string) (*storage.Lease, error) + GetAllLeases() (map[string]storage.Lease, error) +} diff --git a/consumergroup/ddb.go b/storage/ddb/ddb.go similarity index 88% rename from consumergroup/ddb.go rename to storage/ddb/ddb.go index f98de01..7932726 100644 --- a/consumergroup/ddb.go +++ b/storage/ddb/ddb.go @@ -1,4 +1,4 @@ -package consumergroup +package ddb import ( "time" @@ -9,7 +9,7 @@ import ( "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" "github.com/aws/aws-sdk-go/service/dynamodb/expression" - consumer "github.com/harlow/kinesis-consumer" + "github.com/harlow/kinesis-consumer/storage" ) // DynamoDb simple and minimal interface for DynamoDb that helps with testing @@ -36,7 +36,7 @@ type LeaseUpdate struct { } // CreateLease - stores the lease in dynamo -func (dynamoClient DynamoStorage) CreateLease(lease consumer.Lease) error { +func (dynamoClient DynamoStorage) CreateLease(lease storage.Lease) error { condition := expression.AttributeNotExists( expression.Name("leaseKey"), @@ -60,7 +60,7 @@ func (dynamoClient DynamoStorage) CreateLease(lease consumer.Lease) error { if _, err := dynamoClient.Db.PutItem(input); err != nil { if awsErr, ok := err.(awserr.Error); ok { if awsErr.Code() == dynamodb.ErrCodeConditionalCheckFailedException { - return consumer.StorageCouldNotUpdateOrCreateLease + return storage.StorageCouldNotUpdateOrCreateLease } } return err @@ -70,7 +70,7 @@ func (dynamoClient DynamoStorage) CreateLease(lease consumer.Lease) error { } // UpdateLease updates the lease in dynamo -func (dynamoClient DynamoStorage) UpdateLease(originalLease, updatedLease consumer.Lease) error { +func (dynamoClient DynamoStorage) UpdateLease(originalLease, updatedLease storage.Lease) error { condition := expression.And( expression.Equal(expression.Name("leaseKey"), expression.Value(originalLease.LeaseKey)), @@ -103,7 +103,7 @@ func (dynamoClient DynamoStorage) UpdateLease(originalLease, updatedLease consum if _, err := dynamoClient.Db.UpdateItem(input); err != nil { if awsErr, ok := err.(awserr.Error); ok { if awsErr.Code() == dynamodb.ErrCodeConditionalCheckFailedException { - return consumer.StorageCouldNotUpdateOrCreateLease + return storage.StorageCouldNotUpdateOrCreateLease } } return err @@ -112,7 +112,7 @@ func (dynamoClient DynamoStorage) UpdateLease(originalLease, updatedLease consum return nil } -func mapLeaseToLeaseUpdate(lease consumer.Lease) LeaseUpdate { +func mapLeaseToLeaseUpdate(lease storage.Lease) LeaseUpdate { return LeaseUpdate{ Checkpoint: lease.Checkpoint, LeaseCounter: lease.LeaseCounter, @@ -125,7 +125,7 @@ func mapLeaseToLeaseUpdate(lease consumer.Lease) LeaseUpdate { // GetLease returns the latest stored records sorted by clockID in descending order // It is assumed that we won't be keeping many records per ID otherwise, this may need to be optimized // later (possibly to use a map) -func (dynamoClient DynamoStorage) GetLease(leaseKey string) (*consumer.Lease, error) { +func (dynamoClient DynamoStorage) GetLease(leaseKey string) (*storage.Lease, error) { key := mapLeaseKeyToDdbKey(leaseKey) input := &dynamodb.GetItemInput{ @@ -138,7 +138,7 @@ func (dynamoClient DynamoStorage) GetLease(leaseKey string) (*consumer.Lease, er return nil, err } - var lease consumer.Lease + var lease storage.Lease if err := dynamodbattribute.UnmarshalMap(result.Item, &lease); err != nil { return nil, err } @@ -153,7 +153,7 @@ func mapLeaseKeyToDdbKey(leaseKey string) map[string]*dynamodb.AttributeValue { } // GetAllLeases this can be used at start up (or anytime to grab all the leases) -func (dynamoClient DynamoStorage) GetAllLeases() (map[string]consumer.Lease, error) { +func (dynamoClient DynamoStorage) GetAllLeases() (map[string]storage.Lease, error) { // TODO if we have a lot of shards, we might have to worry about limits here input := &dynamodb.ScanInput{ @@ -165,9 +165,9 @@ func (dynamoClient DynamoStorage) GetAllLeases() (map[string]consumer.Lease, err return nil, err } - leases := make(map[string]consumer.Lease, len(result.Items)) + leases := make(map[string]storage.Lease, len(result.Items)) for _, item := range result.Items { - var record consumer.Lease + var record storage.Lease if err := dynamodbattribute.UnmarshalMap(item, &record); err != nil { return nil, err } diff --git a/consumergroup/ddb_test.go b/storage/ddb/ddb_test.go similarity index 95% rename from consumergroup/ddb_test.go rename to storage/ddb/ddb_test.go index 0b39987..7b5cc1d 100644 --- a/consumergroup/ddb_test.go +++ b/storage/ddb/ddb_test.go @@ -1,4 +1,4 @@ -package consumergroup +package ddb import ( "reflect" @@ -10,10 +10,10 @@ import ( "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/stretchr/testify/mock" - consumer "github.com/harlow/kinesis-consumer" + "github.com/harlow/kinesis-consumer/storage" ) -var testLease1 = consumer.Lease{ +var testLease1 = storage.Lease{ LeaseKey: "000001", Checkpoint: "1234345", LeaseCounter: 0, @@ -63,7 +63,7 @@ func TestDynamoStorage_CreateLease(t *testing.T) { tests := []struct { name string tableName string - lease consumer.Lease + lease storage.Lease dynamoErr error expectedErr error }{ @@ -79,7 +79,7 @@ func TestDynamoStorage_CreateLease(t *testing.T) { tableName: "test", lease: testLease1, dynamoErr: awserr.New(dynamodb.ErrCodeConditionalCheckFailedException, "", nil), - expectedErr: consumer.StorageCouldNotUpdateOrCreateLease, + expectedErr: storage.StorageCouldNotUpdateOrCreateLease, }, { name: "CreateLease_Expect_Error_ErrCodeInternalServerError", @@ -113,8 +113,8 @@ func TestDynamoStorage_UpdateLease(t *testing.T) { tableName string } type args struct { - originalLease consumer.Lease - updatedLease consumer.Lease + originalLease storage.Lease + updatedLease storage.Lease } tests := []struct { name string @@ -141,7 +141,7 @@ func TestDynamoStorage_UpdateLease(t *testing.T) { func Test_mapLeaseToLeaseUpdate(t *testing.T) { type args struct { - lease consumer.Lease + lease storage.Lease } tests := []struct { name string @@ -176,7 +176,7 @@ func TestDynamoStorage_GetLease(t *testing.T) { name string tableName string leaseKey string - want *consumer.Lease + want *storage.Lease dynamoOut *dynamodb.GetItemOutput dynamoErr error wantErr bool @@ -239,7 +239,7 @@ func TestDynamoStorage_GetAllLeases(t *testing.T) { tests := []struct { name string fields fields - want map[string]consumer.Lease + want map[string]storage.Lease wantErr bool }{ // TODO: Add test cases. diff --git a/storage/lease.go b/storage/lease.go new file mode 100644 index 0000000..7bc36a7 --- /dev/null +++ b/storage/lease.go @@ -0,0 +1,42 @@ +package storage + +import ( + "errors" + "time" +) + +// Lease is data for handling a lease/lock on a particular shard +type Lease struct { + // LeaseKey is the partition/primaryKey in storage and is the shardID + LeaseKey string `json:"leaseKey"` + + // Checkpoint the most updated sequenceNumber from kinesis + Checkpoint string `json:"checkpoint"` + + // LeaseCounter will be updated any time a lease changes owners + LeaseCounter int `json:"leaseCounter"` + + // LeaseOwner is the client id (defaulted to a guid) + LeaseOwner string `json:"leaseOwner"` + + // HeartbeatID is a guid that gets updated on every heartbeat. It is used to help determine if a lease is expired. + // If a lease's heartbeatID hasn't been updated within the lease duration, then we assume the lease is expired + HeartbeatID string `json:"heartbeatID"` + + // LastUpdateTime is the last time the lease has changed. Purposely not stored in storage. It is used with + LastUpdateTime time.Time `json:"-"` // purposely left out of json so it doesn't get stored in dynamo +} + +// IsExpired is a function to check if the lease is expired, but is only expected to be used in the heartbeat loop +func (lease Lease) IsExpired(maxLeaseDuration time.Duration) bool { + if !lease.LastUpdateTime.IsZero() { + durationPast := time.Since(lease.LastUpdateTime) + if durationPast > maxLeaseDuration { + return true + } + } + return false +} + +// StorageCouldNotUpdateOrCreateLease is a simple error for handling races that are lost in storage +var StorageCouldNotUpdateOrCreateLease = errors.New("storage could not update or create lease")