From c5c7dd49c97926b1a267b2681afc1fd236180442 Mon Sep 17 00:00:00 2001 From: dan Date: Sun, 29 Mar 2015 12:22:29 -0700 Subject: [PATCH 01/22] add / between bucket and filename in copy statement --- redshift_basic_emitter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go index 7c48d22..d1d9b6e 100644 --- a/redshift_basic_emitter.go +++ b/redshift_basic_emitter.go @@ -48,7 +48,7 @@ func (e RedshiftBasicEmtitter) Emit(b Buffer, t Transformer) { func (e RedshiftBasicEmtitter) copyStatement(s3File string) string { b := new(bytes.Buffer) b.WriteString(fmt.Sprintf("COPY %v ", e.TableName)) - b.WriteString(fmt.Sprintf("FROM 's3://%v%v' ", e.S3Bucket, s3File)) + b.WriteString(fmt.Sprintf("FROM 's3://%v/%v' ", e.S3Bucket, s3File)) b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", os.Getenv("AWS_ACCESS_KEY"))) b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", os.Getenv("AWS_SECRET_KEY"))) switch e.Format { From 797b575ad16b3907371d878534b6ef172222e077 Mon Sep 17 00:00:00 2001 From: dan Date: Fri, 3 Apr 2015 15:33:34 -0700 Subject: [PATCH 02/22] add recoverability in the pipeline, especially for throughput errors --- kinesis.go | 2 +- pipeline.go | 44 +++++++++++++++++++++++++++++++++++++++--- s3_manifest_emitter.go | 2 +- 3 files changed, 43 insertions(+), 5 deletions(-) diff --git a/kinesis.go b/kinesis.go index 77f0644..1d66ca7 100644 --- a/kinesis.go +++ b/kinesis.go @@ -4,7 +4,7 @@ import ( "fmt" "time" - "github.com/sendgridlabs/go-kinesis" + "github.com/ezoic/go-kinesis" ) // CreateStream creates a new Kinesis stream (uses existing stream if exists) and diff --git a/pipeline.go b/pipeline.go index 14d4198..f6b6fb2 100644 --- a/pipeline.go +++ b/pipeline.go @@ -2,9 +2,12 @@ package connector import ( "log" + "math" + "os" "time" - "github.com/sendgridlabs/go-kinesis" + "github.com/ezoic/go-kinesis" + l4g "github.com/ezoic/sol/log4go" ) // Pipeline is used as a record processor to configure a pipline. @@ -21,6 +24,28 @@ type Pipeline struct { Transformer Transformer } +// this determines whether the error is recoverable +func (p Pipeline) isRecoverableError(err error) bool { + r := false + + cErr, ok := err.(*kinesis.Error) + if ok && cErr.Code == "ProvisionedThroughputExceeded" { + r = true + } + + return true +} + +// handle the aws exponential backoff +func (p Pipeline) handleAwsWaitTimeExp(attempts int) { + + //http://docs.aws.amazon.com/general/latest/gr/api-retries.html + // wait up to 5 minutes based on the aws exponential backoff algorithm + waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond + time.Sleep(waitTime) + +} + // ProcessShard kicks off the process of a Kinesis Shard. // It is a long running process that will continue to read from the shard. 
func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { @@ -43,13 +68,26 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { shardIterator := shardInfo.ShardIterator + consecutiveErrorAttempts := 0 + for { + + // handle the aws backoff stuff + p.handleAwsWaitTimeExp(consecutiveErrorAttempts) + args = kinesis.NewArgs() args.Add("ShardIterator", shardIterator) recordSet, err := ksis.GetRecords(args) if err != nil { - log.Fatalf("GetRecords ERROR: %v\n", err) + if p.isRecoverableError(err) { + consecutiveErrorAttempts++ + } else { + l4g.Critical("GetRecords ERROR: %v\n", err) + os.Exit(1) + } + } else { + consecutiveErrorAttempts = 0 } if len(recordSet.Records) > 0 { @@ -57,7 +95,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { data := v.GetData() if err != nil { - log.Printf("GetData ERROR: %v\n", err) + l4g.Error("GetData ERROR: %v\n", err) continue } diff --git a/s3_manifest_emitter.go b/s3_manifest_emitter.go index 756d760..8918da8 100644 --- a/s3_manifest_emitter.go +++ b/s3_manifest_emitter.go @@ -3,7 +3,7 @@ package connector import ( "log" - "github.com/sendgridlabs/go-kinesis" + "github.com/ezoic/go-kinesis" ) // An implementation of Emitter that puts event data on S3 file, and then puts the From 650211556381cef63082fd39f9145426e9cbf993 Mon Sep 17 00:00:00 2001 From: dan Date: Fri, 3 Apr 2015 15:38:04 -0700 Subject: [PATCH 03/22] add recoverability in the pipeline, especially for throughput errors --- pipeline.go | 2 +- redshift_basic_emitter_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pipeline.go b/pipeline.go index f6b6fb2..45ed981 100644 --- a/pipeline.go +++ b/pipeline.go @@ -33,7 +33,7 @@ func (p Pipeline) isRecoverableError(err error) bool { r = true } - return true + return r } // handle the aws exponential backoff diff --git a/redshift_basic_emitter_test.go b/redshift_basic_emitter_test.go index 7d993a1..a93d0f4 100644 --- a/redshift_basic_emitter_test.go +++ b/redshift_basic_emitter_test.go @@ -10,7 +10,7 @@ func TestCopyStatement(t *testing.T) { S3Bucket: "test_bucket", TableName: "test_table", } - f := e.copyStatement("/test.txt") + f := e.copyStatement("test.txt") copyStatement := "COPY test_table FROM 's3://test_bucket/test.txt' CREDENTIALS 'aws_access_key_id=;aws_secret_access_key=' DELIMITER ',';" From 5301dfe963130733f5e6f0b036b143795cad4fe5 Mon Sep 17 00:00:00 2001 From: dan Date: Fri, 3 Apr 2015 22:48:38 -0700 Subject: [PATCH 04/22] fix the recoverable error logic to prevent crash --- pipeline.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pipeline.go b/pipeline.go index 45ed981..791097b 100644 --- a/pipeline.go +++ b/pipeline.go @@ -81,7 +81,9 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { if err != nil { if p.isRecoverableError(err) { + l4g.Info("recoverable error, %s", err) consecutiveErrorAttempts++ + continue } else { l4g.Critical("GetRecords ERROR: %v\n", err) os.Exit(1) From 02a9d5d173c331972d25de314e27a87b5f5572d4 Mon Sep 17 00:00:00 2001 From: dan Date: Fri, 3 Apr 2015 23:05:49 -0700 Subject: [PATCH 05/22] use log4go output --- pipeline.go | 6 +++--- redshift_basic_emitter.go | 8 +++++--- redshift_manifest_emitter.go | 14 +++++++++----- s3_emitter.go | 6 +++--- s3_manifest_emitter.go | 7 +++---- 5 files changed, 23 insertions(+), 18 deletions(-) diff --git a/pipeline.go b/pipeline.go index 791097b..1da2ad0 100644 --- a/pipeline.go +++ b/pipeline.go @@ -1,7 +1,6 @@ package connector import ( - "log" "math" "os" "time" @@ 
-63,7 +62,8 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { shardInfo, err := ksis.GetShardIterator(args) if err != nil { - log.Fatalf("GetShardIterator ERROR: %v\n", err) + l4g.Critical("GetShardIterator ERROR: %v", err) + os.Exit(1) } shardIterator := shardInfo.ShardIterator @@ -108,7 +108,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { } } } else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil { - log.Printf("NextShardIterator ERROR: %v\n", err) + l4g.Error("NextShardIterator ERROR: %v", err) break } else { time.Sleep(5 * time.Second) diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go index d1d9b6e..8b17489 100644 --- a/redshift_basic_emitter.go +++ b/redshift_basic_emitter.go @@ -4,10 +4,10 @@ import ( "bytes" "database/sql" "fmt" - "log" "os" // Postgres package is used when sql.Open is called + l4g "github.com/ezoic/sol/log4go" _ "github.com/lib/pq" ) @@ -31,13 +31,15 @@ func (e RedshiftBasicEmtitter) Emit(b Buffer, t Transformer) { db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL")) if err != nil { - log.Fatal(err) + l4g.Critical(err) + os.Exit(1) } _, err = db.Exec(e.copyStatement(s3File)) if err != nil { - log.Fatal(err) + l4g.Critical(err) + os.Exit(1) } fmt.Printf("Redshift load completed.\n") diff --git a/redshift_manifest_emitter.go b/redshift_manifest_emitter.go index 36f0137..4780e0a 100644 --- a/redshift_manifest_emitter.go +++ b/redshift_manifest_emitter.go @@ -5,13 +5,13 @@ import ( "database/sql" "encoding/json" "fmt" - "log" "os" "strings" "time" "github.com/crowdmob/goamz/aws" "github.com/crowdmob/goamz/s3" + l4g "github.com/ezoic/sol/log4go" _ "github.com/lib/pq" ) @@ -35,7 +35,9 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) { db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL")) if err != nil { - log.Fatal(err) + l4g.Critical(err) + os.Exit(1) + } // Aggregate file paths as strings @@ -55,7 +57,8 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) { _, err = db.Exec(c) if err != nil { - log.Fatal(err) + l4g.Critical(err) + os.Exit(1) } // Insert file paths into File Names table @@ -63,10 +66,11 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) { _, err = db.Exec(i) if err != nil { - log.Fatal(err) + l4g.Critical(err) + os.Exit(1) } - log.Printf("[%v] copied to Redshift", manifestFileName) + l4g.Info("[%v] copied to Redshift", manifestFileName) db.Close() } diff --git a/s3_emitter.go b/s3_emitter.go index 69a29c6..4b8339f 100644 --- a/s3_emitter.go +++ b/s3_emitter.go @@ -3,11 +3,11 @@ package connector import ( "bytes" "fmt" - "log" "time" "github.com/crowdmob/goamz/aws" "github.com/crowdmob/goamz/s3" + l4g "github.com/ezoic/sol/log4go" ) // S3Emitter is an implementation of Emitter used to store files from a Kinesis stream in S3. 
@@ -44,8 +44,8 @@ func (e S3Emitter) Emit(b Buffer, t Transformer) { err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{}) if err != nil { - log.Printf("S3Put ERROR: %v\n", err) + l4g.Error("S3Put ERROR: %v", err) } else { - log.Printf("[%v] records emitted to [%s]\n", b.NumRecordsInBuffer(), e.S3Bucket) + l4g.Info("[%v] records emitted to [%s]", b.NumRecordsInBuffer(), e.S3Bucket) } } diff --git a/s3_manifest_emitter.go b/s3_manifest_emitter.go index 8918da8..63e71d0 100644 --- a/s3_manifest_emitter.go +++ b/s3_manifest_emitter.go @@ -1,9 +1,8 @@ package connector import ( - "log" - "github.com/ezoic/go-kinesis" + l4g "github.com/ezoic/sol/log4go" ) // An implementation of Emitter that puts event data on S3 file, and then puts the @@ -30,8 +29,8 @@ func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) { _, err := e.Ksis.PutRecord(args) if err != nil { - log.Printf("PutRecord ERROR: %v", err) + l4g.Error("PutRecord ERROR: %v", err) } else { - log.Printf("[%s] emitted to [%s]", b.FirstSequenceNumber(), e.OutputStream) + l4g.Info("[%s] emitted to [%s]", b.FirstSequenceNumber(), e.OutputStream) } } From b89c69d86b33b2465e8cde60d1244c4b038a0847 Mon Sep 17 00:00:00 2001 From: dan Date: Fri, 3 Apr 2015 23:09:33 -0700 Subject: [PATCH 06/22] more log4go additions --- redshift_basic_emitter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go index 8b17489..bf44741 100644 --- a/redshift_basic_emitter.go +++ b/redshift_basic_emitter.go @@ -42,7 +42,7 @@ func (e RedshiftBasicEmtitter) Emit(b Buffer, t Transformer) { os.Exit(1) } - fmt.Printf("Redshift load completed.\n") + l4g.Debug("Redshift load completed.\n") db.Close() } From d746c2529a455bc50510fac00b67336588d4a30b Mon Sep 17 00:00:00 2001 From: dan Date: Fri, 3 Apr 2015 23:14:08 -0700 Subject: [PATCH 07/22] more log4go additions --- kinesis.go | 8 ++++---- redshift_manifest_emitter.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/kinesis.go b/kinesis.go index 1d66ca7..5475189 100644 --- a/kinesis.go +++ b/kinesis.go @@ -1,10 +1,10 @@ package connector import ( - "fmt" "time" "github.com/ezoic/go-kinesis" + l4g "github.com/ezoic/sol/log4go" ) // CreateStream creates a new Kinesis stream (uses existing stream if exists) and @@ -14,7 +14,7 @@ func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) { err := k.CreateStream(streamName, shardCount) if err != nil { - fmt.Printf("CreateStream ERROR: %v\n", err) + l4g.Error("CreateStream ERROR: %v", err) return } } @@ -27,7 +27,7 @@ func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) { args.Add("StreamName", streamName) resp, _ = k.DescribeStream(args) streamStatus := resp.StreamDescription.StreamStatus - fmt.Printf("Stream [%v] is %v\n", streamName, streamStatus) + l4g.Info("Stream [%v] is %v", streamName, streamStatus) if streamStatus != "ACTIVE" { time.Sleep(4 * time.Second) @@ -43,7 +43,7 @@ func StreamExists(k *kinesis.Kinesis, streamName string) bool { args := kinesis.NewArgs() resp, err := k.ListStreams(args) if err != nil { - fmt.Printf("ListStream ERROR: %v\n", err) + l4g.Error("ListStream ERROR: %v", err) return false } for _, s := range resp.StreamNames { diff --git a/redshift_manifest_emitter.go b/redshift_manifest_emitter.go index 4780e0a..d633a46 100644 --- a/redshift_manifest_emitter.go +++ b/redshift_manifest_emitter.go @@ -123,7 +123,7 @@ func (e RedshiftManifestEmitter) writeManifestToS3(files []string, manifestFileN content := 
e.generateManifestFile(files) err := bucket.Put(manifestFileName, content, "text/plain", s3.Private, s3.Options{}) if err != nil { - fmt.Printf("Error occured while uploding to S3: %v\n", err) + l4g.Error("Error occured while uploding to S3: %v", err) } } From 852cf2a7d2296bfa5dbeac1c0338a093f2ff8b6b Mon Sep 17 00:00:00 2001 From: dan Date: Fri, 3 Apr 2015 23:17:19 -0700 Subject: [PATCH 08/22] more log4go additions --- redshift_basic_emitter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go index bf44741..bc8c6ca 100644 --- a/redshift_basic_emitter.go +++ b/redshift_basic_emitter.go @@ -42,7 +42,7 @@ func (e RedshiftBasicEmtitter) Emit(b Buffer, t Transformer) { os.Exit(1) } - l4g.Debug("Redshift load completed.\n") + l4g.Debug("Redshift load completed.") db.Close() } From 18163c1599ded007969911fae51c32bfa23987b4 Mon Sep 17 00:00:00 2001 From: dan Date: Sat, 4 Apr 2015 10:24:23 -0700 Subject: [PATCH 09/22] change the l4g.Critical, os.Exit combo to log.Fatal, because l4g defers output so the critical logs are not being seen --- pipeline.go | 8 +++----- redshift_basic_emitter.go | 7 +++---- redshift_manifest_emitter.go | 11 ++++------- 3 files changed, 10 insertions(+), 16 deletions(-) diff --git a/pipeline.go b/pipeline.go index 1da2ad0..25acdde 100644 --- a/pipeline.go +++ b/pipeline.go @@ -1,8 +1,8 @@ package connector import ( + "log" "math" - "os" "time" "github.com/ezoic/go-kinesis" @@ -62,8 +62,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { shardInfo, err := ksis.GetShardIterator(args) if err != nil { - l4g.Critical("GetShardIterator ERROR: %v", err) - os.Exit(1) + log.Fatalf("GetShardIterator ERROR: %v\n", err) } shardIterator := shardInfo.ShardIterator @@ -85,8 +84,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { consecutiveErrorAttempts++ continue } else { - l4g.Critical("GetRecords ERROR: %v\n", err) - os.Exit(1) + log.Fatalf("GetRecords ERROR: %v\n", err) } } else { consecutiveErrorAttempts = 0 diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go index bc8c6ca..3d6f37f 100644 --- a/redshift_basic_emitter.go +++ b/redshift_basic_emitter.go @@ -4,6 +4,7 @@ import ( "bytes" "database/sql" "fmt" + "log" "os" // Postgres package is used when sql.Open is called @@ -31,15 +32,13 @@ func (e RedshiftBasicEmtitter) Emit(b Buffer, t Transformer) { db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL")) if err != nil { - l4g.Critical(err) - os.Exit(1) + log.Fatal(err) } _, err = db.Exec(e.copyStatement(s3File)) if err != nil { - l4g.Critical(err) - os.Exit(1) + log.Fatal(err) } l4g.Debug("Redshift load completed.") diff --git a/redshift_manifest_emitter.go b/redshift_manifest_emitter.go index d633a46..d04c65e 100644 --- a/redshift_manifest_emitter.go +++ b/redshift_manifest_emitter.go @@ -5,6 +5,7 @@ import ( "database/sql" "encoding/json" "fmt" + "log" "os" "strings" "time" @@ -35,9 +36,7 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) { db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL")) if err != nil { - l4g.Critical(err) - os.Exit(1) - + log.Fatal(err) } // Aggregate file paths as strings @@ -57,8 +56,7 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) { _, err = db.Exec(c) if err != nil { - l4g.Critical(err) - os.Exit(1) + log.Fatal(err) } // Insert file paths into File Names table @@ -66,8 +64,7 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) { _, err = db.Exec(i) if err != nil { - 
l4g.Critical(err) - os.Exit(1) + log.Fatal(err) } l4g.Info("[%v] copied to Redshift", manifestFileName) From e3efa383c1a90a04f4f1499e2d006a2f0aeec930 Mon Sep 17 00:00:00 2001 From: dan Date: Mon, 6 Apr 2015 11:38:20 -0700 Subject: [PATCH 10/22] change dependency on log4go to different folder --- kinesis.go | 2 +- pipeline.go | 2 +- redshift_basic_emitter.go | 2 +- redshift_manifest_emitter.go | 2 +- s3_emitter.go | 2 +- s3_manifest_emitter.go | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/kinesis.go b/kinesis.go index 5475189..c83d466 100644 --- a/kinesis.go +++ b/kinesis.go @@ -4,7 +4,7 @@ import ( "time" "github.com/ezoic/go-kinesis" - l4g "github.com/ezoic/sol/log4go" + l4g "github.com/ezoic/log4go" ) // CreateStream creates a new Kinesis stream (uses existing stream if exists) and diff --git a/pipeline.go b/pipeline.go index 25acdde..4b4e809 100644 --- a/pipeline.go +++ b/pipeline.go @@ -6,7 +6,7 @@ import ( "time" "github.com/ezoic/go-kinesis" - l4g "github.com/ezoic/sol/log4go" + l4g "github.com/ezoic/log4go" ) // Pipeline is used as a record processor to configure a pipline. diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go index 3d6f37f..2026893 100644 --- a/redshift_basic_emitter.go +++ b/redshift_basic_emitter.go @@ -8,7 +8,7 @@ import ( "os" // Postgres package is used when sql.Open is called - l4g "github.com/ezoic/sol/log4go" + l4g "github.com/ezoic/log4go" _ "github.com/lib/pq" ) diff --git a/redshift_manifest_emitter.go b/redshift_manifest_emitter.go index d04c65e..b2040f4 100644 --- a/redshift_manifest_emitter.go +++ b/redshift_manifest_emitter.go @@ -12,7 +12,7 @@ import ( "github.com/crowdmob/goamz/aws" "github.com/crowdmob/goamz/s3" - l4g "github.com/ezoic/sol/log4go" + l4g "github.com/ezoic/log4go" _ "github.com/lib/pq" ) diff --git a/s3_emitter.go b/s3_emitter.go index 4b8339f..f21c8e0 100644 --- a/s3_emitter.go +++ b/s3_emitter.go @@ -7,7 +7,7 @@ import ( "github.com/crowdmob/goamz/aws" "github.com/crowdmob/goamz/s3" - l4g "github.com/ezoic/sol/log4go" + l4g "github.com/ezoic/log4go" ) // S3Emitter is an implementation of Emitter used to store files from a Kinesis stream in S3. diff --git a/s3_manifest_emitter.go b/s3_manifest_emitter.go index 63e71d0..fa88e23 100644 --- a/s3_manifest_emitter.go +++ b/s3_manifest_emitter.go @@ -2,7 +2,7 @@ package connector import ( "github.com/ezoic/go-kinesis" - l4g "github.com/ezoic/sol/log4go" + l4g "github.com/ezoic/log4go" ) // An implementation of Emitter that puts event data on S3 file, and then puts the From 9ed761edc6e03b6f2bba89676ed2565e90884885 Mon Sep 17 00:00:00 2001 From: dan Date: Mon, 6 Apr 2015 14:51:46 -0700 Subject: [PATCH 11/22] re-use the database connection to redshift --- redshift_basic_emitter.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go index 2026893..6c54dde 100644 --- a/redshift_basic_emitter.go +++ b/redshift_basic_emitter.go @@ -20,7 +20,9 @@ type RedshiftBasicEmtitter struct { Format string Jsonpaths string S3Bucket string + S3Prefix string TableName string + Db *sql.DB } // Emit is invoked when the buffer is full. 
This method leverages the S3Emitter and @@ -29,20 +31,14 @@ func (e RedshiftBasicEmtitter) Emit(b Buffer, t Transformer) { s3Emitter := S3Emitter{S3Bucket: e.S3Bucket} s3Emitter.Emit(b, t) s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber()) - db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL")) - if err != nil { - log.Fatal(err) - } - - _, err = db.Exec(e.copyStatement(s3File)) + _, err := e.Db.Exec(e.copyStatement(s3File)) if err != nil { log.Fatal(err) } l4g.Debug("Redshift load completed.") - db.Close() } // Creates the SQL copy statement issued to Redshift cluster. @@ -61,5 +57,6 @@ func (e RedshiftBasicEmtitter) copyStatement(s3File string) string { b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter)) } b.WriteString(";") + l4g.Debug(b.String()) return b.String() } From 6f8ff3f11d5ed4aec8ca0c590703229fa1e656c7 Mon Sep 17 00:00:00 2001 From: dan Date: Mon, 6 Apr 2015 14:52:04 -0700 Subject: [PATCH 12/22] use a prefix for files generated on s3 --- s3_emitter.go | 7 ++++++- s3_emitter_test.go | 14 ++++++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/s3_emitter.go b/s3_emitter.go index f21c8e0..43c08f5 100644 --- a/s3_emitter.go +++ b/s3_emitter.go @@ -18,13 +18,18 @@ import ( // dash. This struct requires the configuration of an S3 bucket and endpoint. type S3Emitter struct { S3Bucket string + S3Prefix string } // S3FileName generates a file name based on the First and Last sequence numbers from the buffer. The current // UTC date (YYYY-MM-DD) is base of the path to logically group days of batches. func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string { date := time.Now().UTC().Format("2006/01/02") - return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq) + if e.S3Prefix == "" { + return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq) + } else { + return fmt.Sprintf("%v/%v/%v-%v", e.S3Prefix, date, firstSeq, lastSeq) + } } // Emit is invoked when the buffer is full. This method emits the set of filtered records. 
diff --git a/s3_emitter_test.go b/s3_emitter_test.go
index 1307dd0..5c8e9b2 100644
--- a/s3_emitter_test.go
+++ b/s3_emitter_test.go
@@ -8,12 +8,22 @@ import (
 
 func TestS3FileName(t *testing.T) {
 	d := time.Now().UTC().Format("2006/01/02")
-	e := S3Emitter{}
+	e := S3Emitter{S3Bucket: "bucket", S3Prefix: "prefix"}
 
-	expected := fmt.Sprintf("%v/a-b", d)
+	expected := fmt.Sprintf("prefix/%v/a-b", d)
 	result := e.S3FileName("a", "b")
 
 	if result != expected {
 		t.Errorf("S3FileName() = %v want %v", result, expected)
 	}
+
+	e.S3Prefix = ""
+
+	expected = fmt.Sprintf("%v/a-b", d)
+	result = e.S3FileName("a", "b")
+
+	if result != expected {
+		t.Errorf("S3FileName() = %v want %v", result, expected)
+	}
+
 }

From f1af849e810d6773ec53b4c83f5cbfd7afa72985 Mon Sep 17 00:00:00 2001
From: dan
Date: Tue, 7 Apr 2015 08:53:06 -0700
Subject: [PATCH 13/22] handle recoverable error properly

---
 pipeline.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pipeline.go b/pipeline.go
index 4b4e809..09f6ac2 100644
--- a/pipeline.go
+++ b/pipeline.go
@@ -28,7 +28,7 @@ func (p Pipeline) isRecoverableError(err error) bool {
 	r := false
 
 	cErr, ok := err.(*kinesis.Error)
-	if ok && cErr.Code == "ProvisionedThroughputExceeded" {
+	if ok && cErr.Code == "ProvisionedThroughputExceededException" {
 		r = true
 	}
 
From 0dce2a6045f25916095744730fb85d46648408cb Mon Sep 17 00:00:00 2001
From: dan
Date: Tue, 7 Apr 2015 10:42:07 -0700
Subject: [PATCH 14/22] debug for errors that are killing the pipeline

---
 pipeline.go | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/pipeline.go b/pipeline.go
index 09f6ac2..78344f8 100644
--- a/pipeline.go
+++ b/pipeline.go
@@ -3,6 +3,7 @@ package connector
 import (
 	"log"
 	"math"
+	"reflect"
 	"time"
 
 	"github.com/ezoic/go-kinesis"
@@ -23,12 +24,18 @@ type Pipeline struct {
 	Transformer Transformer
 }
 
+var pipelineRecoverableErrorCodes = map[string]bool{
+	"ProvisionedThroughputExceededException": true,
+}
+
 // this determines whether the error is recoverable
 func (p Pipeline) isRecoverableError(err error) bool {
 	r := false
 
+	l4g.Debug("isRecoverableError, type %s, value (+%v)", reflect.TypeOf(err).Name(), err)
+
 	cErr, ok := err.(*kinesis.Error)
-	if ok && cErr.Code == "ProvisionedThroughputExceededException" {
+	if ok && pipelineRecoverableErrorCodes[cErr.Code] == true {
 		r = true
 	}
 
From 2ff748a2d4c1dbcf4ccf727d09e3c1c4dbdedcc0 Mon Sep 17 00:00:00 2001
From: dan
Date: Tue, 7 Apr 2015 11:19:10 -0700
Subject: [PATCH 15/22] checkpoint after filtered messages in the pipeline

---
 pipeline.go      | 19 ++++++++++++-------
 record_buffer.go |  6 ++++--
 2 files changed, 16 insertions(+), 9 deletions(-)

diff --git a/pipeline.go b/pipeline.go
index 78344f8..f2feeb4 100644
--- a/pipeline.go
+++ b/pipeline.go
@@ -16,12 +16,13 @@ import (
 // interface. It has a data type (Model) as Records come in as a byte[] and are transformed to a Model.
 // Then they are buffered in Model form and when the buffer is full, Models's are passed to the emitter.
type Pipeline struct { - Buffer Buffer - Checkpoint Checkpoint - Emitter Emitter - Filter Filter - StreamName string - Transformer Transformer + Buffer Buffer + Checkpoint Checkpoint + Emitter Emitter + Filter Filter + StreamName string + Transformer Transformer + CheckpointFilteredRecords bool } var pipelineRecoverableErrorCodes = map[string]bool{ @@ -110,6 +111,8 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { if p.Filter.KeepRecord(r) { p.Buffer.ProcessRecord(r, v.SequenceNumber) + } else if p.CheckpointFilteredRecords { + p.Buffer.ProcessRecord(nil, v.SequenceNumber) } } } else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil { @@ -120,7 +123,9 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { } if p.Buffer.ShouldFlush() { - p.Emitter.Emit(p.Buffer, p.Transformer) + if p.Buffer.NumRecordsInBuffer() > 0 { + p.Emitter.Emit(p.Buffer, p.Transformer) + } p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber()) p.Buffer.Flush() } diff --git a/record_buffer.go b/record_buffer.go index a2a7dfd..8f01f58 100644 --- a/record_buffer.go +++ b/record_buffer.go @@ -20,7 +20,9 @@ func (b *RecordBuffer) ProcessRecord(record interface{}, sequenceNumber string) b.lastSequenceNumber = sequenceNumber if !b.sequenceExists(sequenceNumber) { - b.recordsInBuffer = append(b.recordsInBuffer, record) + if record != nil { + b.recordsInBuffer = append(b.recordsInBuffer, record) + } b.sequencesInBuffer = append(b.sequencesInBuffer, sequenceNumber) } } @@ -32,7 +34,7 @@ func (b *RecordBuffer) Records() []interface{} { // NumRecordsInBuffer returns the number of messages in the buffer. func (b RecordBuffer) NumRecordsInBuffer() int { - return len(b.sequencesInBuffer) + return len(b.recordsInBuffer) } // Flush empties the buffer and resets the sequence counter. 
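
Note on PATCH 15: the contract it establishes is that a record dropped by the Filter still advances the sequence bookkeeping (so its position can be checkpointed), but it no longer occupies the buffer or counts toward a flush. Below is a minimal sketch of that behavior, written against the methods shown in the hunks above (ProcessRecord, NumRecordsInBuffer, LastSequenceNumber); it is an illustration only, not part of the series, and the zero-value RecordBuffer setup is an assumption, since the struct's flush configuration is not shown here.

package connector

import "testing"

func Test_FilteredRecordBookkeeping(t *testing.T) {
	b := &RecordBuffer{}

	// a record the Filter kept: buffered and sequence-tracked
	b.ProcessRecord(map[string]string{"event": "pageview"}, "seq-100")

	// a record the Filter dropped: the pipeline passes nil, so only the
	// sequence number is tracked, purely for checkpointing
	b.ProcessRecord(nil, "seq-101")

	if n := b.NumRecordsInBuffer(); n != 1 {
		t.Errorf("NumRecordsInBuffer() = %v, want 1", n)
	}
	if s := b.LastSequenceNumber(); s != "seq-101" {
		t.Errorf("LastSequenceNumber() = %v, want seq-101", s)
	}
}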
From 969ba1882443dad8b3b19a5dea223284d7190d90 Mon Sep 17 00:00:00 2001 From: dan Date: Tue, 7 Apr 2015 13:42:50 -0700 Subject: [PATCH 16/22] if there are no records available, keep going until the nextiterator returns nothing --- pipeline.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pipeline.go b/pipeline.go index f2feeb4..8413125 100644 --- a/pipeline.go +++ b/pipeline.go @@ -33,7 +33,7 @@ var pipelineRecoverableErrorCodes = map[string]bool{ func (p Pipeline) isRecoverableError(err error) bool { r := false - l4g.Debug("isRecoverableError, type %s, value (+%v)", reflect.TypeOf(err).Name(), err) + log.Printf("isRecoverableError, type %s, value (+%v)\n", reflect.TypeOf(err).String(), err) cErr, ok := err.(*kinesis.Error) if ok && pipelineRecoverableErrorCodes[cErr.Code] == true { @@ -48,8 +48,11 @@ func (p Pipeline) handleAwsWaitTimeExp(attempts int) { //http://docs.aws.amazon.com/general/latest/gr/api-retries.html // wait up to 5 minutes based on the aws exponential backoff algorithm - waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond - time.Sleep(waitTime) + if attempts > 0 { + waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond + l4g.Finest("handleAwsWaitTimeExp:%s", waitTime.String()) + time.Sleep(waitTime) + } } @@ -119,7 +122,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { l4g.Error("NextShardIterator ERROR: %v", err) break } else { - time.Sleep(5 * time.Second) + //time.Sleep(5 * time.Second) } if p.Buffer.ShouldFlush() { From a028ee862f96fb314f141a30dad03d0ed63965d6 Mon Sep 17 00:00:00 2001 From: dan Date: Tue, 7 Apr 2015 14:40:34 -0700 Subject: [PATCH 17/22] add more recoverable errors --- pipeline.go | 39 ++++++++++++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/pipeline.go b/pipeline.go index 8413125..b3cc44b 100644 --- a/pipeline.go +++ b/pipeline.go @@ -3,6 +3,7 @@ package connector import ( "log" "math" + "net" "reflect" "time" @@ -25,8 +26,34 @@ type Pipeline struct { CheckpointFilteredRecords bool } -var pipelineRecoverableErrorCodes = map[string]bool{ - "ProvisionedThroughputExceededException": true, +type pipelineIsRecoverableErrorFunc func(error) bool + +func pipelineKinesisIsRecoverableError(err error) bool { + recoverableErrorCodes := map[string]bool{ + "ProvisionedThroughputExceededException": true, + } + r := false + cErr, ok := err.(*kinesis.Error) + if ok && recoverableErrorCodes[cErr.Code] == true { + r = true + } + return r +} + +func pipelineNetIsRecoverableError(err error) bool { + recoverableErrors := map[string]bool{ + "connection reset by peer": true, + } + r := false + cErr, ok := err.(*net.OpError) + if ok && recoverableErrors[cErr.Err.Error()] == true { + r = true + } + return r +} + +var pipelineIsRecoverableErrors = []pipelineIsRecoverableErrorFunc{ + pipelineKinesisIsRecoverableError, pipelineNetIsRecoverableError, } // this determines whether the error is recoverable @@ -35,9 +62,11 @@ func (p Pipeline) isRecoverableError(err error) bool { log.Printf("isRecoverableError, type %s, value (+%v)\n", reflect.TypeOf(err).String(), err) - cErr, ok := err.(*kinesis.Error) - if ok && pipelineRecoverableErrorCodes[cErr.Code] == true { - r = true + for _, errF := range pipelineIsRecoverableErrors { + r = errF(err) + if r { + break + } } return r From 46e5d628849d7e552845b35599aaf552706a6170 Mon Sep 17 00:00:00 2001 From: dan Date: Wed, 8 Apr 2015 09:54:20 -0700 
Subject: [PATCH 18/22] handle more kinesis recoverable errors --- pipeline.go | 6 +++++- pipeline_test.go | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) create mode 100644 pipeline_test.go diff --git a/pipeline.go b/pipeline.go index b3cc44b..dd299ec 100644 --- a/pipeline.go +++ b/pipeline.go @@ -31,6 +31,10 @@ type pipelineIsRecoverableErrorFunc func(error) bool func pipelineKinesisIsRecoverableError(err error) bool { recoverableErrorCodes := map[string]bool{ "ProvisionedThroughputExceededException": true, + "InternalFailure": true, + "Throttling": true, + "ServiceUnavailable": true, + //"ExpiredIteratorException": true, } r := false cErr, ok := err.(*kinesis.Error) @@ -151,7 +155,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { l4g.Error("NextShardIterator ERROR: %v", err) break } else { - //time.Sleep(5 * time.Second) + time.Sleep(5 * time.Second) } if p.Buffer.ShouldFlush() { diff --git a/pipeline_test.go b/pipeline_test.go new file mode 100644 index 0000000..3629f05 --- /dev/null +++ b/pipeline_test.go @@ -0,0 +1,41 @@ +package connector + +import ( + "fmt" + "net" + "testing" + + "github.com/ezoic/go-kinesis" +) + +func Test_isRecoverableError(t *testing.T) { + + testCases := []struct { + err error + isRecoverable bool + }{ + {err: &kinesis.Error{Code: "ProvisionedThroughputExceededException"}, isRecoverable: true}, + {err: &kinesis.Error{Code: "Throttling"}, isRecoverable: true}, + {err: &kinesis.Error{Code: "ServiceUnavailable"}, isRecoverable: true}, + {err: &kinesis.Error{Code: "ExpiredIteratorException"}, isRecoverable: false}, + {err: &net.OpError{Err: fmt.Errorf("connection reset by peer")}, isRecoverable: true}, + {err: &net.OpError{Err: fmt.Errorf("unexpected error")}, isRecoverable: false}, + {err: fmt.Errorf("an arbitrary error"), isRecoverable: false}, + + //"InternalFailure": true, + //"Throttling": true, + //"ServiceUnavailable": true, + ////"ExpiredIteratorException": true, + //{err: *kinesis.Error{Code:""}} + } + + for idx, tc := range testCases { + + p := Pipeline{} + isRecoverable := p.isRecoverableError(tc.err) + if isRecoverable != tc.isRecoverable { + t.Errorf("test case %d: isRecoverable expected %t, actual %t, for error %+v", idx, tc.isRecoverable, isRecoverable, tc.err) + } + + } +} From e5af96fb54c1e3179216225e5315e9efb09c74b9 Mon Sep 17 00:00:00 2001 From: dan Date: Tue, 21 Apr 2015 15:14:24 -0700 Subject: [PATCH 19/22] handle more errors --- pipeline.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/pipeline.go b/pipeline.go index dd299ec..3bbb73d 100644 --- a/pipeline.go +++ b/pipeline.go @@ -4,6 +4,7 @@ import ( "log" "math" "net" + "net/url" "reflect" "time" @@ -44,6 +45,15 @@ func pipelineKinesisIsRecoverableError(err error) bool { return r } +func pipelineUrlIsRecoverableError(err error) bool { + r := false + _, ok := err.(*url.Error) + if ok { + r = true + } + return r +} + func pipelineNetIsRecoverableError(err error) bool { recoverableErrors := map[string]bool{ "connection reset by peer": true, @@ -57,14 +67,14 @@ func pipelineNetIsRecoverableError(err error) bool { } var pipelineIsRecoverableErrors = []pipelineIsRecoverableErrorFunc{ - pipelineKinesisIsRecoverableError, pipelineNetIsRecoverableError, + pipelineKinesisIsRecoverableError, pipelineNetIsRecoverableError, pipelineUrlIsRecoverableError, } // this determines whether the error is recoverable func (p Pipeline) isRecoverableError(err error) bool { r := false - 
log.Printf("isRecoverableError, type %s, value (+%v)\n", reflect.TypeOf(err).String(), err) + log.Printf("isRecoverableError, type %s, value (%#v)\n", reflect.TypeOf(err).String(), err) for _, errF := range pipelineIsRecoverableErrors { r = errF(err) @@ -115,6 +125,10 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { for { + if consecutiveErrorAttempts > 50 { + log.Fatalln("Too many consecutive error attempts") + } + // handle the aws backoff stuff p.handleAwsWaitTimeExp(consecutiveErrorAttempts) From 2a285c52d52c47e7cef7bd91923b987ee24ecebb Mon Sep 17 00:00:00 2001 From: dan Date: Wed, 6 May 2015 08:44:14 -0700 Subject: [PATCH 20/22] add mysql checkpoint to kinesis-connectors --- mysql_checkpoint.go | 66 ++++++++++++++++++++++++++++++++++++++++ mysql_checkpoint_test.go | 59 +++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+) create mode 100644 mysql_checkpoint.go create mode 100644 mysql_checkpoint_test.go diff --git a/mysql_checkpoint.go b/mysql_checkpoint.go new file mode 100644 index 0000000..4bac5ba --- /dev/null +++ b/mysql_checkpoint.go @@ -0,0 +1,66 @@ +package connector + +import ( + "database/sql" + "fmt" + + l4g "github.com/ezoic/log4go" + _ "github.com/go-sql-driver/mysql" +) + +// MysqlCheckpoint implements the Checkpont interface. +// This class is used to enable the Pipeline.ProcessShard to checkpoint their progress. +type MysqlCheckpoint struct { + AppName string + StreamName string + TableName string + Db *sql.DB + + sequenceNumber string +} + +// CheckpointExists determines if a checkpoint for a particular Shard exists. +// Typically used to determine whether we should start processing the shard with +// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). +func (c *MysqlCheckpoint) CheckpointExists(shardID string) bool { + + l4g.Finest("SELECT sequence_number FROM " + c.TableName + " WHERE checkpoint_key = ?") + + row := c.Db.QueryRow("SELECT sequence_number FROM "+c.TableName+" WHERE checkpoint_key = ?", c.key(shardID)) + var val string + err := row.Scan(&val) + if err == nil { + l4g.Finest("sequence:%s", val) + c.sequenceNumber = val + return true + } + + if err == sql.ErrNoRows { + return false + } + + // something bad happened, better blow up the process + panic(err) +} + +// SequenceNumber returns the current checkpoint stored for the specified shard. +func (c *MysqlCheckpoint) SequenceNumber() string { + return c.sequenceNumber +} + +// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). +// Upon failover, record processing is resumed from this point. +func (c *MysqlCheckpoint) SetCheckpoint(shardID string, sequenceNumber string) { + + _, err := c.Db.Exec("INSERT INTO "+c.TableName+" (sequence_number, checkpoint_key) VALUES (?, ?) ON DUPLICATE KEY UPDATE sequence_number = ?", sequenceNumber, c.key(shardID), sequenceNumber) + if err != nil { + panic(err) + } + c.sequenceNumber = sequenceNumber + +} + +// key generates a unique mysql key for storage of Checkpoint. 
+func (c *MysqlCheckpoint) key(shardID string) string { + return fmt.Sprintf("%v:checkpoint:%v:%v", c.AppName, c.StreamName, shardID) +} diff --git a/mysql_checkpoint_test.go b/mysql_checkpoint_test.go new file mode 100644 index 0000000..7ef1906 --- /dev/null +++ b/mysql_checkpoint_test.go @@ -0,0 +1,59 @@ +package connector + +import ( + "database/sql" + "os" + "testing" +) + +func Test_MysqlKey(t *testing.T) { + k := "app:checkpoint:stream:shard" + c := MysqlCheckpoint{AppName: "app", StreamName: "stream"} + + r := c.key("shard") + + if r != k { + t.Errorf("key() = %v, want %v", k, r) + } +} + +func Test_MysqlCheckpointExists(t *testing.T) { + rc, _ := sql.Open("mysql", os.Getenv("CHECKPOINT_MYSQL_DSN")) + k := "app:checkpoint:stream:shard" + + _, err := rc.Exec("INSERT INTO KinesisConnector.TestCheckpoint (sequence_number, checkpoint_key) VALUES (?, ?) ON DUPLICATE KEY UPDATE sequence_number = ?", "fakeSeqNum", k, "fakeSeqNum") + if err != nil { + t.Fatalf("cannot insert checkpoint into db manually, %s", err) + } + c := MysqlCheckpoint{AppName: "app", StreamName: "stream", TableName: "KinesisConnector.TestCheckpoint", Db: rc} + + r := c.CheckpointExists("shard") + + if r != true { + t.Errorf("CheckpointExists() = %v, want %v", false, r) + } + + rc.Exec("DELETE FROM KinesisConnector.TestCheckpoint WHERE checkpoint_key = ?", k) +} + +func Test_MysqlSetCheckpoint(t *testing.T) { + k := "app:checkpoint:stream:shard" + + rc, _ := sql.Open("mysql", os.Getenv("CHECKPOINT_MYSQL_DSN")) + + c := MysqlCheckpoint{AppName: "app", StreamName: "stream", TableName: "KinesisConnector.TestCheckpoint", Db: rc} + c.SetCheckpoint("shard", "fakeSeqNum") + + rslt := rc.QueryRow("SELECT sequence_number FROM KinesisConnector.TestCheckpoint WHERE checkpoint_key = ?", k) + var sequenceNumber string + err := rslt.Scan(&sequenceNumber) + if err != nil { + t.Fatalf("cannot scan row for checkpoint key, %s", err) + } + + if sequenceNumber != "fakeSeqNum" { + t.Errorf("SetCheckpoint() = %v, want %v", "fakeSeqNum", sequenceNumber) + } + + rc.Exec("DELETE FROM KinesisConnector.TestCheckpoint WHERE checkpoint_key = ?", k) +} From e42dd7f73d69c8a80d5bc457b795df2036dfd255 Mon Sep 17 00:00:00 2001 From: dan Date: Wed, 6 May 2015 09:14:00 -0700 Subject: [PATCH 21/22] move aws backoff to its own file, and use it for redshift basic emitter --- awsbackoff.go | 104 ++++++++++++++++++++++++++++++++++++++ pipeline.go | 80 +---------------------------- redshift_basic_emitter.go | 23 ++++++++- 3 files changed, 128 insertions(+), 79 deletions(-) create mode 100644 awsbackoff.go diff --git a/awsbackoff.go b/awsbackoff.go new file mode 100644 index 0000000..31fe87b --- /dev/null +++ b/awsbackoff.go @@ -0,0 +1,104 @@ +package connector + +import ( + "log" + "math" + "net" + "net/url" + "reflect" + "regexp" + "time" + + "github.com/ezoic/go-kinesis" + l4g "github.com/ezoic/log4go" + "github.com/lib/pq" +) + +type isRecoverableErrorFunc func(error) bool + +func kinesisIsRecoverableError(err error) bool { + recoverableErrorCodes := map[string]bool{ + "ProvisionedThroughputExceededException": true, + "InternalFailure": true, + "Throttling": true, + "ServiceUnavailable": true, + //"ExpiredIteratorException": true, + } + r := false + cErr, ok := err.(*kinesis.Error) + if ok && recoverableErrorCodes[cErr.Code] == true { + r = true + } + return r +} + +func urlIsRecoverableError(err error) bool { + r := false + _, ok := err.(*url.Error) + if ok { + r = true + } + return r +} + +func netIsRecoverableError(err error) bool { + recoverableErrors := 
map[string]bool{ + "connection reset by peer": true, + } + r := false + cErr, ok := err.(*net.OpError) + if ok && recoverableErrors[cErr.Err.Error()] == true { + r = true + } + return r +} + +var redshiftRecoverableErrors = []*regexp.Regexp{ + regexp.MustCompile("The specified S3 prefix '.*?' does not exist"), +} + +func redshiftIsRecoverableError(err error) bool { + r := false + if cErr, ok := err.(pq.Error); ok { + for _, re := range redshiftRecoverableErrors { + if re.MatchString(cErr.Message) { + r = true + break + } + } + } + return r +} + +var isRecoverableErrors = []isRecoverableErrorFunc{ + kinesisIsRecoverableError, netIsRecoverableError, urlIsRecoverableError, redshiftIsRecoverableError, +} + +// this determines whether the error is recoverable +func isRecoverableError(err error) bool { + r := false + + log.Printf("isRecoverableError, type %s, value (%#v)\n", reflect.TypeOf(err).String(), err) + + for _, errF := range isRecoverableErrors { + r = errF(err) + if r { + break + } + } + + return r +} + +// handle the aws exponential backoff +func handleAwsWaitTimeExp(attempts int) { + + //http://docs.aws.amazon.com/general/latest/gr/api-retries.html + // wait up to 5 minutes based on the aws exponential backoff algorithm + if attempts > 0 { + waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond + l4g.Finest("handleAwsWaitTimeExp:%s", waitTime.String()) + time.Sleep(waitTime) + } + +} diff --git a/pipeline.go b/pipeline.go index 3bbb73d..7ff7199 100644 --- a/pipeline.go +++ b/pipeline.go @@ -2,10 +2,6 @@ package connector import ( "log" - "math" - "net" - "net/url" - "reflect" "time" "github.com/ezoic/go-kinesis" @@ -27,78 +23,6 @@ type Pipeline struct { CheckpointFilteredRecords bool } -type pipelineIsRecoverableErrorFunc func(error) bool - -func pipelineKinesisIsRecoverableError(err error) bool { - recoverableErrorCodes := map[string]bool{ - "ProvisionedThroughputExceededException": true, - "InternalFailure": true, - "Throttling": true, - "ServiceUnavailable": true, - //"ExpiredIteratorException": true, - } - r := false - cErr, ok := err.(*kinesis.Error) - if ok && recoverableErrorCodes[cErr.Code] == true { - r = true - } - return r -} - -func pipelineUrlIsRecoverableError(err error) bool { - r := false - _, ok := err.(*url.Error) - if ok { - r = true - } - return r -} - -func pipelineNetIsRecoverableError(err error) bool { - recoverableErrors := map[string]bool{ - "connection reset by peer": true, - } - r := false - cErr, ok := err.(*net.OpError) - if ok && recoverableErrors[cErr.Err.Error()] == true { - r = true - } - return r -} - -var pipelineIsRecoverableErrors = []pipelineIsRecoverableErrorFunc{ - pipelineKinesisIsRecoverableError, pipelineNetIsRecoverableError, pipelineUrlIsRecoverableError, -} - -// this determines whether the error is recoverable -func (p Pipeline) isRecoverableError(err error) bool { - r := false - - log.Printf("isRecoverableError, type %s, value (%#v)\n", reflect.TypeOf(err).String(), err) - - for _, errF := range pipelineIsRecoverableErrors { - r = errF(err) - if r { - break - } - } - - return r -} - -// handle the aws exponential backoff -func (p Pipeline) handleAwsWaitTimeExp(attempts int) { - - //http://docs.aws.amazon.com/general/latest/gr/api-retries.html - // wait up to 5 minutes based on the aws exponential backoff algorithm - if attempts > 0 { - waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond - l4g.Finest("handleAwsWaitTimeExp:%s", waitTime.String()) - 
time.Sleep(waitTime)
-	}
-
-}
-
 // ProcessShard kicks off the process of a Kinesis Shard.
 // It is a long running process that will continue to read from the shard.
 func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
@@ -130,14 +54,14 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
 		}
 
 		// handle the aws backoff stuff
-		p.handleAwsWaitTimeExp(consecutiveErrorAttempts)
+		handleAwsWaitTimeExp(consecutiveErrorAttempts)
 
 		args = kinesis.NewArgs()
 		args.Add("ShardIterator", shardIterator)
 		recordSet, err := ksis.GetRecords(args)
 
 		if err != nil {
-			if p.isRecoverableError(err) {
+			if isRecoverableError(err) {
 				l4g.Info("recoverable error, %s", err)
 				consecutiveErrorAttempts++
 				continue
diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go
index 6c54dde..3544076 100644
--- a/redshift_basic_emitter.go
+++ b/redshift_basic_emitter.go
@@ -32,7 +32,28 @@ func (e RedshiftBasicEmtitter) Emit(b Buffer, t Transformer) {
 	s3Emitter.Emit(b, t)
 	s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
 
-	_, err := e.Db.Exec(e.copyStatement(s3File))
+	stmt := e.copyStatement(s3File)
+
+	var err error
+	for i := 0; i < 10; i++ {
+
+		// handle aws backoff, this may be necessary if, for example, the
+		// s3 file is not yet visible to the database
+		handleAwsWaitTimeExp(i)
+
+		// load into the database
+		_, err = e.Db.Exec(stmt)
+
+		// if the request succeeded, or it's an unrecoverable error, break out of the loop
+		// because we are done
+		if err == nil || isRecoverableError(err) == false {
+			break
+		}
+
+		// recoverable error, let's warn
+		l4g.Warn(err)
+
+	}
 
 	if err != nil {
 		log.Fatal(err)
From e6a0af7ffdcb4884b0ea8c9d2b1ca80f9cb335fa Mon Sep 17 00:00:00 2001
From: dan
Date: Wed, 6 May 2015 09:18:15 -0700
Subject: [PATCH 22/22] move aws backoff to its own file, and use it for redshift basic emitter

---
 pipeline_test.go => awsbackoff_test.go | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
 rename pipeline_test.go => awsbackoff_test.go (82%)

diff --git a/pipeline_test.go b/awsbackoff_test.go
similarity index 82%
rename from pipeline_test.go
rename to awsbackoff_test.go
index 3629f05..d2c7cbd 100644
--- a/pipeline_test.go
+++ b/awsbackoff_test.go
@@ -6,6 +6,7 @@ import (
 	"testing"
 
 	"github.com/ezoic/go-kinesis"
+	"github.com/lib/pq"
 )
 
 func Test_isRecoverableError(t *testing.T) {
@@ -21,6 +22,8 @@ func Test_isRecoverableError(t *testing.T) {
 		{err: &net.OpError{Err: fmt.Errorf("connection reset by peer")}, isRecoverable: true},
 		{err: &net.OpError{Err: fmt.Errorf("unexpected error")}, isRecoverable: false},
 		{err: fmt.Errorf("an arbitrary error"), isRecoverable: false},
+		{err: pq.Error{Message: "The specified S3 prefix 'somefilethatismissing' does not exist"}, isRecoverable: true},
+		{err: pq.Error{Message: "Some other pq error"}, isRecoverable: false},
 
 		//"InternalFailure": true,
 		//"Throttling": true,
 		//"ServiceUnavailable": true,
 		////"ExpiredIteratorException": true,
 		//{err: *kinesis.Error{Code:""}}
 	}
 
 	for idx, tc := range testCases {
 
-		p := Pipeline{}
-		isRecoverable := p.isRecoverableError(tc.err)
+		isRecoverable := isRecoverableError(tc.err)
 		if isRecoverable != tc.isRecoverable {
 			t.Errorf("test case %d: isRecoverable expected %t, actual %t, for error %+v", idx, tc.isRecoverable, isRecoverable, tc.err)
 		}
 
 	}
 }
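
Note on the backoff shared across PATCH 21: handleAwsWaitTimeExp sleeps for min(100 * 2^attempts, 300000) milliseconds and skips the sleep entirely on attempt 0. The schedule that formula produces is easier to see printed out; the sketch below is an illustration only, not part of the series:

package connector

import (
	"fmt"
	"math"
	"testing"
	"time"
)

func Test_BackoffScheduleSketch(t *testing.T) {
	for attempts := 1; attempts <= 12; attempts++ {
		// same formula as handleAwsWaitTimeExp: 200ms, 400ms, 800ms, ...
		// capped at 300000ms (five minutes)
		waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
		fmt.Printf("attempt %d: wait %s\n", attempts, waitTime)
	}
}

The cap engages on the 12th consecutive error, since 100 * 2^12 ms exceeds the 300000 ms ceiling. Combined with the 50-attempt limit from PATCH 19, a shard reader therefore retries for roughly three and a half hours before log.Fatalln gives up; the Redshift emitter above bounds itself at 10 attempts instead.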