diff --git a/consumer.go b/consumer.go index cedae08..fb1cbd4 100644 --- a/consumer.go +++ b/consumer.go @@ -135,6 +135,8 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*Record) ScanStatus) error defer wg.Done() if err := c.ScanShard(ctx, shardID, fn); err != nil { + cancel() + select { case errc <- fmt.Errorf("shard %s error: %v", shardID, err): // first error to occur @@ -142,8 +144,6 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*Record) ScanStatus) error // error has already occured } } - - cancel() }(shardID) }