diff --git a/.gitignore b/.gitignore
index 9e98c21..2326b39 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,9 @@
 # Environment vars
 .env
 
+# Seed data
+users.txt
+
 # Folders
 _obj
 _test
diff --git a/awsbackoff.go b/awsbackoff.go
index 010b0bc..72efd4e 100644
--- a/awsbackoff.go
+++ b/awsbackoff.go
@@ -16,26 +16,24 @@ type isRecoverableErrorFunc func(error) bool
 
 func kinesisIsRecoverableError(err error) bool {
 	recoverableErrorCodes := map[string]bool{
-		"ProvisionedThroughputExceededException": true,
 		"InternalFailure":                        true,
-		"Throttling":                             true,
+		"ProvisionedThroughputExceededException": true,
 		"ServiceUnavailable":                     true,
+		"Throttling":                             true,
 	}
-	r := false
 	cErr, ok := err.(*kinesis.Error)
 	if ok && recoverableErrorCodes[cErr.Code] == true {
-		r = true
+		return true
 	}
-	return r
+	return false
 }
 
 func urlIsRecoverableError(err error) bool {
-	r := false
 	_, ok := err.(*url.Error)
 	if ok {
-		r = true
+		return true
 	}
-	return r
+	return false
 }
 
 func netIsRecoverableError(err error) bool {
@@ -54,36 +52,32 @@ var redshiftRecoverableErrors = []*regexp.Regexp{
 }
 
 func redshiftIsRecoverableError(err error) bool {
-	r := false
 	if cErr, ok := err.(pq.Error); ok {
 		for _, re := range redshiftRecoverableErrors {
 			if re.MatchString(cErr.Message) {
-				r = true
-				break
+				return true
 			}
 		}
 	}
-	return r
+	return false
 }
 
 var isRecoverableErrors = []isRecoverableErrorFunc{
-	kinesisIsRecoverableError, netIsRecoverableError, urlIsRecoverableError, redshiftIsRecoverableError,
+	kinesisIsRecoverableError,
+	netIsRecoverableError,
+	redshiftIsRecoverableError,
+	urlIsRecoverableError,
 }
 
 // this determines whether the error is recoverable
 func isRecoverableError(err error) bool {
-	r := false
-
-	logger.Printf("isRecoverableError, type %s, value (%#v)\n", reflect.TypeOf(err).String(), err)
-
+	logger.Log("info", "isRecoverableError", "type", reflect.TypeOf(err).String(), "msg", err.Error())
 	for _, errF := range isRecoverableErrors {
-		r = errF(err)
-		if r {
-			break
+		if errF(err) {
+			return true
 		}
 	}
-
-	return r
+	return false
 }
 
 // handle the aws exponential backoff
@@ -92,7 +86,7 @@ func isRecoverableError(err error) bool {
 func handleAwsWaitTimeExp(attempts int) {
 	if attempts > 0 {
 		waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
-		logger.Printf("handleAwsWaitTimeExp: %s\n", waitTime.String())
+		logger.Log("info", "handleAwsWaitTimeExp", "attempts", attempts, "waitTime", waitTime.String())
 		time.Sleep(waitTime)
 	}
 }
diff --git a/discard_logger.go b/discard_logger.go
deleted file mode 100644
index 1f39f0d..0000000
--- a/discard_logger.go
+++ /dev/null
@@ -1,15 +0,0 @@
-package connector
-
-import "os"
-
-// DiscardLogger is the an implementation of a Logger that does not
-// send any output. It can be used in scenarios when logging is not desired.
-type DiscardLogger struct{}
-
-// Fatalf is equivalent to Printf() followed by a call to os.Exit(1).
-func (l *DiscardLogger) Fatalf(format string, v ...interface{}) {
-	os.Exit(1)
-}
-
-// Printf is noop and does not have any output.
-func (l *DiscardLogger) Printf(format string, v ...interface{}) {}
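The backoff above caps the exponential wait at five minutes. For reference, this standalone sketch reproduces the arithmetic from handleAwsWaitTimeExp; the attempt values are illustrative:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// Same formula as handleAwsWaitTimeExp:
	// wait = min(100ms * 2^attempts, 300000ms), i.e. capped at 5 minutes.
	for _, attempts := range []int{1, 2, 5, 10, 12} {
		wait := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
		fmt.Printf("attempt %2d -> %s\n", attempts, wait)
	}
	// Prints 200ms, 400ms, 3.2s, 1m42.4s, and finally 5m0s once the cap kicks in.
}
```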
diff --git a/emitter.go b/emitter.go
index 6cb8851..a28c591 100644
--- a/emitter.go
+++ b/emitter.go
@@ -7,5 +7,5 @@ package connector
 // Implementations may choose to fail the entire set of records in the buffer or to fail records
 // individually.
 type Emitter interface {
-	Emit(b Buffer, t Transformer)
+	Emit(b Buffer, t Transformer, shardID string)
 }
diff --git a/examples/s3-pipeline/main.go b/examples/s3-pipeline/main.go
index a6dbaa3..61557c0 100644
--- a/examples/s3-pipeline/main.go
+++ b/examples/s3-pipeline/main.go
@@ -3,6 +3,7 @@ package main
 import (
 	"fmt"
 
+	"code.google.com/p/gcfg"
 	"github.com/harlow/kinesis-connectors"
 	"github.com/sendgridlabs/go-kinesis"
 )
diff --git a/examples/seed-stream/README.md b/examples/seed-stream/README.md
index 07bf46d..f326f5b 100644
--- a/examples/seed-stream/README.md
+++ b/examples/seed-stream/README.md
@@ -16,4 +16,5 @@ export AWS_SECRET_KEY=
 
 ### Running the code
 
+    $ curl https://s3.amazonaws.com/kinesis.test/users.txt > users.txt
     $ go run main.go
diff --git a/kinesis.go b/kinesis.go
index bbd8c00..0505af3 100644
--- a/kinesis.go
+++ b/kinesis.go
@@ -13,7 +13,7 @@
 func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) {
 	err := k.CreateStream(streamName, shardCount)
 	if err != nil {
-		logger.Printf("CreateStream ERROR: %v", err)
+		logger.Log("error", "CreateStream", "msg", err.Error())
 		return
 	}
 }
@@ -26,7 +26,7 @@ func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) {
 		args.Add("StreamName", streamName)
 		resp, _ = k.DescribeStream(args)
 		streamStatus := resp.StreamDescription.StreamStatus
-		logger.Printf("Stream [%v] is %v", streamName, streamStatus)
+		logger.Log("info", "DescribeStream", "stream", streamName, "status", streamStatus)
 
 		if streamStatus != "ACTIVE" {
 			time.Sleep(4 * time.Second)
@@ -42,7 +42,7 @@ func StreamExists(k *kinesis.Kinesis, streamName string) bool {
 	args := kinesis.NewArgs()
 	resp, err := k.ListStreams(args)
 	if err != nil {
-		logger.Printf("ListStream ERROR: %v", err)
+		logger.Log("error", "ListStream", "stream", streamName, "msg", err.Error())
 		return false
 	}
 	for _, s := range resp.StreamNames {
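The Emitter change above is the hub of this diff: every emitter now receives the shard ID, so output and log lines can be attributed to the shard they came from. A minimal sketch of a third-party emitter against the new signature follows; DebugEmitter and its behavior are illustrative, not part of the package, and it relies only on Buffer methods the diff itself references:

```go
package main

import (
	"fmt"

	connector "github.com/harlow/kinesis-connectors"
)

// DebugEmitter is a hypothetical Emitter that only reports what it
// would have shipped; it satisfies the new three-argument interface.
type DebugEmitter struct{}

func (e DebugEmitter) Emit(b connector.Buffer, t connector.Transformer, shardID string) {
	fmt.Printf("shard %s: would emit %v records (%s..%s)\n",
		shardID,
		b.NumRecordsInBuffer(),
		b.FirstSequenceNumber(),
		b.LastSequenceNumber(),
	)
}
```

Because Pipeline.ProcessShard now threads shardID through Emit, implementations no longer need to capture the shard out of band.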
diff --git a/logger.go b/logger.go
index 9317c06..1159006 100644
--- a/logger.go
+++ b/logger.go
@@ -1,22 +1,22 @@
 package connector
 
 import (
-	"log"
 	"os"
+
+	"github.com/go-kit/kit/log"
 )
 
 // Logger sends pipeline info and errors to logging endpoint. The logger could be
 // used to send to STDOUT, Syslog, or any number of distributed log collecting platforms.
 type Logger interface {
-	Fatalf(format string, v ...interface{})
-	Printf(format string, v ...interface{})
+	Log(keyvals ...interface{}) error
 }
 
-// specify a default logger so that we don't end up with panics.
-var logger Logger = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile)
-
 // SetLogger adds the ability to change the logger so that external packages
 // can control the logging for this package
 func SetLogger(l Logger) {
 	logger = l
 }
+
+// specify a default logger so that we don't end up with panics.
+var logger Logger = log.NewPrefixLogger(os.Stderr)
diff --git a/pipeline.go b/pipeline.go
index dbaede0..7b05d98 100644
--- a/pipeline.go
+++ b/pipeline.go
@@ -1,6 +1,7 @@
 package connector
 
 import (
+	"os"
 	"time"
 
 	"github.com/sendgridlabs/go-kinesis"
@@ -38,7 +39,8 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
 
 	shardInfo, err := ksis.GetShardIterator(args)
 	if err != nil {
-		logger.Fatalf("GetShardIterator ERROR: %v\n", err)
+		logger.Log("error", "GetShardIterator", "msg", err.Error())
+		os.Exit(1)
 	}
 
 	shardIterator := shardInfo.ShardIterator
@@ -46,7 +48,8 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
 
 	for {
 		if consecutiveErrorAttempts > 50 {
-			logger.Fatalf("Too many consecutive error attempts")
+			logger.Log("error", "errorAttempts", "msg", "Too many consecutive error attempts")
+			os.Exit(1)
 		}
 
 		args = kinesis.NewArgs()
@@ -55,12 +58,12 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
 
 		if err != nil {
 			if isRecoverableError(err) {
-				logger.Printf("GetRecords RECOVERABLE_ERROR: %v\n", err)
 				consecutiveErrorAttempts++
 				handleAwsWaitTimeExp(consecutiveErrorAttempts)
 				continue
 			} else {
-				logger.Fatalf("GetRecords ERROR: %v\n", err)
+				logger.Log("error", "GetRecords", "msg", err.Error())
+				os.Exit(1)
 			}
 		} else {
 			consecutiveErrorAttempts = 0
@@ -71,7 +74,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
 				data := v.GetData()
 
 				if err != nil {
-					logger.Printf("GetData ERROR: %v\n", err)
+					logger.Log("info", "GetData", "msg", err.Error())
 					continue
 				}
 
@@ -84,7 +87,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
 				}
 			}
 		} else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil {
-			logger.Printf("NextShardIterator ERROR: %v\n", err)
+			logger.Log("error", "NextShardIterator", "msg", err.Error())
 			break
 		} else {
 			time.Sleep(5 * time.Second)
@@ -92,7 +95,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
 
 		if p.Buffer.ShouldFlush() {
 			if p.Buffer.NumRecordsInBuffer() > 0 {
-				p.Emitter.Emit(p.Buffer, p.Transformer, shardID)
+				p.Emitter.Emit(p.Buffer, p.Transformer, shardID)
 			}
 			p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber())
 			p.Buffer.Flush()
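Swapping the stdlib logger for go-kit's single Log method means any structured logger can be plugged in through SetLogger. A minimal sketch, assuming nothing beyond the interface shown above; stderrLogger is illustrative, not part of the package:

```go
package main

import (
	"fmt"
	"os"

	connector "github.com/harlow/kinesis-connectors"
)

// stderrLogger is a hypothetical Logger that renders keyvals as
// key=value pairs; any type with Log(keyvals ...interface{}) error fits.
type stderrLogger struct{}

func (stderrLogger) Log(keyvals ...interface{}) error {
	for i := 0; i+1 < len(keyvals); i += 2 {
		fmt.Fprintf(os.Stderr, "%v=%v ", keyvals[i], keyvals[i+1])
	}
	fmt.Fprintln(os.Stderr)
	return nil
}

func main() {
	connector.SetLogger(stderrLogger{})
	// ... build and run the pipeline as usual
}
```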
diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go
index ac9f4fd..82370a9 100644
--- a/redshift_basic_emitter.go
+++ b/redshift_basic_emitter.go
@@ -25,28 +25,29 @@ type RedshiftBasicEmitter struct {
 }
 
 // Emit is invoked when the buffer is full. This method leverages the S3Emitter and
 // then issues a copy command to Redshift data store.
-func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) {
+func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer, shardID string) {
 	s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
-	s3Emitter.Emit(b, t)
+	s3Emitter.Emit(b, t, shardID)
 	s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
-	stmt := e.copyStatement(s3File)
-
 	for i := 0; i < 10; i++ {
 		// execute copy statement
-		_, err := e.Db.Exec(stmt)
+		_, err := e.Db.Exec(e.copyStatement(s3File))
 
-		// if the request succeeded, or its an unrecoverable error, break out of loop
-		if err == nil || isRecoverableError(err) == false {
+		// if the request succeeded, break out of the loop
+		if err == nil {
+			logger.Log("info", "RedshiftBasicEmitter", "shard", shardID, "file", s3File)
 			break
 		}
 
-		// handle aws backoff, this may be necessary if, for example, the
-		// s3 file has not appeared to the database yet
-		handleAwsWaitTimeExp(i)
+		// handle recoverable errors
+		if isRecoverableError(err) {
+			handleAwsWaitTimeExp(i)
+		} else {
+			logger.Log("error", "RedshiftBasicEmitter", "shard", shardID, "msg", err.Error())
+			break
+		}
 	}
-
-	logger.Printf("Redshift load completed.\n")
 }
 
 // Creates the SQL copy statement issued to Redshift cluster.
diff --git a/redshift_manifest_emitter.go b/redshift_manifest_emitter.go
index 07a6ef8..9dc7602 100644
--- a/redshift_manifest_emitter.go
+++ b/redshift_manifest_emitter.go
@@ -30,11 +30,12 @@ type RedshiftManifestEmitter struct {
 
 // Invoked when the buffer is full.
 // Emits a Manifest file to S3 and then performs the Redshift copy command.
-func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) {
+func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer, shardID string) {
 	db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
 
 	if err != nil {
-		logger.Fatalf("sql.Open ERROR: %v\n", err)
+		logger.Log("error", "sql.Open", "msg", err.Error())
+		os.Exit(1)
 	}
 
 	// Aggregate file paths as strings
@@ -54,7 +55,8 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) {
 	_, err = db.Exec(c)
 
 	if err != nil {
-		logger.Fatalf("db.Exec ERROR: %v\n", err)
+		logger.Log("error", "db.Exec", "msg", err.Error())
+		os.Exit(1)
 	}
 
 	// Insert file paths into File Names table
@@ -62,10 +64,11 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) {
 	_, err = db.Exec(i)
 
 	if err != nil {
-		logger.Fatalf("db.Exec ERROR: %v\n", err)
+		logger.Log("error", "db.Exec", "shard", shardID, "msg", err.Error())
+		os.Exit(1)
 	}
 
-	logger.Printf("[%v] copied to Redshift", manifestFileName)
+	logger.Log("info", "Redshift COPY", "shard", shardID, "manifest", manifestFileName)
 
 	db.Close()
 }
@@ -118,7 +121,7 @@ func (e RedshiftManifestEmitter) writeManifestToS3(files []string, manifestFileN
 	content := e.generateManifestFile(files)
 	err := bucket.Put(manifestFileName, content, "text/plain", s3.Private, s3.Options{})
 	if err != nil {
-		logger.Printf("Error occured while uploding to S3: %v", err)
+		logger.Log("error", "writeManifestToS3", "msg", err.Error())
 	}
 }
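The COPY loop above pairs isRecoverableError with the capped backoff: up to ten attempts, waiting only when the failure is transient (for example, the S3 object not yet being visible to Redshift). Extracted into a hypothetical helper, the pattern reads as follows; withRetries is a sketch, not part of the package, and it would live inside package connector since it uses unexported helpers:

```go
// withRetries mirrors the loop in RedshiftBasicEmitter.Emit: run op up
// to maxAttempts times, backing off only on recoverable errors.
func withRetries(maxAttempts int, op func() error) error {
	var err error
	for i := 0; i < maxAttempts; i++ {
		if err = op(); err == nil {
			return nil // success
		}
		if !isRecoverableError(err) {
			return err // unrecoverable: fail fast
		}
		handleAwsWaitTimeExp(i) // transient: wait, then retry
	}
	return err // still failing after maxAttempts
}
```

With such a helper, the emitter body would reduce to something like `err := withRetries(10, func() error { _, e := e.Db.Exec(e.copyStatement(s3File)); return e })`.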
diff --git a/s3_emitter.go b/s3_emitter.go
index 4cf91fa..c6dde5c 100644
--- a/s3_emitter.go
+++ b/s3_emitter.go
@@ -3,6 +3,7 @@ package connector
 import (
 	"bytes"
 	"fmt"
+	"os"
 	"time"
 
 	"github.com/crowdmob/goamz/aws"
@@ -32,7 +33,7 @@ func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string {
 }
 
 // Emit is invoked when the buffer is full. This method emits the set of filtered records.
-func (e S3Emitter) Emit(b Buffer, t Transformer) {
+func (e S3Emitter) Emit(b Buffer, t Transformer, shardID string) {
 	auth, _ := aws.EnvAuth()
 	s3Con := s3.New(auth, aws.USEast)
 	bucket := s3Con.Bucket(e.S3Bucket)
@@ -48,8 +49,9 @@ func (e S3Emitter) Emit(b Buffer, t Transformer) {
 
 	err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{})
 	if err != nil {
-		logger.Fatalf("S3Put ERROR: %v\n", err.Error())
+		logger.Log("error", "S3Put", "shard", shardID, "msg", err.Error())
+		os.Exit(1)
 	} else {
-		logger.Printf("[%v] records emitted to [%s]\n", b.NumRecordsInBuffer(), e.S3Bucket)
+		logger.Log("info", "S3Emitter", "shard", shardID, "bucket", e.S3Bucket, "numRecords", b.NumRecordsInBuffer())
 	}
 }
diff --git a/s3_manifest_emitter.go b/s3_manifest_emitter.go
index 88b3d06..00a1b0b 100644
--- a/s3_manifest_emitter.go
+++ b/s3_manifest_emitter.go
@@ -1,6 +1,8 @@
 package connector
 
 import (
+	"os"
+
 	"github.com/sendgridlabs/go-kinesis"
 )
 
@@ -12,11 +14,11 @@ type S3ManifestEmitter struct {
 	Ksis         *kinesis.Kinesis
 }
 
-func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) {
+func (e S3ManifestEmitter) Emit(b Buffer, t Transformer, shardID string) {
 	// Emit buffer contents to S3 Bucket
 	s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
-	s3Emitter.Emit(b, t)
+	s3Emitter.Emit(b, t, shardID)
 	s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
 
 	// Emit the file path to Kinesis Output stream
@@ -28,8 +30,9 @@ func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) {
 
 	_, err := e.Ksis.PutRecord(args)
 	if err != nil {
-		logger.Printf("PutRecord ERROR: %v", err)
+		logger.Log("error", "PutRecord", "msg", err.Error())
+		os.Exit(1)
 	} else {
-		logger.Printf("[%s] emitted to [%s]", b.FirstSequenceNumber(), e.OutputStream)
+		logger.Log("info", "S3ManifestEmitter", "shard", shardID, "firstSequenceNumber", b.FirstSequenceNumber(), "stream", e.OutputStream)
 	}
 }
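With DiscardLogger deleted (its Fatalf/Printf methods no longer match the interface) and the emitters above now logging on every flush, a caller that wants the old silent behavior can register a no-op implementation. A minimal sketch; nopLogger is illustrative, not part of the package:

```go
package main

import connector "github.com/harlow/kinesis-connectors"

// nopLogger is a hypothetical stand-in for the removed DiscardLogger:
// it satisfies the new Logger interface and drops all output.
type nopLogger struct{}

func (nopLogger) Log(keyvals ...interface{}) error { return nil }

func main() {
	connector.SetLogger(nopLogger{})
	// Fatal exits are unaffected: this diff moves os.Exit(1) to the
	// call sites, so they no longer depend on the logger.
}
```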