Make the Kinesis client exportable

This commit is contained in:
Harlow Ward 2017-11-26 18:16:32 -08:00
parent b875bb56e7
commit a62e7514e4
2 changed files with 8 additions and 8 deletions

View file

@ -9,19 +9,19 @@ import (
"github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis"
) )
// NewClient returns a new client with kinesis client // NewKinesisClient returns a new client with kinesis client
func NewClient() *client { func NewKinesisClient() Client {
svc := kinesis.New(session.New(aws.NewConfig())) svc := kinesis.New(session.New(aws.NewConfig()))
return &client{svc} return &KinesisClient{svc}
} }
// Client acts as wrapper around Kinesis client // Client acts as wrapper around Kinesis client
type client struct { type KinesisClient struct {
svc *kinesis.Kinesis svc *kinesis.Kinesis
} }
// GetShardIDs returns shard ids in a given stream // GetShardIDs returns shard ids in a given stream
func (c *client) GetShardIDs(streamName string) ([]string, error) { func (c *KinesisClient) GetShardIDs(streamName string) ([]string, error) {
resp, err := c.svc.DescribeStream( resp, err := c.svc.DescribeStream(
&kinesis.DescribeStreamInput{ &kinesis.DescribeStreamInput{
StreamName: aws.String(streamName), StreamName: aws.String(streamName),
@ -39,7 +39,7 @@ func (c *client) GetShardIDs(streamName string) ([]string, error) {
} }
// GetRecords returns a chan Record from a Shard of the Stream // GetRecords returns a chan Record from a Shard of the Stream
func (c *client) GetRecords(ctx context.Context, streamName, shardID, lastSeqNum string) (<-chan *Record, <-chan error, error) { func (c *KinesisClient) GetRecords(ctx context.Context, streamName, shardID, lastSeqNum string) (<-chan *Record, <-chan error, error) {
shardIterator, err := c.getShardIterator(streamName, shardID, lastSeqNum) shardIterator, err := c.getShardIterator(streamName, shardID, lastSeqNum)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("get shard iterator error: %v", err) return nil, nil, fmt.Errorf("get shard iterator error: %v", err)
@ -101,7 +101,7 @@ func (c *client) GetRecords(ctx context.Context, streamName, shardID, lastSeqNum
return recc, errc, nil return recc, errc, nil
} }
func (c *client) getShardIterator(streamName, shardID, lastSeqNum string) (*string, error) { func (c *KinesisClient) getShardIterator(streamName, shardID, lastSeqNum string) (*string, error) {
params := &kinesis.GetShardIteratorInput{ params := &kinesis.GetShardIteratorInput{
ShardId: aws.String(shardID), ShardId: aws.String(shardID),
StreamName: aws.String(streamName), StreamName: aws.String(streamName),

View file

@ -87,7 +87,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) {
checkpoint: &noopCheckpoint{}, checkpoint: &noopCheckpoint{},
counter: &noopCounter{}, counter: &noopCounter{},
logger: log.New(ioutil.Discard, "", log.LstdFlags), logger: log.New(ioutil.Discard, "", log.LstdFlags),
client: NewClient(), client: NewKinesisClient(),
} }
// override defaults // override defaults