Fix writing to closed channel
This commit is contained in:
parent
b37eaf7eec
commit
cdaa98adfd
1 changed files with 10 additions and 2 deletions
12
consumer.go
12
consumer.go
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
|
|
@ -100,22 +101,29 @@ func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error {
|
||||||
close(shardc)
|
close(shardc)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
wg := new(sync.WaitGroup)
|
||||||
// process each of the shards
|
// process each of the shards
|
||||||
for shard := range shardc {
|
for shard := range shardc {
|
||||||
|
wg.Add(1)
|
||||||
go func(shardID string) {
|
go func(shardID string) {
|
||||||
|
defer wg.Done()
|
||||||
if err := c.ScanShard(ctx, shardID, fn); err != nil {
|
if err := c.ScanShard(ctx, shardID, fn); err != nil {
|
||||||
select {
|
select {
|
||||||
case errc <- fmt.Errorf("shard %s error: %v", shardID, err):
|
case errc <- fmt.Errorf("shard %s error: %v", shardID, err):
|
||||||
// first error to occur
|
// first error to occur
|
||||||
cancel()
|
cancel()
|
||||||
default:
|
default:
|
||||||
// error has already occured
|
// error has already occurred
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}(aws.StringValue(shard.ShardId))
|
}(aws.StringValue(shard.ShardId))
|
||||||
}
|
}
|
||||||
|
|
||||||
close(errc)
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
close(errc)
|
||||||
|
}()
|
||||||
|
|
||||||
return <-errc
|
return <-errc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue