From 245d1bd6b55b728d75fbb5961dcb1021d4811d0e Mon Sep 17 00:00:00 2001 From: Emanuel Ramos <40024962+imaramos@users.noreply.github.com> Date: Mon, 18 Feb 2019 15:59:20 +0000 Subject: [PATCH] change cancel place (#82) --- consumer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/consumer.go b/consumer.go index cedae08..fb1cbd4 100644 --- a/consumer.go +++ b/consumer.go @@ -135,6 +135,8 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*Record) ScanStatus) error defer wg.Done() if err := c.ScanShard(ctx, shardID, fn); err != nil { + cancel() + select { case errc <- fmt.Errorf("shard %s error: %v", shardID, err): // first error to occur @@ -142,8 +144,6 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*Record) ScanStatus) error // error has already occured } } - - cancel() }(shardID) }