diff --git a/consumer.go b/consumer.go index 86f311c..698563d 100644 --- a/consumer.go +++ b/consumer.go @@ -147,6 +147,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e // attempt to recover from GetRecords error by getting new shard iterator if err != nil { + shardIterator, err = c.getShardIterator(c.streamName, shardID, lastSeqNum) if err != nil { return fmt.Errorf("get shard iterator error: %v", err) diff --git a/consumer_test.go b/consumer_test.go index 3151cec..0a316ba 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -57,7 +57,7 @@ func TestScan(t *testing.T) { c, err := New("myStreamName", WithClient(client), WithCounter(ctr), - WithStorage(cp), + WithStore(cp), ) if err != nil { t.Fatalf("new consumer error: %v", err) @@ -119,7 +119,7 @@ func TestScanShard(t *testing.T) { c, err := New("myStreamName", WithClient(client), WithCounter(ctr), - WithStorage(cp), + WithStore(cp), ) if err != nil { t.Fatalf("new consumer error: %v", err) @@ -219,7 +219,7 @@ func TestScanShard_SkipCheckpoint(t *testing.T) { var cp = &fakeCheckpoint{cache: map[string]string{}} - c, err := New("myStreamName", WithClient(client), WithStorage(cp)) + c, err := New("myStreamName", WithClient(client), WithStore(cp)) if err != nil { t.Fatalf("new consumer error: %v", err) } diff --git a/examples/consumer/cp-dynamo/main.go b/examples/consumer/cp-dynamo/main.go index d74c816..c4f9d93 100644 --- a/examples/consumer/cp-dynamo/main.go +++ b/examples/consumer/cp-dynamo/main.go @@ -81,7 +81,7 @@ func main() { // consumer c, err := consumer.New( *stream, - consumer.WithStorage(ddb), + consumer.WithStore(ddb), consumer.WithLogger(log), consumer.WithCounter(counter), consumer.WithClient(myKsis), diff --git a/examples/consumer/cp-mysql/main.go b/examples/consumer/cp-mysql/main.go index 349f190..8b8c85b 100644 --- a/examples/consumer/cp-mysql/main.go +++ b/examples/consumer/cp-mysql/main.go @@ -33,7 +33,7 @@ func main() { // consumer c, err := consumer.New( *stream, - consumer.WithStorage(ck), + consumer.WithStore(ck), consumer.WithCounter(counter), ) if err != nil { diff --git a/examples/consumer/cp-postgres/main.go b/examples/consumer/cp-postgres/main.go index ed83cc5..b43812d 100644 --- a/examples/consumer/cp-postgres/main.go +++ b/examples/consumer/cp-postgres/main.go @@ -33,7 +33,7 @@ func main() { // consumer c, err := consumer.New( *stream, - consumer.WithStorage(ck), + consumer.WithStore(ck), consumer.WithCounter(counter), ) if err != nil { diff --git a/examples/consumer/cp-redis/main.go b/examples/consumer/cp-redis/main.go index 7200850..e08ec16 100644 --- a/examples/consumer/cp-redis/main.go +++ b/examples/consumer/cp-redis/main.go @@ -43,7 +43,7 @@ func main() { // consumer c, err := consumer.New( *stream, - consumer.WithStorage(ck), + consumer.WithStore(ck), consumer.WithLogger(logger), ) if err != nil { diff --git a/options.go b/options.go index 4740dd4..5810865 100644 --- a/options.go +++ b/options.go @@ -5,8 +5,8 @@ import "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" // Option is used to override defaults when creating a new Consumer type Option func(*Consumer) -// WithStorage overrides the default storage -func WithStorage(store Store) Option { +// WithStore overrides the default storage +func WithStore(store Store) Option { return func(c *Consumer) { c.store = store }