diff --git a/awsbackoff.go b/awsbackoff.go new file mode 100644 index 0000000..31fe87b --- /dev/null +++ b/awsbackoff.go @@ -0,0 +1,104 @@ +package connector + +import ( + "log" + "math" + "net" + "net/url" + "reflect" + "regexp" + "time" + + "github.com/ezoic/go-kinesis" + l4g "github.com/ezoic/log4go" + "github.com/lib/pq" +) + +type isRecoverableErrorFunc func(error) bool + +func kinesisIsRecoverableError(err error) bool { + recoverableErrorCodes := map[string]bool{ + "ProvisionedThroughputExceededException": true, + "InternalFailure": true, + "Throttling": true, + "ServiceUnavailable": true, + //"ExpiredIteratorException": true, + } + r := false + cErr, ok := err.(*kinesis.Error) + if ok && recoverableErrorCodes[cErr.Code] == true { + r = true + } + return r +} + +func urlIsRecoverableError(err error) bool { + r := false + _, ok := err.(*url.Error) + if ok { + r = true + } + return r +} + +func netIsRecoverableError(err error) bool { + recoverableErrors := map[string]bool{ + "connection reset by peer": true, + } + r := false + cErr, ok := err.(*net.OpError) + if ok && recoverableErrors[cErr.Err.Error()] == true { + r = true + } + return r +} + +var redshiftRecoverableErrors = []*regexp.Regexp{ + regexp.MustCompile("The specified S3 prefix '.*?' does not exist"), +} + +func redshiftIsRecoverableError(err error) bool { + r := false + if cErr, ok := err.(pq.Error); ok { + for _, re := range redshiftRecoverableErrors { + if re.MatchString(cErr.Message) { + r = true + break + } + } + } + return r +} + +var isRecoverableErrors = []isRecoverableErrorFunc{ + kinesisIsRecoverableError, netIsRecoverableError, urlIsRecoverableError, redshiftIsRecoverableError, +} + +// this determines whether the error is recoverable +func isRecoverableError(err error) bool { + r := false + + log.Printf("isRecoverableError, type %s, value (%#v)\n", reflect.TypeOf(err).String(), err) + + for _, errF := range isRecoverableErrors { + r = errF(err) + if r { + break + } + } + + return r +} + +// handle the aws exponential backoff +func handleAwsWaitTimeExp(attempts int) { + + //http://docs.aws.amazon.com/general/latest/gr/api-retries.html + // wait up to 5 minutes based on the aws exponential backoff algorithm + if attempts > 0 { + waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond + l4g.Finest("handleAwsWaitTimeExp:%s", waitTime.String()) + time.Sleep(waitTime) + } + +} diff --git a/awsbackoff_test.go b/awsbackoff_test.go new file mode 100644 index 0000000..d2c7cbd --- /dev/null +++ b/awsbackoff_test.go @@ -0,0 +1,43 @@ +package connector + +import ( + "fmt" + "net" + "testing" + + "github.com/ezoic/go-kinesis" + "github.com/lib/pq" +) + +func Test_isRecoverableError(t *testing.T) { + + testCases := []struct { + err error + isRecoverable bool + }{ + {err: &kinesis.Error{Code: "ProvisionedThroughputExceededException"}, isRecoverable: true}, + {err: &kinesis.Error{Code: "Throttling"}, isRecoverable: true}, + {err: &kinesis.Error{Code: "ServiceUnavailable"}, isRecoverable: true}, + {err: &kinesis.Error{Code: "ExpiredIteratorException"}, isRecoverable: false}, + {err: &net.OpError{Err: fmt.Errorf("connection reset by peer")}, isRecoverable: true}, + {err: &net.OpError{Err: fmt.Errorf("unexpected error")}, isRecoverable: false}, + {err: fmt.Errorf("an arbitrary error"), isRecoverable: false}, + {err: pq.Error{Message: "The specified S3 prefix 'somefilethatismissing' does not exist"}, isRecoverable: true}, + {err: pq.Error{Message: "Some other pq error"}, isRecoverable: false}, + + //"InternalFailure": true, + //"Throttling": true, + //"ServiceUnavailable": true, + ////"ExpiredIteratorException": true, + //{err: *kinesis.Error{Code:""}} + } + + for idx, tc := range testCases { + + isRecoverable := isRecoverableError(tc.err) + if isRecoverable != tc.isRecoverable { + t.Errorf("test case %d: isRecoverable expected %t, actual %t, for error %+v", idx, tc.isRecoverable, isRecoverable, tc.err) + } + + } +} diff --git a/kinesis.go b/kinesis.go index 77f0644..c83d466 100644 --- a/kinesis.go +++ b/kinesis.go @@ -1,10 +1,10 @@ package connector import ( - "fmt" "time" - "github.com/sendgridlabs/go-kinesis" + "github.com/ezoic/go-kinesis" + l4g "github.com/ezoic/log4go" ) // CreateStream creates a new Kinesis stream (uses existing stream if exists) and @@ -14,7 +14,7 @@ func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) { err := k.CreateStream(streamName, shardCount) if err != nil { - fmt.Printf("CreateStream ERROR: %v\n", err) + l4g.Error("CreateStream ERROR: %v", err) return } } @@ -27,7 +27,7 @@ func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) { args.Add("StreamName", streamName) resp, _ = k.DescribeStream(args) streamStatus := resp.StreamDescription.StreamStatus - fmt.Printf("Stream [%v] is %v\n", streamName, streamStatus) + l4g.Info("Stream [%v] is %v", streamName, streamStatus) if streamStatus != "ACTIVE" { time.Sleep(4 * time.Second) @@ -43,7 +43,7 @@ func StreamExists(k *kinesis.Kinesis, streamName string) bool { args := kinesis.NewArgs() resp, err := k.ListStreams(args) if err != nil { - fmt.Printf("ListStream ERROR: %v\n", err) + l4g.Error("ListStream ERROR: %v", err) return false } for _, s := range resp.StreamNames { diff --git a/mysql_checkpoint.go b/mysql_checkpoint.go new file mode 100644 index 0000000..4bac5ba --- /dev/null +++ b/mysql_checkpoint.go @@ -0,0 +1,66 @@ +package connector + +import ( + "database/sql" + "fmt" + + l4g "github.com/ezoic/log4go" + _ "github.com/go-sql-driver/mysql" +) + +// MysqlCheckpoint implements the Checkpont interface. +// This class is used to enable the Pipeline.ProcessShard to checkpoint their progress. +type MysqlCheckpoint struct { + AppName string + StreamName string + TableName string + Db *sql.DB + + sequenceNumber string +} + +// CheckpointExists determines if a checkpoint for a particular Shard exists. +// Typically used to determine whether we should start processing the shard with +// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). +func (c *MysqlCheckpoint) CheckpointExists(shardID string) bool { + + l4g.Finest("SELECT sequence_number FROM " + c.TableName + " WHERE checkpoint_key = ?") + + row := c.Db.QueryRow("SELECT sequence_number FROM "+c.TableName+" WHERE checkpoint_key = ?", c.key(shardID)) + var val string + err := row.Scan(&val) + if err == nil { + l4g.Finest("sequence:%s", val) + c.sequenceNumber = val + return true + } + + if err == sql.ErrNoRows { + return false + } + + // something bad happened, better blow up the process + panic(err) +} + +// SequenceNumber returns the current checkpoint stored for the specified shard. +func (c *MysqlCheckpoint) SequenceNumber() string { + return c.sequenceNumber +} + +// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). +// Upon failover, record processing is resumed from this point. +func (c *MysqlCheckpoint) SetCheckpoint(shardID string, sequenceNumber string) { + + _, err := c.Db.Exec("INSERT INTO "+c.TableName+" (sequence_number, checkpoint_key) VALUES (?, ?) ON DUPLICATE KEY UPDATE sequence_number = ?", sequenceNumber, c.key(shardID), sequenceNumber) + if err != nil { + panic(err) + } + c.sequenceNumber = sequenceNumber + +} + +// key generates a unique mysql key for storage of Checkpoint. +func (c *MysqlCheckpoint) key(shardID string) string { + return fmt.Sprintf("%v:checkpoint:%v:%v", c.AppName, c.StreamName, shardID) +} diff --git a/mysql_checkpoint_test.go b/mysql_checkpoint_test.go new file mode 100644 index 0000000..7ef1906 --- /dev/null +++ b/mysql_checkpoint_test.go @@ -0,0 +1,59 @@ +package connector + +import ( + "database/sql" + "os" + "testing" +) + +func Test_MysqlKey(t *testing.T) { + k := "app:checkpoint:stream:shard" + c := MysqlCheckpoint{AppName: "app", StreamName: "stream"} + + r := c.key("shard") + + if r != k { + t.Errorf("key() = %v, want %v", k, r) + } +} + +func Test_MysqlCheckpointExists(t *testing.T) { + rc, _ := sql.Open("mysql", os.Getenv("CHECKPOINT_MYSQL_DSN")) + k := "app:checkpoint:stream:shard" + + _, err := rc.Exec("INSERT INTO KinesisConnector.TestCheckpoint (sequence_number, checkpoint_key) VALUES (?, ?) ON DUPLICATE KEY UPDATE sequence_number = ?", "fakeSeqNum", k, "fakeSeqNum") + if err != nil { + t.Fatalf("cannot insert checkpoint into db manually, %s", err) + } + c := MysqlCheckpoint{AppName: "app", StreamName: "stream", TableName: "KinesisConnector.TestCheckpoint", Db: rc} + + r := c.CheckpointExists("shard") + + if r != true { + t.Errorf("CheckpointExists() = %v, want %v", false, r) + } + + rc.Exec("DELETE FROM KinesisConnector.TestCheckpoint WHERE checkpoint_key = ?", k) +} + +func Test_MysqlSetCheckpoint(t *testing.T) { + k := "app:checkpoint:stream:shard" + + rc, _ := sql.Open("mysql", os.Getenv("CHECKPOINT_MYSQL_DSN")) + + c := MysqlCheckpoint{AppName: "app", StreamName: "stream", TableName: "KinesisConnector.TestCheckpoint", Db: rc} + c.SetCheckpoint("shard", "fakeSeqNum") + + rslt := rc.QueryRow("SELECT sequence_number FROM KinesisConnector.TestCheckpoint WHERE checkpoint_key = ?", k) + var sequenceNumber string + err := rslt.Scan(&sequenceNumber) + if err != nil { + t.Fatalf("cannot scan row for checkpoint key, %s", err) + } + + if sequenceNumber != "fakeSeqNum" { + t.Errorf("SetCheckpoint() = %v, want %v", "fakeSeqNum", sequenceNumber) + } + + rc.Exec("DELETE FROM KinesisConnector.TestCheckpoint WHERE checkpoint_key = ?", k) +} diff --git a/pipeline.go b/pipeline.go index d0d7911..814ebdd 100644 --- a/pipeline.go +++ b/pipeline.go @@ -3,7 +3,8 @@ package connector import ( "time" - "github.com/sendgridlabs/go-kinesis" + "github.com/ezoic/go-kinesis" + l4g "github.com/ezoic/log4go" ) // Pipeline is used as a record processor to configure a pipline. @@ -12,13 +13,14 @@ import ( // interface. It has a data type (Model) as Records come in as a byte[] and are transformed to a Model. // Then they are buffered in Model form and when the buffer is full, Models's are passed to the emitter. type Pipeline struct { - Buffer Buffer - Checkpoint Checkpoint - Emitter Emitter - Filter Filter + Buffer Buffer + Checkpoint Checkpoint + Emitter Emitter + Filter Filter Logger Logger - StreamName string - Transformer Transformer + StreamName string + Transformer Transformer + CheckpointFilteredRecords bool } // ProcessShard kicks off the process of a Kinesis Shard. @@ -43,13 +45,31 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { shardIterator := shardInfo.ShardIterator + consecutiveErrorAttempts := 0 + for { + + if consecutiveErrorAttempts > 50 { + log.Fatalln("Too many consecutive error attempts") + } + + // handle the aws backoff stuff + handleAwsWaitTimeExp(consecutiveErrorAttempts) + args = kinesis.NewArgs() args.Add("ShardIterator", shardIterator) recordSet, err := ksis.GetRecords(args) if err != nil { - p.Logger.Fatalf("GetRecords ERROR: %v\n", err) + if isRecoverableError(err) { + p.Logger.Infof("recoverable error, %s", err) + consecutiveErrorAttempts++ + continue + } else { + p.Logger.Fatalf("GetRecords ERROR: %v\n", err) + } + } else { + consecutiveErrorAttempts = 0 } if len(recordSet.Records) > 0 { @@ -65,6 +85,8 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { if p.Filter.KeepRecord(r) { p.Buffer.ProcessRecord(r, v.SequenceNumber) + } else if p.CheckpointFilteredRecords { + p.Buffer.ProcessRecord(nil, v.SequenceNumber) } } } else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil { @@ -75,7 +97,9 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { } if p.Buffer.ShouldFlush() { - p.Emitter.Emit(p.Buffer, p.Transformer) + if p.Buffer.NumRecordsInBuffer() > 0 { + p.Emitter.Emit(p.Buffer, p.Transformer) + } p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber()) p.Buffer.Flush() } diff --git a/record_buffer.go b/record_buffer.go index a2a7dfd..8f01f58 100644 --- a/record_buffer.go +++ b/record_buffer.go @@ -20,7 +20,9 @@ func (b *RecordBuffer) ProcessRecord(record interface{}, sequenceNumber string) b.lastSequenceNumber = sequenceNumber if !b.sequenceExists(sequenceNumber) { - b.recordsInBuffer = append(b.recordsInBuffer, record) + if record != nil { + b.recordsInBuffer = append(b.recordsInBuffer, record) + } b.sequencesInBuffer = append(b.sequencesInBuffer, sequenceNumber) } } @@ -32,7 +34,7 @@ func (b *RecordBuffer) Records() []interface{} { // NumRecordsInBuffer returns the number of messages in the buffer. func (b RecordBuffer) NumRecordsInBuffer() int { - return len(b.sequencesInBuffer) + return len(b.recordsInBuffer) } // Flush empties the buffer and resets the sequence counter. diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go index e61c3c9..5c51a7b 100644 --- a/redshift_basic_emitter.go +++ b/redshift_basic_emitter.go @@ -7,6 +7,7 @@ import ( "os" // Postgres package is used when sql.Open is called + l4g "github.com/ezoic/log4go" _ "github.com/lib/pq" ) @@ -19,7 +20,9 @@ type RedshiftBasicEmitter struct { Jsonpaths string Logger Logger S3Bucket string + S3Prefix string TableName string + Db *sql.DB } // Emit is invoked when the buffer is full. This method leverages the S3Emitter and @@ -28,27 +31,42 @@ func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) { s3Emitter := S3Emitter{S3Bucket: e.S3Bucket} s3Emitter.Emit(b, t) s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber()) - db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL")) - if err != nil { - e.Logger.Fatalf("sql.Open ERROR: %v\n", err) + stmt := e.copyStatement(s3File) + + var err error + for i := 0; i < 10; i++ { + + // handle aws backoff, this may be necessary if, for example, the + // s3 file has not appeared to the database yet + handleAwsWaitTimeExp(i) + + // load into the database + _, err := e.Db.Exec(stmt) + + // if the request succeeded, or its an unrecoverable error, break out of the loop + // because we are done + if err == nil || isRecoverableError(err) == false { + break + } + + // recoverable error, lets warn + l4g.Warn(err) + } - _, err = db.Exec(e.copyStatement(s3File)) - if err != nil { e.Logger.Fatalf("db.Exec ERROR: %v\n", err) } e.Logger.Printf("Redshift load completed.\n") - db.Close() } // Creates the SQL copy statement issued to Redshift cluster. func (e RedshiftBasicEmitter) copyStatement(s3File string) string { b := new(bytes.Buffer) b.WriteString(fmt.Sprintf("COPY %v ", e.TableName)) - b.WriteString(fmt.Sprintf("FROM 's3://%v%v' ", e.S3Bucket, s3File)) + b.WriteString(fmt.Sprintf("FROM 's3://%v/%v' ", e.S3Bucket, s3File)) b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", os.Getenv("AWS_ACCESS_KEY"))) b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", os.Getenv("AWS_SECRET_KEY"))) switch e.Format { @@ -60,5 +78,6 @@ func (e RedshiftBasicEmitter) copyStatement(s3File string) string { b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter)) } b.WriteString(";") + l4g.Debug(b.String()) return b.String() } diff --git a/redshift_basic_emitter_test.go b/redshift_basic_emitter_test.go index f5f790d..4689be6 100644 --- a/redshift_basic_emitter_test.go +++ b/redshift_basic_emitter_test.go @@ -10,7 +10,7 @@ func TestCopyStatement(t *testing.T) { S3Bucket: "test_bucket", TableName: "test_table", } - f := e.copyStatement("/test.txt") + f := e.copyStatement("test.txt") copyStatement := "COPY test_table FROM 's3://test_bucket/test.txt' CREDENTIALS 'aws_access_key_id=;aws_secret_access_key=' DELIMITER ',';" diff --git a/redshift_manifest_emitter.go b/redshift_manifest_emitter.go index ed585f2..ae2a410 100644 --- a/redshift_manifest_emitter.go +++ b/redshift_manifest_emitter.go @@ -11,6 +11,7 @@ import ( "github.com/crowdmob/goamz/aws" "github.com/crowdmob/goamz/s3" + l4g "github.com/ezoic/log4go" _ "github.com/lib/pq" ) @@ -119,7 +120,7 @@ func (e RedshiftManifestEmitter) writeManifestToS3(files []string, manifestFileN content := e.generateManifestFile(files) err := bucket.Put(manifestFileName, content, "text/plain", s3.Private, s3.Options{}) if err != nil { - fmt.Printf("Error occured while uploding to S3: %v\n", err) + l4g.Error("Error occured while uploding to S3: %v", err) } } diff --git a/s3_emitter.go b/s3_emitter.go index e141c54..d4f1034 100644 --- a/s3_emitter.go +++ b/s3_emitter.go @@ -7,6 +7,7 @@ import ( "github.com/crowdmob/goamz/aws" "github.com/crowdmob/goamz/s3" + l4g "github.com/ezoic/log4go" ) // S3Emitter is an implementation of Emitter used to store files from a Kinesis stream in S3. @@ -18,13 +19,18 @@ import ( type S3Emitter struct { Logger Logger S3Bucket string + S3Prefix string } // S3FileName generates a file name based on the First and Last sequence numbers from the buffer. The current // UTC date (YYYY-MM-DD) is base of the path to logically group days of batches. func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string { date := time.Now().UTC().Format("2006/01/02") - return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq) + if e.S3Prefix == "" { + return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq) + } else { + return fmt.Sprintf("%v/%v/%v-%v", e.S3Prefix, date, firstSeq, lastSeq) + } } // Emit is invoked when the buffer is full. This method emits the set of filtered records. diff --git a/s3_emitter_test.go b/s3_emitter_test.go index 1307dd0..5c8e9b2 100644 --- a/s3_emitter_test.go +++ b/s3_emitter_test.go @@ -8,12 +8,22 @@ import ( func TestS3FileName(t *testing.T) { d := time.Now().UTC().Format("2006/01/02") - e := S3Emitter{} + e := S3Emitter{S3Bucket: "bucket", S3Prefix: "prefix"} - expected := fmt.Sprintf("%v/a-b", d) + expected := fmt.Sprintf("prefix/%v/a-b", d) result := e.S3FileName("a", "b") if result != expected { t.Errorf("S3FileName() = %v want %v", result, expected) } + + e.S3Prefix = "" + + expected = fmt.Sprintf("%v/a-b", d) + result = e.S3FileName("a", "b") + + if result != expected { + t.Errorf("S3FileName() = %v want %v", result, expected) + } + } diff --git a/s3_manifest_emitter.go b/s3_manifest_emitter.go index 59aa565..48f6fe4 100644 --- a/s3_manifest_emitter.go +++ b/s3_manifest_emitter.go @@ -1,7 +1,6 @@ package connector import ( - "github.com/sendgridlabs/go-kinesis" ) // An implementation of Emitter that puts event data on S3 file, and then puts the