Add test for stopping scan
This commit is contained in:
parent
d6ded158bf
commit
23811ec99a
1 changed files with 48 additions and 2 deletions
|
|
@ -58,9 +58,8 @@ func TestScanShard(t *testing.T) {
|
|||
t.Fatalf("new consumer error: %v", err)
|
||||
}
|
||||
|
||||
var resultData string
|
||||
|
||||
// callback fn appends record data
|
||||
var resultData string
|
||||
var fn = func(r *Record) ScanStatus {
|
||||
resultData += string(r.Data)
|
||||
return ScanStatus{}
|
||||
|
|
@ -88,6 +87,53 @@ func TestScanShard(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestScanShard_StopScan(t *testing.T) {
|
||||
var records = []*kinesis.Record{
|
||||
&kinesis.Record{
|
||||
Data: []byte("firstData"),
|
||||
SequenceNumber: aws.String("firstSeqNum"),
|
||||
},
|
||||
&kinesis.Record{
|
||||
Data: []byte("lastData"),
|
||||
SequenceNumber: aws.String("lastSeqNum"),
|
||||
},
|
||||
}
|
||||
|
||||
var client = &kinesisClientMock{
|
||||
getShardIteratorMock: func(input *kinesis.GetShardIteratorInput) (*kinesis.GetShardIteratorOutput, error) {
|
||||
return &kinesis.GetShardIteratorOutput{
|
||||
ShardIterator: aws.String("49578481031144599192696750682534686652010819674221576194"),
|
||||
}, nil
|
||||
},
|
||||
getRecordsMock: func(input *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) {
|
||||
return &kinesis.GetRecordsOutput{
|
||||
NextShardIterator: nil,
|
||||
Records: records,
|
||||
}, nil
|
||||
},
|
||||
}
|
||||
|
||||
c, err := New("myStreamName", WithClient(client))
|
||||
if err != nil {
|
||||
t.Fatalf("new consumer error: %v", err)
|
||||
}
|
||||
|
||||
// callback fn appends record data
|
||||
var resultData string
|
||||
var fn = func(r *Record) ScanStatus {
|
||||
resultData += string(r.Data)
|
||||
return ScanStatus{StopScan: true}
|
||||
}
|
||||
|
||||
if err := c.ScanShard(context.Background(), "myShard", fn); err != nil {
|
||||
t.Fatalf("scan shard error: %v", err)
|
||||
}
|
||||
|
||||
if resultData != "firstData" {
|
||||
t.Fatalf("callback error expected %s, got %s", "firstData", resultData)
|
||||
}
|
||||
}
|
||||
|
||||
func TestScanShard_ShardIsClosed(t *testing.T) {
|
||||
var client = &kinesisClientMock{
|
||||
getShardIteratorMock: func(input *kinesis.GetShardIteratorInput) (*kinesis.GetShardIteratorOutput, error) {
|
||||
|
|
|
|||
Loading…
Reference in a new issue