change cancel place (#82)

This commit is contained in:
Emanuel Ramos 2019-02-18 15:59:20 +00:00 committed by Harlow Ward
parent 2037463c62
commit 245d1bd6b5

View file

@ -135,6 +135,8 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*Record) ScanStatus) error
defer wg.Done()
if err := c.ScanShard(ctx, shardID, fn); err != nil {
cancel()
select {
case errc <- fmt.Errorf("shard %s error: %v", shardID, err):
// first error to occur
@ -142,8 +144,6 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*Record) ScanStatus) error
// error has already occured
}
}
cancel()
}(shardID)
}