Leverage AWS S3 retries

This commit is contained in:
Harlow Ward 2016-05-01 10:42:28 -07:00
parent 49b5a94c7e
commit d5bdd3f4bc
2 changed files with 19 additions and 11 deletions

View file

@ -5,8 +5,7 @@ import (
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
awss3 "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"gopkg.in/matryer/try.v1"
) )
// Emitter stores data in S3 bucket. // Emitter stores data in S3 bucket.
@ -17,24 +16,26 @@ import (
// dash. This struct requires the configuration of an S3 bucket and endpoint. // dash. This struct requires the configuration of an S3 bucket and endpoint.
type Emitter struct { type Emitter struct {
Bucket string Bucket string
Region string
} }
// Emit is invoked when the buffer is full. This method emits the set of filtered records. // Emit is invoked when the buffer is full. This method emits the set of filtered records.
func (e Emitter) Emit(s3Key string, b io.ReadSeeker) error { func (e Emitter) Emit(s3Key string, b io.ReadSeeker) error {
svc := awss3.New(session.New()) svc := s3.New(
session.New(aws.NewConfig().WithMaxRetries(10)),
&aws.Config{
Region: aws.String(e.Region),
},
)
params := &awss3.PutObjectInput{ params := &s3.PutObjectInput{
Body: b, Body: b,
Bucket: aws.String(e.Bucket), Bucket: aws.String(e.Bucket),
ContentType: aws.String("text/plain"), ContentType: aws.String("text/plain"),
Key: aws.String(s3Key), Key: aws.String(s3Key),
} }
err := try.Do(func(attempt int) (bool, error) { _, err := svc.PutObject(params)
var err error
_, err = svc.PutObject(params)
return attempt < 5, err
})
if err != nil { if err != nil {
return err return err

View file

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"flag" "flag"
"fmt" "fmt"
"os"
"github.com/harlow/kinesis-connectors" "github.com/harlow/kinesis-connectors"
"github.com/harlow/kinesis-connectors/emitter/s3" "github.com/harlow/kinesis-connectors/emitter/s3"
@ -17,9 +18,14 @@ var (
func main() { func main() {
flag.Parse() flag.Parse()
emitter := &s3.Emitter{Bucket: *bucket}
emitter := &s3.Emitter{
Bucket: *bucket,
Region: "us-west-1",
}
c := connector.NewConsumer(*app, *stream) c := connector.NewConsumer(*app, *stream)
c.Start(connector.HandlerFunc(func(b connector.Buffer) { c.Start(connector.HandlerFunc(func(b connector.Buffer) {
body := new(bytes.Buffer) body := new(bytes.Buffer)
@ -33,7 +39,8 @@ func main() {
) )
if err != nil { if err != nil {
fmt.Printf("error %s", err) fmt.Printf("error %s\n", err)
os.Exit(1)
} }
})) }))