kinesis-consumer/client.go

144 lines
3.3 KiB
Go
Raw Normal View History

package consumer
import (
"context"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
)
// ClientOption is a function that mutates a KinesisClient. Options are passed
// to NewKinesisClient to override the defaults it would otherwise apply.
type ClientOption func(*KinesisClient)
// WithKinesis returns a ClientOption that installs svc as the Kinesis API
// implementation used by the KinesisClient, replacing the default one.
func WithKinesis(svc kinesisiface.KinesisAPI) ClientOption {
	override := func(client *KinesisClient) {
		client.svc = svc
	}
	return override
}
2017-11-27 02:22:09 +00:00
// NewKinesisClient builds a KinesisClient and applies every supplied option
// in order. If no option has set the underlying Kinesis service, a default
// one is created from a new AWS session with a default configuration.
func NewKinesisClient(opts ...ClientOption) *KinesisClient {
	client := new(KinesisClient)

	for i := range opts {
		opts[i](client)
	}

	// An option already provided the service; nothing left to default.
	if client.svc != nil {
		return client
	}

	client.svc = kinesis.New(session.New(aws.NewConfig()))
	return client
}
// KinesisClient acts as a wrapper around the Kinesis service client.
2017-11-27 02:16:32 +00:00
type KinesisClient struct {
// svc is the Kinesis API implementation that every method of the client
// delegates to.
svc kinesisiface.KinesisAPI
}
// GetShardIDs returns shard ids in a given stream
2017-11-27 02:16:32 +00:00
func (c *KinesisClient) GetShardIDs(streamName string) ([]string, error) {
resp, err := c.svc.DescribeStream(
&kinesis.DescribeStreamInput{
StreamName: aws.String(streamName),
},
)
if err != nil {
return nil, fmt.Errorf("describe stream error: %v", err)
}
ss := []string{}
for _, shard := range resp.StreamDescription.Shards {
ss = append(ss, *shard.ShardId)
}
return ss, nil
}
// GetRecords returns a chan Record from a Shard of the Stream
2017-11-27 02:16:32 +00:00
func (c *KinesisClient) GetRecords(ctx context.Context, streamName, shardID, lastSeqNum string) (<-chan *Record, <-chan error, error) {
shardIterator, err := c.getShardIterator(streamName, shardID, lastSeqNum)
if err != nil {
return nil, nil, fmt.Errorf("get shard iterator error: %v", err)
}
var (
recc = make(chan *Record, 10000)
errc = make(chan error, 1)
)
go func() {
defer func() {
close(recc)
close(errc)
}()
for {
select {
case <-ctx.Done():
return
default:
resp, err := c.svc.GetRecords(
&kinesis.GetRecordsInput{
ShardIterator: shardIterator,
},
)
if err != nil {
shardIterator, err = c.getShardIterator(streamName, shardID, lastSeqNum)
if err != nil {
errc <- fmt.Errorf("get shard iterator error: %v", err)
return
}
continue
}
for _, r := range resp.Records {
select {
case <-ctx.Done():
return
case recc <- r:
lastSeqNum = *r.SequenceNumber
}
}
if resp.NextShardIterator == nil || shardIterator == resp.NextShardIterator {
shardIterator, err = c.getShardIterator(streamName, shardID, lastSeqNum)
if err != nil {
errc <- fmt.Errorf("get shard iterator error: %v", err)
return
}
} else {
shardIterator = resp.NextShardIterator
}
}
}
}()
return recc, errc, nil
}
2017-11-27 02:16:32 +00:00
// getShardIterator requests a shard iterator for shard shardID of stream
// streamName. When lastSeqNum is non-empty the iterator is positioned after
// that sequence number (AFTER_SEQUENCE_NUMBER); otherwise it uses the
// TRIM_HORIZON iterator type. The error from the GetShardIterator call is
// returned unwrapped.
func (c *KinesisClient) getShardIterator(streamName, shardID, lastSeqNum string) (*string, error) {
	iteratorType := "TRIM_HORIZON"
	var startAfter *string // stays nil unless resuming from a sequence number
	if lastSeqNum != "" {
		iteratorType = "AFTER_SEQUENCE_NUMBER"
		startAfter = aws.String(lastSeqNum)
	}

	out, err := c.svc.GetShardIterator(&kinesis.GetShardIteratorInput{
		ShardId:                aws.String(shardID),
		StreamName:             aws.String(streamName),
		ShardIteratorType:      aws.String(iteratorType),
		StartingSequenceNumber: startAfter,
	})
	if err != nil {
		return nil, err
	}
	return out.ShardIterator, nil
}