Update examples to use Store interface
This commit is contained in:
parent
d2cf65fa7a
commit
a9c97d3b93
7 changed files with 10 additions and 9 deletions
|
|
@ -147,6 +147,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
|
||||||
|
|
||||||
// attempt to recover from GetRecords error by getting new shard iterator
|
// attempt to recover from GetRecords error by getting new shard iterator
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
||||||
shardIterator, err = c.getShardIterator(c.streamName, shardID, lastSeqNum)
|
shardIterator, err = c.getShardIterator(c.streamName, shardID, lastSeqNum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get shard iterator error: %v", err)
|
return fmt.Errorf("get shard iterator error: %v", err)
|
||||||
|
|
|
||||||
|
|
@ -57,7 +57,7 @@ func TestScan(t *testing.T) {
|
||||||
c, err := New("myStreamName",
|
c, err := New("myStreamName",
|
||||||
WithClient(client),
|
WithClient(client),
|
||||||
WithCounter(ctr),
|
WithCounter(ctr),
|
||||||
WithStorage(cp),
|
WithStore(cp),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("new consumer error: %v", err)
|
t.Fatalf("new consumer error: %v", err)
|
||||||
|
|
@ -119,7 +119,7 @@ func TestScanShard(t *testing.T) {
|
||||||
c, err := New("myStreamName",
|
c, err := New("myStreamName",
|
||||||
WithClient(client),
|
WithClient(client),
|
||||||
WithCounter(ctr),
|
WithCounter(ctr),
|
||||||
WithStorage(cp),
|
WithStore(cp),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("new consumer error: %v", err)
|
t.Fatalf("new consumer error: %v", err)
|
||||||
|
|
@ -219,7 +219,7 @@ func TestScanShard_SkipCheckpoint(t *testing.T) {
|
||||||
|
|
||||||
var cp = &fakeCheckpoint{cache: map[string]string{}}
|
var cp = &fakeCheckpoint{cache: map[string]string{}}
|
||||||
|
|
||||||
c, err := New("myStreamName", WithClient(client), WithStorage(cp))
|
c, err := New("myStreamName", WithClient(client), WithStore(cp))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("new consumer error: %v", err)
|
t.Fatalf("new consumer error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -81,7 +81,7 @@ func main() {
|
||||||
// consumer
|
// consumer
|
||||||
c, err := consumer.New(
|
c, err := consumer.New(
|
||||||
*stream,
|
*stream,
|
||||||
consumer.WithStorage(ddb),
|
consumer.WithStore(ddb),
|
||||||
consumer.WithLogger(log),
|
consumer.WithLogger(log),
|
||||||
consumer.WithCounter(counter),
|
consumer.WithCounter(counter),
|
||||||
consumer.WithClient(myKsis),
|
consumer.WithClient(myKsis),
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,7 @@ func main() {
|
||||||
// consumer
|
// consumer
|
||||||
c, err := consumer.New(
|
c, err := consumer.New(
|
||||||
*stream,
|
*stream,
|
||||||
consumer.WithStorage(ck),
|
consumer.WithStore(ck),
|
||||||
consumer.WithCounter(counter),
|
consumer.WithCounter(counter),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,7 @@ func main() {
|
||||||
// consumer
|
// consumer
|
||||||
c, err := consumer.New(
|
c, err := consumer.New(
|
||||||
*stream,
|
*stream,
|
||||||
consumer.WithStorage(ck),
|
consumer.WithStore(ck),
|
||||||
consumer.WithCounter(counter),
|
consumer.WithCounter(counter),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -43,7 +43,7 @@ func main() {
|
||||||
// consumer
|
// consumer
|
||||||
c, err := consumer.New(
|
c, err := consumer.New(
|
||||||
*stream,
|
*stream,
|
||||||
consumer.WithStorage(ck),
|
consumer.WithStore(ck),
|
||||||
consumer.WithLogger(logger),
|
consumer.WithLogger(logger),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -5,8 +5,8 @@ import "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
|
||||||
// Option is used to override defaults when creating a new Consumer
|
// Option is used to override defaults when creating a new Consumer
|
||||||
type Option func(*Consumer)
|
type Option func(*Consumer)
|
||||||
|
|
||||||
// WithStorage overrides the default storage
|
// WithStore overrides the default storage
|
||||||
func WithStorage(store Store) Option {
|
func WithStore(store Store) Option {
|
||||||
return func(c *Consumer) {
|
return func(c *Consumer) {
|
||||||
c.store = store
|
c.store = store
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue