diff --git a/README.md b/README.md index c461443..fdde0ca 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,6 @@ func newS3Pipeline(cfg Config) *connector.Pipeline { Checkpoint: c, Emitter: e, Filter: f, - Logger: log.New(os.Stdout, "KINESIS: ", log.Ldate|log.Ltime|log.Lshortfile), StreamName: cfg.Kinesis.StreamName, Transformer: t, } diff --git a/awsbackoff.go b/awsbackoff.go index 31fe87b..e7283d7 100644 --- a/awsbackoff.go +++ b/awsbackoff.go @@ -9,9 +9,8 @@ import ( "regexp" "time" - "github.com/ezoic/go-kinesis" - l4g "github.com/ezoic/log4go" "github.com/lib/pq" + "github.com/sendgridlabs/go-kinesis" ) type isRecoverableErrorFunc func(error) bool @@ -97,7 +96,7 @@ func handleAwsWaitTimeExp(attempts int) { // wait up to 5 minutes based on the aws exponential backoff algorithm if attempts > 0 { waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond - l4g.Finest("handleAwsWaitTimeExp:%s", waitTime.String()) + logger.Printf("handleAwsWaitTimeExp:%s", waitTime.String()) time.Sleep(waitTime) } diff --git a/awsbackoff_test.go b/awsbackoff_test.go index d2c7cbd..7925330 100644 --- a/awsbackoff_test.go +++ b/awsbackoff_test.go @@ -5,8 +5,8 @@ import ( "net" "testing" - "github.com/ezoic/go-kinesis" "github.com/lib/pq" + "github.com/sendgridlabs/go-kinesis" ) func Test_isRecoverableError(t *testing.T) { diff --git a/kinesis.go b/kinesis.go index c83d466..bbd8c00 100644 --- a/kinesis.go +++ b/kinesis.go @@ -3,8 +3,7 @@ package connector import ( "time" - "github.com/ezoic/go-kinesis" - l4g "github.com/ezoic/log4go" + "github.com/sendgridlabs/go-kinesis" ) // CreateStream creates a new Kinesis stream (uses existing stream if exists) and @@ -14,7 +13,7 @@ func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) { err := k.CreateStream(streamName, shardCount) if err != nil { - l4g.Error("CreateStream ERROR: %v", err) + logger.Printf("CreateStream ERROR: %v", err) return } } @@ -27,7 +26,7 @@ func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) { args.Add("StreamName", streamName) resp, _ = k.DescribeStream(args) streamStatus := resp.StreamDescription.StreamStatus - l4g.Info("Stream [%v] is %v", streamName, streamStatus) + logger.Printf("Stream [%v] is %v", streamName, streamStatus) if streamStatus != "ACTIVE" { time.Sleep(4 * time.Second) @@ -43,7 +42,7 @@ func StreamExists(k *kinesis.Kinesis, streamName string) bool { args := kinesis.NewArgs() resp, err := k.ListStreams(args) if err != nil { - l4g.Error("ListStream ERROR: %v", err) + logger.Printf("ListStream ERROR: %v", err) return false } for _, s := range resp.StreamNames { diff --git a/mysql_checkpoint.go b/mysql_checkpoint.go index 4bac5ba..36ded2c 100644 --- a/mysql_checkpoint.go +++ b/mysql_checkpoint.go @@ -4,7 +4,6 @@ import ( "database/sql" "fmt" - l4g "github.com/ezoic/log4go" _ "github.com/go-sql-driver/mysql" ) @@ -24,13 +23,13 @@ type MysqlCheckpoint struct { // TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). func (c *MysqlCheckpoint) CheckpointExists(shardID string) bool { - l4g.Finest("SELECT sequence_number FROM " + c.TableName + " WHERE checkpoint_key = ?") + logger.Printf("SELECT sequence_number FROM " + c.TableName + " WHERE checkpoint_key = ?") row := c.Db.QueryRow("SELECT sequence_number FROM "+c.TableName+" WHERE checkpoint_key = ?", c.key(shardID)) var val string err := row.Scan(&val) if err == nil { - l4g.Finest("sequence:%s", val) + logger.Printf("sequence:%s", val) c.sequenceNumber = val return true } diff --git a/pipeline.go b/pipeline.go index 4a64c87..241f224 100644 --- a/pipeline.go +++ b/pipeline.go @@ -1,10 +1,10 @@ package connector import ( + "log" "time" - "github.com/ezoic/go-kinesis" - l4g "github.com/ezoic/log4go" + "github.com/sendgridlabs/go-kinesis" ) // Pipeline is used as a record processor to configure a pipline. @@ -61,11 +61,11 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { if err != nil { if isRecoverableError(err) { - p.Logger.Infof("recoverable error, %s", err) + logger.Printf("recoverable error, %s", err) consecutiveErrorAttempts++ continue } else { - p.Logger.Fatalf("GetRecords ERROR: %v\n", err) + logger.Fatalf("GetRecords ERROR: %v\n", err) } } else { consecutiveErrorAttempts = 0 diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go index 215d2dc..f5a07c4 100644 --- a/redshift_basic_emitter.go +++ b/redshift_basic_emitter.go @@ -7,7 +7,7 @@ import ( "os" // Postgres package is used when sql.Open is called - l4g "github.com/ezoic/log4go" + _ "github.com/lib/pq" ) @@ -50,7 +50,7 @@ func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) { } // recoverable error, lets warn - l4g.Warn(err) + logger.Printf("%v", err) } @@ -77,6 +77,6 @@ func (e RedshiftBasicEmitter) copyStatement(s3File string) string { b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter)) } b.WriteString(";") - l4g.Debug(b.String()) + logger.Printf(b.String()) return b.String() } diff --git a/redshift_manifest_emitter.go b/redshift_manifest_emitter.go index 309af15..07a6ef8 100644 --- a/redshift_manifest_emitter.go +++ b/redshift_manifest_emitter.go @@ -11,7 +11,6 @@ import ( "github.com/crowdmob/goamz/aws" "github.com/crowdmob/goamz/s3" - l4g "github.com/ezoic/log4go" _ "github.com/lib/pq" ) @@ -119,7 +118,7 @@ func (e RedshiftManifestEmitter) writeManifestToS3(files []string, manifestFileN content := e.generateManifestFile(files) err := bucket.Put(manifestFileName, content, "text/plain", s3.Private, s3.Options{}) if err != nil { - l4g.Error("Error occured while uploding to S3: %v", err) + logger.Printf("Error occured while uploding to S3: %v", err) } } diff --git a/s3_emitter.go b/s3_emitter.go index 8ae99a6..4cf91fa 100644 --- a/s3_emitter.go +++ b/s3_emitter.go @@ -7,7 +7,6 @@ import ( "github.com/crowdmob/goamz/aws" "github.com/crowdmob/goamz/s3" - l4g "github.com/ezoic/log4go" ) // S3Emitter is an implementation of Emitter used to store files from a Kinesis stream in S3. diff --git a/s3_manifest_emitter.go b/s3_manifest_emitter.go index adf6815..88b3d06 100644 --- a/s3_manifest_emitter.go +++ b/s3_manifest_emitter.go @@ -1,6 +1,7 @@ package connector import ( + "github.com/sendgridlabs/go-kinesis" ) // An implementation of Emitter that puts event data on S3 file, and then puts the