From edf0467eb07815d2fcfa9ba8e6f6b3b9ec14a7ce Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Thu, 23 Nov 2017 11:29:58 -0800 Subject: [PATCH] Format errors from caller --- consumer.go | 19 ++++++++++++------- examples/consumer/main.go | 5 +++-- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/consumer.go b/consumer.go index 2480710..f662220 100644 --- a/consumer.go +++ b/consumer.go @@ -115,6 +115,10 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*kinesis.Record) bool) erro return fmt.Errorf("describe stream error: %v", err) } + if len(resp.StreamDescription.Shards) == 0 { + return fmt.Errorf("no shards available") + } + var ( wg sync.WaitGroup errc = make(chan error, 1) @@ -125,8 +129,8 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*kinesis.Record) bool) erro for _, shard := range resp.StreamDescription.Shards { go func(shardID string) { defer wg.Done() - err := c.ScanShard(ctx, shardID, fn) - if err != nil { + + if err := c.ScanShard(ctx, shardID, fn); err != nil { select { case errc <- fmt.Errorf("shard %s error: %v", shardID, err): // first error to occur @@ -134,6 +138,7 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*kinesis.Record) bool) erro // error has already occured } } + c.logger.Println("exiting", shardID) cancel() }(*shard.ShardId) @@ -145,8 +150,8 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*kinesis.Record) bool) erro } // ScanShard loops over records on a specific shard, calls the callback func -// for each record and checkpoints after each page is processed. -// Note: returning `false` from the callback func will end the scan. +// for each record and checkpoints the progress of scan. +// Note: Returning `false` from the callback func will end the scan. func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*kinesis.Record) bool) error { c.logger.Println("scanning", shardID) @@ -190,7 +195,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*kines if ok := fn(r); !ok { if err := c.setCheckpoint(shardID, lastSeqNum); err != nil { - return err + return fmt.Errorf("set checkpoint error: %v", err) } return nil } @@ -198,7 +203,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*kines } if err := c.setCheckpoint(shardID, lastSeqNum); err != nil { - return err + return fmt.Errorf("set checkpoint error: %v", err) } } @@ -217,7 +222,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*kines func (c *Consumer) setCheckpoint(shardID, lastSeqNum string) error { err := c.checkpoint.Set(c.streamName, shardID, lastSeqNum) if err != nil { - return fmt.Errorf("set checkpoint error: %v", err) + return err } c.logger.Println("checkpoint", shardID) c.counter.Add("checkpoints", 1) diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 528c5d2..45dc5fe 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -14,7 +14,7 @@ import ( checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis" ) -// kick off a server for exposing metrics +// kick off a server for exposing scan metrics func init() { sock, err := net.Listen("tcp", "localhost:8080") if err != nil { @@ -45,7 +45,8 @@ func main() { } // consumer - c, err := consumer.New(*stream, + c, err := consumer.New( + *stream, consumer.WithCheckpoint(ck), consumer.WithLogger(logger), consumer.WithCounter(counter),