From cdaa98adfd546e35453df78030f882e2349d1d3b Mon Sep 17 00:00:00 2001 From: "Andrew S. Brown" Date: Tue, 16 Jul 2019 23:13:01 -0700 Subject: [PATCH] Fix writing to closed channel --- consumer.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/consumer.go b/consumer.go index 05c59cc..8121e95 100644 --- a/consumer.go +++ b/consumer.go @@ -6,6 +6,7 @@ import ( "fmt" "io/ioutil" "log" + "sync" "time" "github.com/aws/aws-sdk-go/aws" @@ -100,22 +101,29 @@ func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error { close(shardc) }() + wg := new(sync.WaitGroup) // process each of the shards for shard := range shardc { + wg.Add(1) go func(shardID string) { + defer wg.Done() if err := c.ScanShard(ctx, shardID, fn); err != nil { select { case errc <- fmt.Errorf("shard %s error: %v", shardID, err): // first error to occur cancel() default: - // error has already occured + // error has already occurred } } }(aws.StringValue(shard.ShardId)) } - close(errc) + go func() { + wg.Wait() + close(errc) + }() + return <-errc }