imgbundler: Fixes

- Correct concurrency
- Return multierr
- Use []byte wherever possible
- Add semaphore to limit number of workers
- Separate timeout for each fetch
This commit is contained in:
Anmol Sethi 2022-11-29 11:40:53 -08:00
parent 7368d15186
commit d31ed1d3ac
No known key found for this signature in database
GPG key ID: 25BC68888A99A8BA
3 changed files with 64 additions and 37 deletions

2
go.mod
View file

@ -52,6 +52,8 @@ require (
github.com/rogpeppe/go-internal v1.8.0 // indirect
github.com/ugorji/go/codec v1.2.6 // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
golang.org/x/sys v0.1.0 // indirect
golang.org/x/term v0.0.0-20221017184919-83659145692c // indirect

4
go.sum
View file

@ -312,6 +312,10 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=

View file

@ -1,6 +1,7 @@
package imgbundler
import (
"bytes"
"context"
"encoding/base64"
"fmt"
@ -13,6 +14,9 @@ import (
"sync"
"time"
"go.uber.org/multierr"
"oss.terrastruct.com/xdefer"
"oss.terrastruct.com/d2/lib/xmain"
)
@ -32,68 +36,85 @@ func InlineRemote(ms *xmain.State, in []byte) ([]byte, error) {
return inline(ms, in, true)
}
func inline(ms *xmain.State, svg []byte, isRemote bool) ([]byte, error) {
func inline(ms *xmain.State, svg []byte, isRemote bool) (_ []byte, err error) {
defer xdefer.Errorf(&err, "failed to bundle images")
imgs := imageRe.FindAllSubmatch(svg, -1)
var filtered [][]string
var filtered [][][]byte
for _, img := range imgs {
u, err := url.Parse(string(img[1]))
isRemoteImg := err == nil && strings.HasPrefix(u.Scheme, "http")
if isRemoteImg == isRemote {
filtered = append(filtered, []string{string(img[0]), string(img[1])})
filtered = append(filtered, img)
}
}
var wg sync.WaitGroup
respChan := make(chan resp)
// Limits the number of workers to 16.
sema := make(chan struct{}, 16)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
wg.Add(len(filtered))
for _, img := range filtered {
go func(src, href string) {
var data string
var err error
if isRemote {
data, err = fetch(ctx, href)
} else {
data, err = read(ctx, href)
}
respChan <- resp{
srctxt: src,
data: data,
err: err,
}
}(img[0], img[1])
}
out := string(svg)
wg.Add(len(filtered))
// Start workers as the sema allows.
go func() {
for {
select {
case resp, ok := <-respChan:
if !ok {
return
}
if resp.err != nil {
ms.Log.Error.Printf("image failed to fetch: %v", resp.err)
for _, img := range filtered {
sema <- struct{}{}
go func(src, href string) {
defer func() {
wg.Done()
<-sema
}()
var data string
var err error
if isRemote {
data, err = fetch(ctx, href)
} else {
out = strings.Replace(out, resp.srctxt, fmt.Sprintf(`<image href="%s"`, resp.data), 1)
data, err = read(href)
}
wg.Done()
}
select {
case <-ctx.Done():
case respChan <- resp{
srctxt: src,
data: data,
err: err,
}:
}
}(string(img[0]), string(img[1]))
}
}()
wg.Wait()
close(respChan)
go func() {
wg.Wait()
close(respChan)
}()
return []byte(out), nil
for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("failed to wait for imgbundler workers: %w", ctx.Err())
case resp, ok := <-respChan:
if !ok {
return svg, nil
}
if resp.err != nil {
err = multierr.Combine(err, resp.err)
continue
}
svg = bytes.Replace(svg, []byte(resp.srctxt), []byte(fmt.Sprintf(`<image href="%s"`, resp.data)), 1)
}
}
}
var transport = http.DefaultTransport
func fetch(ctx context.Context, href string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", href, nil)
if err != nil {
return "", err
@ -118,7 +139,7 @@ func fetch(ctx context.Context, href string) (string, error) {
return fmt.Sprintf("data:%s;base64,%s", mimeType, enc), nil
}
func read(ctx context.Context, href string) (string, error) {
func read(href string) (string, error) {
data, err := os.ReadFile(href)
if err != nil {
return "", err