kinesis-consumer/kinesis.go
kperry f3eb53a703 Added condition expressions to ddb for updates and puts. Rearranged some of the code in consumergroup to accommodate conditional expressions.
Moved the LeaseUpdate from consumergroup to ddb.go - it is only needed by ddb.go, and it is ddb.go specific.
Added some comments.
2019-05-20 14:47:55 -05:00

48 lines
1.2 KiB
Go

package consumer
import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
// KinesisClient is the minimal subset of the Kinesis API that this package
// depends on. Eventually the other Kinesis methods we use should be added here.
type KinesisClient interface {
	// ListShards performs a single ListShards call described by input and
	// returns the service response or an error.
	ListShards(input *kinesis.ListShardsInput) (*kinesis.ListShardsOutput, error)
}
// Kinesis is a convenience struct that bundles a Kinesis client together with
// the name of the stream it operates on.
type Kinesis struct {
// client is the API handle ListAllShards uses to issue ListShards calls.
client KinesisClient
// streamName is sent as the StreamName when listing the stream's shards.
streamName string
}
// ListAllShards returns the IDs of every shard in the stream, paging through
// the Kinesis ListShards API until the response carries no NextToken.
// This could also be used by broker.go or any other future "group"
// implementation that needs to get the shards.
//
// If any ListShards call fails, it returns a nil slice and an error describing
// the failure; shard IDs collected from earlier pages are discarded.
func (k Kinesis) ListAllShards() ([]string, error) {
	var ss []string

	// The first request identifies the stream by name.
	listShardsInput := &kinesis.ListShardsInput{
		StreamName: aws.String(k.streamName),
	}

	for {
		resp, err := k.client.ListShards(listShardsInput)
		if err != nil {
			return nil, fmt.Errorf("ListAllShards error: %v", err)
		}

		for _, shard := range resp.Shards {
			ss = append(ss, aws.StringValue(shard.ShardId))
		}

		// A nil NextToken means this was the last page.
		if resp.NextToken == nil {
			return ss, nil
		}

		// Continuation requests must carry only the token: the ListShards API
		// rejects a request that sets both StreamName and NextToken, so the
		// stream name is deliberately omitted here.
		listShardsInput = &kinesis.ListShardsInput{
			NextToken: resp.NextToken,
		}
	}
}