From 23811ec99aa59f870dbc29f74cb5efc345e001fd Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Sun, 29 Jul 2018 10:27:01 -0700 Subject: [PATCH] Add test for stopping scan --- consumer_test.go | 50 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/consumer_test.go b/consumer_test.go index 8e02caa..ce066d7 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -58,9 +58,8 @@ func TestScanShard(t *testing.T) { t.Fatalf("new consumer error: %v", err) } - var resultData string - // callback fn appends record data + var resultData string var fn = func(r *Record) ScanStatus { resultData += string(r.Data) return ScanStatus{} @@ -88,6 +87,53 @@ func TestScanShard(t *testing.T) { } } +func TestScanShard_StopScan(t *testing.T) { + var records = []*kinesis.Record{ + &kinesis.Record{ + Data: []byte("firstData"), + SequenceNumber: aws.String("firstSeqNum"), + }, + &kinesis.Record{ + Data: []byte("lastData"), + SequenceNumber: aws.String("lastSeqNum"), + }, + } + + var client = &kinesisClientMock{ + getShardIteratorMock: func(input *kinesis.GetShardIteratorInput) (*kinesis.GetShardIteratorOutput, error) { + return &kinesis.GetShardIteratorOutput{ + ShardIterator: aws.String("49578481031144599192696750682534686652010819674221576194"), + }, nil + }, + getRecordsMock: func(input *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) { + return &kinesis.GetRecordsOutput{ + NextShardIterator: nil, + Records: records, + }, nil + }, + } + + c, err := New("myStreamName", WithClient(client)) + if err != nil { + t.Fatalf("new consumer error: %v", err) + } + + // callback fn appends record data + var resultData string + var fn = func(r *Record) ScanStatus { + resultData += string(r.Data) + return ScanStatus{StopScan: true} + } + + if err := c.ScanShard(context.Background(), "myShard", fn); err != nil { + t.Fatalf("scan shard error: %v", err) + } + + if resultData != "firstData" { + t.Fatalf("callback error expected %s, got %s", "firstData", resultData) + } +} + func TestScanShard_ShardIsClosed(t *testing.T) { var client = &kinesisClientMock{ getShardIteratorMock: func(input *kinesis.GetShardIteratorInput) (*kinesis.GetShardIteratorOutput, error) {