Format errors from caller
This commit is contained in:
parent
86f1df782e
commit
edf0467eb0
2 changed files with 15 additions and 9 deletions
19
consumer.go
19
consumer.go
|
|
@ -115,6 +115,10 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*kinesis.Record) bool) erro
|
||||||
return fmt.Errorf("describe stream error: %v", err)
|
return fmt.Errorf("describe stream error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(resp.StreamDescription.Shards) == 0 {
|
||||||
|
return fmt.Errorf("no shards available")
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
errc = make(chan error, 1)
|
errc = make(chan error, 1)
|
||||||
|
|
@ -125,8 +129,8 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*kinesis.Record) bool) erro
|
||||||
for _, shard := range resp.StreamDescription.Shards {
|
for _, shard := range resp.StreamDescription.Shards {
|
||||||
go func(shardID string) {
|
go func(shardID string) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
err := c.ScanShard(ctx, shardID, fn)
|
|
||||||
if err != nil {
|
if err := c.ScanShard(ctx, shardID, fn); err != nil {
|
||||||
select {
|
select {
|
||||||
case errc <- fmt.Errorf("shard %s error: %v", shardID, err):
|
case errc <- fmt.Errorf("shard %s error: %v", shardID, err):
|
||||||
// first error to occur
|
// first error to occur
|
||||||
|
|
@ -134,6 +138,7 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*kinesis.Record) bool) erro
|
||||||
// error has already occured
|
// error has already occured
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
c.logger.Println("exiting", shardID)
|
c.logger.Println("exiting", shardID)
|
||||||
cancel()
|
cancel()
|
||||||
}(*shard.ShardId)
|
}(*shard.ShardId)
|
||||||
|
|
@ -145,8 +150,8 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*kinesis.Record) bool) erro
|
||||||
}
|
}
|
||||||
|
|
||||||
// ScanShard loops over records on a specific shard, calls the callback func
|
// ScanShard loops over records on a specific shard, calls the callback func
|
||||||
// for each record and checkpoints after each page is processed.
|
// for each record and checkpoints the progress of scan.
|
||||||
// Note: returning `false` from the callback func will end the scan.
|
// Note: Returning `false` from the callback func will end the scan.
|
||||||
func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*kinesis.Record) bool) error {
|
func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*kinesis.Record) bool) error {
|
||||||
c.logger.Println("scanning", shardID)
|
c.logger.Println("scanning", shardID)
|
||||||
|
|
||||||
|
|
@ -190,7 +195,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*kines
|
||||||
|
|
||||||
if ok := fn(r); !ok {
|
if ok := fn(r); !ok {
|
||||||
if err := c.setCheckpoint(shardID, lastSeqNum); err != nil {
|
if err := c.setCheckpoint(shardID, lastSeqNum); err != nil {
|
||||||
return err
|
return fmt.Errorf("set checkpoint error: %v", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -198,7 +203,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*kines
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.setCheckpoint(shardID, lastSeqNum); err != nil {
|
if err := c.setCheckpoint(shardID, lastSeqNum); err != nil {
|
||||||
return err
|
return fmt.Errorf("set checkpoint error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -217,7 +222,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*kines
|
||||||
func (c *Consumer) setCheckpoint(shardID, lastSeqNum string) error {
|
func (c *Consumer) setCheckpoint(shardID, lastSeqNum string) error {
|
||||||
err := c.checkpoint.Set(c.streamName, shardID, lastSeqNum)
|
err := c.checkpoint.Set(c.streamName, shardID, lastSeqNum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("set checkpoint error: %v", err)
|
return err
|
||||||
}
|
}
|
||||||
c.logger.Println("checkpoint", shardID)
|
c.logger.Println("checkpoint", shardID)
|
||||||
c.counter.Add("checkpoints", 1)
|
c.counter.Add("checkpoints", 1)
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@ import (
|
||||||
checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis"
|
checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis"
|
||||||
)
|
)
|
||||||
|
|
||||||
// kick off a server for exposing metrics
|
// kick off a server for exposing scan metrics
|
||||||
func init() {
|
func init() {
|
||||||
sock, err := net.Listen("tcp", "localhost:8080")
|
sock, err := net.Listen("tcp", "localhost:8080")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -45,7 +45,8 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// consumer
|
// consumer
|
||||||
c, err := consumer.New(*stream,
|
c, err := consumer.New(
|
||||||
|
*stream,
|
||||||
consumer.WithCheckpoint(ck),
|
consumer.WithCheckpoint(ck),
|
||||||
consumer.WithLogger(logger),
|
consumer.WithLogger(logger),
|
||||||
consumer.WithCounter(counter),
|
consumer.WithCounter(counter),
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue