From b47b611696441a82310d4f99428f2b4bd8cb6e59 Mon Sep 17 00:00:00 2001 From: kperry Date: Sun, 19 May 2019 20:51:40 -0500 Subject: [PATCH] Initial commit with consumer groups. Still a lot of cleanup and testing to do. --- broker.go | 24 ++--- consumer.go | 19 +++- consumergroup.go | 244 +++++++++++++++++++++++++++++++++++++++++++ consumergroup/ddb.go | 124 ++++++++++++++++++++++ go.mod | 12 ++- go.sum | 56 ++++++++++ kinesis.go | 46 ++++++++ options.go | 8 ++ 8 files changed, 512 insertions(+), 21 deletions(-) create mode 100644 consumergroup.go create mode 100644 consumergroup/ddb.go create mode 100644 kinesis.go diff --git a/broker.go b/broker.go index ecf25a1..5939ab7 100644 --- a/broker.go +++ b/broker.go @@ -14,14 +14,12 @@ import ( func newBroker( client kinesisiface.KinesisAPI, streamName string, - shardc chan *kinesis.Shard, logger Logger, ) *broker { return &broker{ client: client, shards: make(map[string]*kinesis.Shard), streamName: streamName, - shardc: shardc, logger: logger, } } @@ -31,17 +29,15 @@ func newBroker( type broker struct { client kinesisiface.KinesisAPI streamName string - shardc chan *kinesis.Shard logger Logger - - shardMu sync.Mutex - shards map[string]*kinesis.Shard + shardMu sync.Mutex + shards map[string]*kinesis.Shard } -// start is a blocking operation which will loop and attempt to find new +// Start is a blocking operation which will loop and attempt to find new // shards on a regular cadence. -func (b *broker) start(ctx context.Context) { - b.findNewShards() +func (b *broker) Start(ctx context.Context, shardc chan string) { + b.findNewShards(shardc) ticker := time.NewTicker(30 * time.Second) // Note: while ticker is a rather naive approach to this problem, @@ -59,7 +55,7 @@ func (b *broker) start(ctx context.Context) { ticker.Stop() return case <-ticker.C: - b.findNewShards() + b.findNewShards(shardc) } } } @@ -67,7 +63,7 @@ func (b *broker) start(ctx context.Context) { // findNewShards pulls the list of shards from the Kinesis API // and uses a local cache to determine if we are already processing // a particular shard. -func (b *broker) findNewShards() { +func (b *broker) findNewShards(shardc chan string) { b.shardMu.Lock() defer b.shardMu.Unlock() @@ -84,11 +80,11 @@ func (b *broker) findNewShards() { continue } b.shards[*shard.ShardId] = shard - b.shardc <- shard + shardc <- *shard.ShardId } } -// listShards pulls a list of shard IDs from the kinesis api +// ListAllShards pulls a list of shard IDs from the kinesis api func (b *broker) listShards() ([]*kinesis.Shard, error) { var ss []*kinesis.Shard var listShardsInput = &kinesis.ListShardsInput{ @@ -98,7 +94,7 @@ func (b *broker) listShards() ([]*kinesis.Shard, error) { for { resp, err := b.client.ListShards(listShardsInput) if err != nil { - return nil, fmt.Errorf("ListShards error: %v", err) + return nil, fmt.Errorf("ListAllShards error: %v", err) } ss = append(ss, resp.Shards...) diff --git a/consumer.go b/consumer.go index e9c583a..66a70e5 100644 --- a/consumer.go +++ b/consumer.go @@ -16,6 +16,10 @@ import ( // Record is an alias of record returned from kinesis library type Record = kinesis.Record +type Group interface { + Start(ctx context.Context, shardc chan string) +} + // New creates a kinesis consumer with default settings. Use Option to override // any of the optional attributes. func New(streamName string, opts ...Option) (*Consumer, error) { @@ -39,7 +43,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) { opt(c) } - // default client if none provided + // default client if None provided if c.client == nil { newSession, err := session.NewSession(aws.NewConfig()) if err != nil { @@ -48,6 +52,11 @@ func New(streamName string, opts ...Option) (*Consumer, error) { c.client = kinesis.New(newSession) } + // default the group if nothing was provided + if c.group == nil { + c.group = newBroker(c.client, streamName, c.logger) + } + return c, nil } @@ -59,6 +68,7 @@ type Consumer struct { logger Logger checkpoint Checkpoint counter Counter + group Group } // ScanFunc is the type of the function called for each message read @@ -80,14 +90,13 @@ var SkipCheckpoint = errors.New("skip checkpoint") func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error { var ( errc = make(chan error, 1) - shardc = make(chan *kinesis.Shard, 1) - broker = newBroker(c.client, c.streamName, shardc, c.logger) + shardc = make(chan string, 1) ) ctx, cancel := context.WithCancel(ctx) defer cancel() - go broker.start(ctx) + go c.group.Start(ctx, shardc) go func() { <-ctx.Done() @@ -106,7 +115,7 @@ func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error { // error has already occured } } - }(aws.StringValue(shard.ShardId)) + }(shard) } close(errc) diff --git a/consumergroup.go b/consumergroup.go new file mode 100644 index 0000000..b6a7027 --- /dev/null +++ b/consumergroup.go @@ -0,0 +1,244 @@ +package consumer + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/twinj/uuid" +) + +// Lease is data for handling a lease/lock on a particular shard +type Lease struct { + LeaseKey string `json:"leaseKey"` // This is the partitionKey in dynamo + Checkpoint string `json:"checkpoint"` // the most updated sequenceNumber from kinesis + LeaseCounter int `json:"leaseCounter"` + LeaseOwner string `json:"leaseOwner"` + HeartbeatID string `json:"heartbeatID"` + LastUpdateTime time.Time `json:"-"` // purposely left out of json so it doesn't get stored in dynamo +} + +// IsExpired is a function to check if the lease is expired, but is only expected to be used in the heartbeat loop +func (lease Lease) IsExpired(maxLeaseDuration time.Duration) bool { + if !lease.LastUpdateTime.IsZero() { + durationPast := time.Since(lease.LastUpdateTime) + if durationPast > maxLeaseDuration { + return true + } + } + + return false +} + +// LeaseUpdate is a single entry from either journal - subscription or entitlement - with some state information +type LeaseUpdate struct { + Checkpoint string `json:":cp"` + LeaseCounter int `json:":lc"` + LeaseOwner string `json:":lo"` + HeartbeatID string `json:":hb"` + LastUpdateTime time.Time `json:"-"` +} + +// CheckpointStorage is a simple interface for abstracting away the storage functions +type CheckpointStorage interface { + CreateLease(lease Lease) error + UpdateLease(leaseKey string, leaseUpdate LeaseUpdate) error + GetLease(leaseKey string) (*Lease, error) + GetAllLeases() (map[string]Lease, error) +} + +// ConsumerGroupCheckpoint is a simple struct for managing the +type ConsumerGroupCheckpoint struct { + Storage CheckpointStorage + kinesis Kinesis + LeaseDuration time.Duration + HeartBeatDuration time.Duration + OwnerID string + done chan struct{} + currentLeases map[string]*Lease //Initially, this will only be one + Mutex *sync.Mutex +} + +func (cgc ConsumerGroupCheckpoint) Get(shardID string) (string, error) { + return cgc.currentLeases[shardID].Checkpoint, nil +} + +func (cgc ConsumerGroupCheckpoint) Set(shardID, sequenceNumber string) error { + cgc.Mutex.Lock() + defer cgc.Mutex.Unlock() + + cgc.currentLeases[shardID].Checkpoint = sequenceNumber + return nil +} + +func NewConsumerGroupCheckpoint( + storage CheckpointStorage, + kinesis Kinesis, + leaseDuration time.Duration, + heartBeatDuration time.Duration) *ConsumerGroupCheckpoint { + return &ConsumerGroupCheckpoint{ + Storage: storage, + kinesis: kinesis, + LeaseDuration: leaseDuration, + HeartBeatDuration: heartBeatDuration, + OwnerID: uuid.NewV4().String(), // generated owner id + done: make(chan struct{}), + currentLeases: make(map[string]*Lease, 1), + Mutex: &sync.Mutex{}, + } + +} + +// Start is a blocking call that will attempt to acquire a lease on every tick of leaseDuration +// If a lease is successfully acquired it will be returned otherwise it will continue to retry +func (cgc ConsumerGroupCheckpoint) Start(ctx context.Context, shardc chan string) { + fmt.Printf("Starting ConsumerGroupCheckpoint for Consumer %s \n", cgc.OwnerID) + + tick := time.NewTicker(cgc.LeaseDuration) + defer tick.Stop() + var currentLeases map[string]Lease + var previousLeases map[string]Lease + + for { + select { + case <-tick.C: + if len(cgc.currentLeases) == 0 { // only do anything if there are no current leases + fmt.Printf("Attempting to acquire lease for OwnerID=%s\n", cgc.OwnerID) + var err error + currentLeases, err = cgc.Storage.GetAllLeases() + if err != nil { + // TODO log this error + } + + lease := cgc.CreateOrGetExpiredLease(currentLeases, previousLeases) + if lease != nil && lease.LeaseKey != "" { + cgc.currentLeases[lease.LeaseKey] = lease + go cgc.heartbeatLoop(lease) + shardc <- lease.LeaseKey + } + previousLeases = currentLeases + } + } + } + +} + +// CreateOrGetExpiredLease is a helper function that tries checks to see if there are any leases available if not it tries to grab an "expired" lease where the heartbeat isn't updated. +func (cgc ConsumerGroupCheckpoint) CreateOrGetExpiredLease(currentLeases map[string]Lease, previousLeases map[string]Lease) *Lease { + cgc.Mutex.Lock() + defer cgc.Mutex.Unlock() + + listOfShards, err := cgc.kinesis.ListAllShards() + if err != nil { + //TODO log error + // TODO return error + } + + shardIDsNotYetTaken := getShardIDsNotLeased(listOfShards, currentLeases) + var currentLease *Lease + if len(shardIDsNotYetTaken) > 0 { + fmt.Println("Grabbing lease from shardIDs not taken") + shardId := shardIDsNotYetTaken[0] //grab the first one //TODO randomize + tempLease := Lease{ + LeaseKey: shardId, + Checkpoint: "0", // we don't have this yet + LeaseCounter: 1, + LeaseOwner: cgc.OwnerID, + HeartbeatID: uuid.NewV4().String(), + LastUpdateTime: time.Now(), + } + + if err := cgc.Storage.CreateLease(tempLease); err != nil { + fmt.Printf("Error is happening create the lease") + } else { + //success + if isLeaseInvalidOrChanged(cgc, tempLease) { + //Lease must have been acquired by another worker + return nil + } + fmt.Printf("Successfully Acquired lease %v", tempLease) + currentLease = &tempLease //successfully acquired the lease + } + + } + if currentLease == nil || currentLease.LeaseKey == "" && len(previousLeases) > 0 { + for _, lease := range currentLeases { + // TODO add some nil checking + if currentLeases[lease.LeaseKey].HeartbeatID == previousLeases[lease.LeaseKey].HeartbeatID { //we assume the lease was not updated during the amount of time + lease.LeaseCounter = lease.LeaseCounter + 1 //update lease counter + if err := cgc.Storage.UpdateLease(lease.LeaseKey, LeaseUpdate{ + Checkpoint: lease.Checkpoint, + LeaseCounter: lease.LeaseCounter, + LeaseOwner: cgc.OwnerID, + HeartbeatID: uuid.NewV4().String(), + LastUpdateTime: time.Now(), + }); err != nil { + fmt.Printf("Error is happening updating the lease") + } else { + if isLeaseInvalidOrChanged(cgc, lease) { + return nil //should not be a valid lease at this point + } + fmt.Printf("Successfully Acquired Expired lease %v\n", lease) + currentLease = &lease //successfully acquired the lease + break + } + } + } + } + return currentLease +} + +// heartbeatLoop - this should constantly update the lease that is provided +func (cgc ConsumerGroupCheckpoint) heartbeatLoop(lease *Lease) { + fmt.Println("Starting heartbeat loop") + ticker := time.NewTicker(cgc.HeartBeatDuration) + defer ticker.Stop() + defer close(cgc.done) + for { + select { + case <-ticker.C: + //TODO also check to see if the lease is expired + if !isLeaseInvalidOrChanged(cgc, *lease) { + //TODO remove the lease from the consumer group checklist + + } + // TODO handle error + heartbeatID := uuid.NewV4().String() + updateTime := time.Now() + cgc.Storage.UpdateLease(lease.LeaseKey, LeaseUpdate{ + Checkpoint: lease.Checkpoint, + LeaseCounter: lease.LeaseCounter, + LeaseOwner: lease.LeaseOwner, + HeartbeatID: heartbeatID, + LastUpdateTime: updateTime, + }) + lease.HeartbeatID = heartbeatID + lease.LastUpdateTime = updateTime + fmt.Printf("Sucessfully updated lease %v\n", lease) + case <-cgc.done: + return + } + } +} + +// isLeaseInvalidOrChanged checks to see if the lease changed +func isLeaseInvalidOrChanged(cgc ConsumerGroupCheckpoint, lease Lease) bool { + leaseCurrent, _ := cgc.Storage.GetLease(lease.LeaseKey) + if lease.LeaseKey != leaseCurrent.LeaseKey || cgc.OwnerID != leaseCurrent.LeaseOwner || leaseCurrent.LeaseCounter != lease.LeaseCounter { + fmt.Printf("The lease changed\n") + return true + } + return false +} + +// getShardIDsNotLeased finds any open shards where there are no leases yet created +func getShardIDsNotLeased(shardIDs []string, leases map[string]Lease) []string { + var shardIDsNotUsed []string + for _, shardID := range shardIDs { + if _, ok := leases[shardID]; !ok { + shardIDsNotUsed = append(shardIDsNotUsed, shardID) + } + } + return shardIDsNotUsed +} diff --git a/consumergroup/ddb.go b/consumergroup/ddb.go new file mode 100644 index 0000000..84633de --- /dev/null +++ b/consumergroup/ddb.go @@ -0,0 +1,124 @@ +package consumergroup + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" + + consumer "github.com/harlow/kinesis-consumer" +) + +// DynamoDb simple and minimal interface for DynamoDb that helps with testing +type DynamoDb interface { + PutItem(*dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) + UpdateItem(*dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error) + GetItem(*dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) + Scan(*dynamodb.ScanInput) (*dynamodb.ScanOutput, error) +} + +// DynamoStorage struct that implements the storage interface and uses simplified DynamoDb struct +type DynamoStorage struct { + Db DynamoDb + tableName string +} + +// CreateLease - stores the lease in dynamo +func (dynamoClient DynamoStorage) CreateLease(lease consumer.Lease) error { + + av, err := dynamodbattribute.MarshalMap(lease) + if err != nil { + return err + } + + //TODO add conditional expression + input := &dynamodb.PutItemInput{ + Item: av, + TableName: aws.String(dynamoClient.tableName), + } + + if _, err := dynamoClient.Db.PutItem(input); err != nil { + return err + } + + return nil +} + +// TODO add conditional expressions +// UpdateLease - updates the lease in dynamo +func (dynamoClient DynamoStorage) UpdateLease(leaseKey string, leaseUpdate consumer.LeaseUpdate) error { + + key := mapShardIdToKey(leaseKey) + update, err := dynamodbattribute.MarshalMap(leaseUpdate) + if err != nil { + return err + } + + input := &dynamodb.UpdateItemInput{ + ExpressionAttributeValues: update, + Key: key, + ReturnValues: aws.String("UPDATED_NEW"), + TableName: aws.String(dynamoClient.tableName), + UpdateExpression: aws.String("set checkpoint = :cp, leaseCounter= :lc, leaseOwner= :lo, heartbeatID= :hb"), + } + + if _, err := dynamoClient.Db.UpdateItem(input); err != nil { + return err + } + + return nil +} + +// GetLease returns the latest stored records sorted by clockID in descending order +// It is assumed that we won't be keeping many records per ID otherwise, this may need to be optimized +// later (possibly to use a map) +func (dynamoClient DynamoStorage) GetLease(shardID string) (*consumer.Lease, error) { + + key := mapShardIdToKey(shardID) + input := &dynamodb.GetItemInput{ + Key: key, + TableName: aws.String(dynamoClient.tableName), + ConsistentRead: aws.Bool(true), + } + result, err := dynamoClient.Db.GetItem(input) + if err != nil { + return nil, err + } + + var lease consumer.Lease + if err := dynamodbattribute.UnmarshalMap(result.Item, &lease); err != nil { + return nil, err + } + + return &lease, nil +} + +func mapShardIdToKey(shardID string) map[string]*dynamodb.AttributeValue { + return map[string]*dynamodb.AttributeValue{ + "leaseKey": {S: aws.String(shardID)}, + } +} + +// GetAllLeases this can be used at start up (or anytime to grab all the leases) +func (dynamoClient DynamoStorage) GetAllLeases() (map[string]consumer.Lease, error) { + + // TODO if we have a lot of shards, we might have to worry about limits here + input := &dynamodb.ScanInput{ + ConsistentRead: aws.Bool(true), + TableName: aws.String(dynamoClient.tableName), + } + result, err := dynamoClient.Db.Scan(input) + if err != nil { + return nil, err + } + + leases := make(map[string]consumer.Lease, len(result.Items)) + for _, item := range result.Items { + var record consumer.Lease + if err := dynamodbattribute.UnmarshalMap(item, &record); err != nil { + return nil, err + } + leases[record.LeaseKey] = record + } + + return leases, nil +} diff --git a/go.mod b/go.mod index 7ed5fdc..62ff507 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,19 @@ module github.com/harlow/kinesis-consumer require ( github.com/apex/log v1.0.0 github.com/aws/aws-sdk-go v1.15.0 - github.com/go-ini/ini v1.38.1 + github.com/go-ini/ini v1.38.1 // indirect github.com/go-sql-driver/mysql v1.4.1 - github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 github.com/lib/pq v0.0.0-20180523175426-90697d60dd84 + github.com/myesui/uuid v1.0.0 // indirect + github.com/onsi/ginkgo v1.8.0 // indirect + github.com/onsi/gomega v1.5.0 // indirect github.com/pkg/errors v0.8.0 + github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a // indirect + github.com/stretchr/testify v1.3.0 // indirect + github.com/twinj/uuid v1.0.0 + golang.org/x/net v0.0.0-20190514140710-3ec191127204 // indirect + google.golang.org/appengine v1.6.0 // indirect gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 + gopkg.in/ini.v1 v1.42.0 // indirect gopkg.in/redis.v5 v5.2.9 ) diff --git a/go.sum b/go.sum index e9300ce..4a974f8 100644 --- a/go.sum +++ b/go.sum @@ -2,18 +2,74 @@ github.com/apex/log v1.0.0 h1:5UWeZC54mWVtOGSCjtuvDPgY/o0QxmjQgvYZ27pLVGQ= github.com/apex/log v1.0.0/go.mod h1:yA770aXIDQrhVOIGurT/pVdfCpSq1GQV/auzMN5fzvY= github.com/aws/aws-sdk-go v1.15.0 h1:uxi9gcf4jxEX7r8oWYMEkYB4kziKet+1cHPmq52LjC4= github.com/aws/aws-sdk-go v1.15.0/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/go-ini/ini v1.25.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-ini/ini v1.38.1 h1:hbtfM8emWUVo9GnXSloXYyFbXxZ+tG6sbepSStoe1FY= github.com/go-ini/ini v1.38.1/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 h1:12VvqtR6Aowv3l/EQUlocDHW2Cp4G9WJVH7uyH8QFJE= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/lib/pq v0.0.0-20180523175426-90697d60dd84 h1:it29sI2IM490luSc3RAhp5WuCYnc6RtbfLVAB7nmC5M= github.com/lib/pq v0.0.0-20180523175426-90697d60dd84/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/myesui/uuid v1.0.0 h1:xCBmH4l5KuvLYc5L7AS7SZg9/jKdIFubM7OVoLqaQUI= +github.com/myesui/uuid v1.0.0/go.mod h1:2CDfNgU0LR8mIdO8vdWd8i9gWWxLlcoIGGpSNgafq84= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w= +github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo= +github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2YqsZKovtsgrwcDH1RZhVbTKCjLp47XpqCDs= +github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/twinj/uuid v1.0.0 h1:fzz7COZnDrXGTAOHGuUGYd6sG+JMq+AoE7+Jlu0przk= +github.com/twinj/uuid v1.0.0/go.mod h1:mMgcE1RHFUFqe5AfiwlINXisXfDGro23fWdPUfOMjRY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190514140710-3ec191127204 h1:4yG6GqBtw9C+UrLp6s2wtSniayy/Vd/3F7ffLE427XI= +golang.org/x/net v0.0.0-20190514140710-3ec191127204/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +google.golang.org/appengine v1.6.0 h1:Tfd7cKwKbFRsI8RMAD3oqqw7JPFRrvFlOsfbgVkjOOw= +google.golang.org/appengine v1.6.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 h1:FVCohIoYO7IJoDDVpV2pdq7SgrMH6wHnuTyrdrxJNoY= gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0/go.mod h1:OdE7CF6DbADk7lN8LIKRzRJTTZXIjtWgA5THM5lhBAw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk= +gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/redis.v5 v5.2.9 h1:MNZYOLPomQzZMfpN3ZtD1uyJ2IDonTTlxYiV/pEApiw= gopkg.in/redis.v5 v5.2.9/go.mod h1:6gtv0/+A4iM08kdRfocWYB3bLX2tebpNtfKlFT6H4mY= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/kinesis.go b/kinesis.go new file mode 100644 index 0000000..5e141ae --- /dev/null +++ b/kinesis.go @@ -0,0 +1,46 @@ +package consumer + +import ( + "fmt" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/kinesis" +) + +// KinesisClient is a minimal interface for kinesis +// eventually we should add the other methods we use for kinesis +type KinesisClient interface { + ListShards(*kinesis.ListShardsInput) (*kinesis.ListShardsOutput, error) +} + +type Kinesis struct { + client KinesisClient + streamName string +} + +// ListAllShards pulls a list of shard IDs from the kinesis api +func (k Kinesis) ListAllShards() ([]string, error) { + var ss []string + var listShardsInput = &kinesis.ListShardsInput{ + StreamName: aws.String(k.streamName), + } + + for { + resp, err := k.client.ListShards(listShardsInput) + if err != nil { + return nil, fmt.Errorf("ListAllShards error: %v", err) + } + for _, shard := range resp.Shards { + ss = append(ss, aws.StringValue(shard.ShardId)) + } + + if resp.NextToken == nil { + return ss, nil + } + + listShardsInput = &kinesis.ListShardsInput{ + NextToken: resp.NextToken, + StreamName: aws.String(k.streamName), + } + } +} diff --git a/options.go b/options.go index 0876931..d6e6893 100644 --- a/options.go +++ b/options.go @@ -39,3 +39,11 @@ func WithShardIteratorType(t string) Option { c.initialShardIteratorType = t } } + +// WithGroup allows user to pass in ConsumerGroups +func WithGroup(group Group) Option { + return func(c *Consumer) { + c.group = group + } + +}