From d5bdd3f4bc8922e099578b7d430ad0088e587935 Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Sun, 1 May 2016 10:42:28 -0700 Subject: [PATCH] Leverage AWS S3 retries --- emitter/s3/emitter.go | 19 ++++++++++--------- examples/s3/main.go | 11 +++++++++-- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/emitter/s3/emitter.go b/emitter/s3/emitter.go index 1146548..89df95e 100644 --- a/emitter/s3/emitter.go +++ b/emitter/s3/emitter.go @@ -5,8 +5,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" - awss3 "github.com/aws/aws-sdk-go/service/s3" - "gopkg.in/matryer/try.v1" + "github.com/aws/aws-sdk-go/service/s3" ) // Emitter stores data in S3 bucket. @@ -17,24 +16,26 @@ import ( // dash. This struct requires the configuration of an S3 bucket and endpoint. type Emitter struct { Bucket string + Region string } // Emit is invoked when the buffer is full. This method emits the set of filtered records. func (e Emitter) Emit(s3Key string, b io.ReadSeeker) error { - svc := awss3.New(session.New()) + svc := s3.New( + session.New(aws.NewConfig().WithMaxRetries(10)), + &aws.Config{ + Region: aws.String(e.Region), + }, + ) - params := &awss3.PutObjectInput{ + params := &s3.PutObjectInput{ Body: b, Bucket: aws.String(e.Bucket), ContentType: aws.String("text/plain"), Key: aws.String(s3Key), } - err := try.Do(func(attempt int) (bool, error) { - var err error - _, err = svc.PutObject(params) - return attempt < 5, err - }) + _, err := svc.PutObject(params) if err != nil { return err diff --git a/examples/s3/main.go b/examples/s3/main.go index 499e844..ccba741 100644 --- a/examples/s3/main.go +++ b/examples/s3/main.go @@ -4,6 +4,7 @@ import ( "bytes" "flag" "fmt" + "os" "github.com/harlow/kinesis-connectors" "github.com/harlow/kinesis-connectors/emitter/s3" @@ -17,9 +18,14 @@ var ( func main() { flag.Parse() - emitter := &s3.Emitter{Bucket: *bucket} + + emitter := &s3.Emitter{ + Bucket: *bucket, + Region: "us-west-1", + } c := connector.NewConsumer(*app, *stream) + c.Start(connector.HandlerFunc(func(b connector.Buffer) { body := new(bytes.Buffer) @@ -33,7 +39,8 @@ func main() { ) if err != nil { - fmt.Printf("error %s", err) + fmt.Printf("error %s\n", err) + os.Exit(1) } }))