From a62e7514e421767456f8f767499e2d8332081b09 Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Sun, 26 Nov 2017 18:16:32 -0800 Subject: [PATCH] Make the Kinesis client exportable --- client.go | 14 +++++++------- consumer.go | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/client.go b/client.go index f46760b..36d92e9 100644 --- a/client.go +++ b/client.go @@ -9,19 +9,19 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis" ) -// NewClient returns a new client with kinesis client -func NewClient() *client { +// NewKinesisClient returns a new client with kinesis client +func NewKinesisClient() Client { svc := kinesis.New(session.New(aws.NewConfig())) - return &client{svc} + return &KinesisClient{svc} } // Client acts as wrapper around Kinesis client -type client struct { +type KinesisClient struct { svc *kinesis.Kinesis } // GetShardIDs returns shard ids in a given stream -func (c *client) GetShardIDs(streamName string) ([]string, error) { +func (c *KinesisClient) GetShardIDs(streamName string) ([]string, error) { resp, err := c.svc.DescribeStream( &kinesis.DescribeStreamInput{ StreamName: aws.String(streamName), @@ -39,7 +39,7 @@ func (c *client) GetShardIDs(streamName string) ([]string, error) { } // GetRecords returns a chan Record from a Shard of the Stream -func (c *client) GetRecords(ctx context.Context, streamName, shardID, lastSeqNum string) (<-chan *Record, <-chan error, error) { +func (c *KinesisClient) GetRecords(ctx context.Context, streamName, shardID, lastSeqNum string) (<-chan *Record, <-chan error, error) { shardIterator, err := c.getShardIterator(streamName, shardID, lastSeqNum) if err != nil { return nil, nil, fmt.Errorf("get shard iterator error: %v", err) @@ -101,7 +101,7 @@ func (c *client) GetRecords(ctx context.Context, streamName, shardID, lastSeqNum return recc, errc, nil } -func (c *client) getShardIterator(streamName, shardID, lastSeqNum string) (*string, error) { +func (c *KinesisClient) getShardIterator(streamName, shardID, lastSeqNum string) (*string, error) { params := &kinesis.GetShardIteratorInput{ ShardId: aws.String(shardID), StreamName: aws.String(streamName), diff --git a/consumer.go b/consumer.go index 5c96424..51afaee 100644 --- a/consumer.go +++ b/consumer.go @@ -87,7 +87,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) { checkpoint: &noopCheckpoint{}, counter: &noopCounter{}, logger: log.New(ioutil.Discard, "", log.LstdFlags), - client: NewClient(), + client: NewKinesisClient(), } // override defaults