Include ShardID in Record passed to ScanFunc (#121)
* Include ShardID in Record passed to ScanFunc * Update mock to explicitly use kinesis.Record Supports change wherein consumer.Record is changed from an alias of kinesis.Record to a composition containing it.
This commit is contained in:
parent
bae065cf53
commit
97ffabeaa5
2 changed files with 8 additions and 4 deletions
10
consumer.go
10
consumer.go
|
|
@ -16,8 +16,12 @@ import (
|
||||||
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
|
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Record is an alias of record returned from kinesis library
|
// Record wraps the record returned from the Kinesis library and
|
||||||
type Record = kinesis.Record
|
// extends to include the shard id.
|
||||||
|
type Record struct {
|
||||||
|
*kinesis.Record
|
||||||
|
ShardID string
|
||||||
|
}
|
||||||
|
|
||||||
// New creates a kinesis consumer with default settings. Use Option to override
|
// New creates a kinesis consumer with default settings. Use Option to override
|
||||||
// any of the optional attributes.
|
// any of the optional attributes.
|
||||||
|
|
@ -187,7 +191,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
err := fn(r)
|
err := fn(&Record{r, shardID})
|
||||||
if err != nil && err != ErrSkipCheckpoint {
|
if err != nil && err != ErrSkipCheckpoint {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -260,7 +260,7 @@ func TestScanShard_ShardIsClosed(t *testing.T) {
|
||||||
getRecordsMock: func(input *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) {
|
getRecordsMock: func(input *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) {
|
||||||
return &kinesis.GetRecordsOutput{
|
return &kinesis.GetRecordsOutput{
|
||||||
NextShardIterator: nil,
|
NextShardIterator: nil,
|
||||||
Records: make([]*Record, 0),
|
Records: make([]*kinesis.Record, 0),
|
||||||
}, nil
|
}, nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue