Using ListShards instead of DescribeStream

This commit is contained in:
Farhan 2019-02-12 22:56:03 +05:30
parent 249baa2c72
commit 0a81fa2a35
2 changed files with 24 additions and 36 deletions

View file

@ -242,34 +242,26 @@ func (c *Consumer) handleRecord(shardID string, r *Record, fn func(*Record) Scan
func (c *Consumer) getShardIDs(streamName string) ([]string, error) { func (c *Consumer) getShardIDs(streamName string) ([]string, error) {
var ss []string var ss []string
var exclusiveStartShardId *string var listShardsInput = &kinesis.ListShardsInput{
StreamName: aws.String(streamName),
}
for { for {
resp, err := c.client.DescribeStream( resp, err := c.client.ListShards(listShardsInput)
&kinesis.DescribeStreamInput{
StreamName: aws.String(streamName),
ExclusiveStartShardId: exclusiveStartShardId,
},
)
if err != nil { if err != nil {
return nil, fmt.Errorf("describe stream error: %v", err) return nil, fmt.Errorf("ListShards error: %v", err)
} }
streamDescription := resp.StreamDescription for _, shard := range resp.Shards {
shards := streamDescription.Shards
if len(shards) == 0 {
return ss, nil
}
for _, shard := range shards {
ss = append(ss, *shard.ShardId) ss = append(ss, *shard.ShardId)
} }
exclusiveStartShardId = shards[len(shards)-1].ShardId if resp.NextToken == nil {
if *streamDescription.HasMoreShards == false {
return ss, nil return ss, nil
} }
listShardsInput = &kinesis.ListShardsInput{
NextToken: resp.NextToken,
}
} }
} }

View file

@ -40,13 +40,11 @@ func TestConsumer_Scan(t *testing.T) {
Records: records, Records: records,
}, nil }, nil
}, },
describeStreamMock: func(input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) { listShardsMock: func(input *kinesis.ListShardsInput) (*kinesis.ListShardsOutput, error) {
return &kinesis.DescribeStreamOutput{ return &kinesis.ListShardsOutput{
StreamDescription: &kinesis.StreamDescription{ Shards: []*kinesis.Shard{
Shards: []*kinesis.Shard{ {ShardId: aws.String("myShard")},
{ShardId: aws.String("myShard")}, },
},
HasMoreShards: aws.Bool(false)},
}, nil }, nil
}, },
} }
@ -94,11 +92,9 @@ func TestConsumer_Scan(t *testing.T) {
func TestConsumer_Scan_NoShardsAvailable(t *testing.T) { func TestConsumer_Scan_NoShardsAvailable(t *testing.T) {
client := &kinesisClientMock{ client := &kinesisClientMock{
describeStreamMock: func(input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) { listShardsMock: func(input *kinesis.ListShardsInput) (*kinesis.ListShardsOutput, error) {
return &kinesis.DescribeStreamOutput{ return &kinesis.ListShardsOutput{
StreamDescription: &kinesis.StreamDescription{ Shards: make([]*kinesis.Shard, 0),
Shards: make([]*kinesis.Shard, 0),
},
}, nil }, nil
}, },
} }
@ -287,7 +283,11 @@ type kinesisClientMock struct {
kinesisiface.KinesisAPI kinesisiface.KinesisAPI
getShardIteratorMock func(*kinesis.GetShardIteratorInput) (*kinesis.GetShardIteratorOutput, error) getShardIteratorMock func(*kinesis.GetShardIteratorInput) (*kinesis.GetShardIteratorOutput, error)
getRecordsMock func(*kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) getRecordsMock func(*kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error)
describeStreamMock func(*kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) listShardsMock func(*kinesis.ListShardsInput) (*kinesis.ListShardsOutput, error)
}
func (c *kinesisClientMock) ListShards(in *kinesis.ListShardsInput) (*kinesis.ListShardsOutput, error) {
return c.listShardsMock(in)
} }
func (c *kinesisClientMock) GetRecords(in *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) { func (c *kinesisClientMock) GetRecords(in *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) {
@ -298,10 +298,6 @@ func (c *kinesisClientMock) GetShardIterator(in *kinesis.GetShardIteratorInput)
return c.getShardIteratorMock(in) return c.getShardIteratorMock(in)
} }
func (c *kinesisClientMock) DescribeStream(in *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) {
return c.describeStreamMock(in)
}
// implementation of checkpoint // implementation of checkpoint
type fakeCheckpoint struct { type fakeCheckpoint struct {
cache map[string]string cache map[string]string