replace the logger and ezoic/go-kinesis
This commit is contained in:
parent
05330c6a9d
commit
ebcea0a451
10 changed files with 18 additions and 23 deletions
|
|
@ -83,7 +83,6 @@ func newS3Pipeline(cfg Config) *connector.Pipeline {
|
||||||
Checkpoint: c,
|
Checkpoint: c,
|
||||||
Emitter: e,
|
Emitter: e,
|
||||||
Filter: f,
|
Filter: f,
|
||||||
Logger: log.New(os.Stdout, "KINESIS: ", log.Ldate|log.Ltime|log.Lshortfile),
|
|
||||||
StreamName: cfg.Kinesis.StreamName,
|
StreamName: cfg.Kinesis.StreamName,
|
||||||
Transformer: t,
|
Transformer: t,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,9 +9,8 @@ import (
|
||||||
"regexp"
|
"regexp"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ezoic/go-kinesis"
|
|
||||||
l4g "github.com/ezoic/log4go"
|
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
|
"github.com/sendgridlabs/go-kinesis"
|
||||||
)
|
)
|
||||||
|
|
||||||
type isRecoverableErrorFunc func(error) bool
|
type isRecoverableErrorFunc func(error) bool
|
||||||
|
|
@ -97,7 +96,7 @@ func handleAwsWaitTimeExp(attempts int) {
|
||||||
// wait up to 5 minutes based on the aws exponential backoff algorithm
|
// wait up to 5 minutes based on the aws exponential backoff algorithm
|
||||||
if attempts > 0 {
|
if attempts > 0 {
|
||||||
waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
|
waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
|
||||||
l4g.Finest("handleAwsWaitTimeExp:%s", waitTime.String())
|
logger.Printf("handleAwsWaitTimeExp:%s", waitTime.String())
|
||||||
time.Sleep(waitTime)
|
time.Sleep(waitTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,8 +5,8 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/ezoic/go-kinesis"
|
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
|
"github.com/sendgridlabs/go-kinesis"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Test_isRecoverableError(t *testing.T) {
|
func Test_isRecoverableError(t *testing.T) {
|
||||||
|
|
|
||||||
|
|
@ -3,8 +3,7 @@ package connector
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ezoic/go-kinesis"
|
"github.com/sendgridlabs/go-kinesis"
|
||||||
l4g "github.com/ezoic/log4go"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// CreateStream creates a new Kinesis stream (uses existing stream if exists) and
|
// CreateStream creates a new Kinesis stream (uses existing stream if exists) and
|
||||||
|
|
@ -14,7 +13,7 @@ func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) {
|
||||||
err := k.CreateStream(streamName, shardCount)
|
err := k.CreateStream(streamName, shardCount)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l4g.Error("CreateStream ERROR: %v", err)
|
logger.Printf("CreateStream ERROR: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -27,7 +26,7 @@ func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) {
|
||||||
args.Add("StreamName", streamName)
|
args.Add("StreamName", streamName)
|
||||||
resp, _ = k.DescribeStream(args)
|
resp, _ = k.DescribeStream(args)
|
||||||
streamStatus := resp.StreamDescription.StreamStatus
|
streamStatus := resp.StreamDescription.StreamStatus
|
||||||
l4g.Info("Stream [%v] is %v", streamName, streamStatus)
|
logger.Printf("Stream [%v] is %v", streamName, streamStatus)
|
||||||
|
|
||||||
if streamStatus != "ACTIVE" {
|
if streamStatus != "ACTIVE" {
|
||||||
time.Sleep(4 * time.Second)
|
time.Sleep(4 * time.Second)
|
||||||
|
|
@ -43,7 +42,7 @@ func StreamExists(k *kinesis.Kinesis, streamName string) bool {
|
||||||
args := kinesis.NewArgs()
|
args := kinesis.NewArgs()
|
||||||
resp, err := k.ListStreams(args)
|
resp, err := k.ListStreams(args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l4g.Error("ListStream ERROR: %v", err)
|
logger.Printf("ListStream ERROR: %v", err)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
for _, s := range resp.StreamNames {
|
for _, s := range resp.StreamNames {
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
l4g "github.com/ezoic/log4go"
|
|
||||||
_ "github.com/go-sql-driver/mysql"
|
_ "github.com/go-sql-driver/mysql"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -24,13 +23,13 @@ type MysqlCheckpoint struct {
|
||||||
// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
|
// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
|
||||||
func (c *MysqlCheckpoint) CheckpointExists(shardID string) bool {
|
func (c *MysqlCheckpoint) CheckpointExists(shardID string) bool {
|
||||||
|
|
||||||
l4g.Finest("SELECT sequence_number FROM " + c.TableName + " WHERE checkpoint_key = ?")
|
logger.Printf("SELECT sequence_number FROM " + c.TableName + " WHERE checkpoint_key = ?")
|
||||||
|
|
||||||
row := c.Db.QueryRow("SELECT sequence_number FROM "+c.TableName+" WHERE checkpoint_key = ?", c.key(shardID))
|
row := c.Db.QueryRow("SELECT sequence_number FROM "+c.TableName+" WHERE checkpoint_key = ?", c.key(shardID))
|
||||||
var val string
|
var val string
|
||||||
err := row.Scan(&val)
|
err := row.Scan(&val)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
l4g.Finest("sequence:%s", val)
|
logger.Printf("sequence:%s", val)
|
||||||
c.sequenceNumber = val
|
c.sequenceNumber = val
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,10 @@
|
||||||
package connector
|
package connector
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ezoic/go-kinesis"
|
"github.com/sendgridlabs/go-kinesis"
|
||||||
l4g "github.com/ezoic/log4go"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Pipeline is used as a record processor to configure a pipline.
|
// Pipeline is used as a record processor to configure a pipline.
|
||||||
|
|
@ -61,11 +61,11 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if isRecoverableError(err) {
|
if isRecoverableError(err) {
|
||||||
p.Logger.Infof("recoverable error, %s", err)
|
logger.Printf("recoverable error, %s", err)
|
||||||
consecutiveErrorAttempts++
|
consecutiveErrorAttempts++
|
||||||
continue
|
continue
|
||||||
} else {
|
} else {
|
||||||
p.Logger.Fatalf("GetRecords ERROR: %v\n", err)
|
logger.Fatalf("GetRecords ERROR: %v\n", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
consecutiveErrorAttempts = 0
|
consecutiveErrorAttempts = 0
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
// Postgres package is used when sql.Open is called
|
// Postgres package is used when sql.Open is called
|
||||||
l4g "github.com/ezoic/log4go"
|
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -50,7 +50,7 @@ func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// recoverable error, lets warn
|
// recoverable error, lets warn
|
||||||
l4g.Warn(err)
|
logger.Printf("%v", err)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -77,6 +77,6 @@ func (e RedshiftBasicEmitter) copyStatement(s3File string) string {
|
||||||
b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter))
|
b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter))
|
||||||
}
|
}
|
||||||
b.WriteString(";")
|
b.WriteString(";")
|
||||||
l4g.Debug(b.String())
|
logger.Printf(b.String())
|
||||||
return b.String()
|
return b.String()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,6 @@ import (
|
||||||
|
|
||||||
"github.com/crowdmob/goamz/aws"
|
"github.com/crowdmob/goamz/aws"
|
||||||
"github.com/crowdmob/goamz/s3"
|
"github.com/crowdmob/goamz/s3"
|
||||||
l4g "github.com/ezoic/log4go"
|
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -119,7 +118,7 @@ func (e RedshiftManifestEmitter) writeManifestToS3(files []string, manifestFileN
|
||||||
content := e.generateManifestFile(files)
|
content := e.generateManifestFile(files)
|
||||||
err := bucket.Put(manifestFileName, content, "text/plain", s3.Private, s3.Options{})
|
err := bucket.Put(manifestFileName, content, "text/plain", s3.Private, s3.Options{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l4g.Error("Error occured while uploding to S3: %v", err)
|
logger.Printf("Error occured while uploding to S3: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,6 @@ import (
|
||||||
|
|
||||||
"github.com/crowdmob/goamz/aws"
|
"github.com/crowdmob/goamz/aws"
|
||||||
"github.com/crowdmob/goamz/s3"
|
"github.com/crowdmob/goamz/s3"
|
||||||
l4g "github.com/ezoic/log4go"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// S3Emitter is an implementation of Emitter used to store files from a Kinesis stream in S3.
|
// S3Emitter is an implementation of Emitter used to store files from a Kinesis stream in S3.
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package connector
|
package connector
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/sendgridlabs/go-kinesis"
|
||||||
)
|
)
|
||||||
|
|
||||||
// An implementation of Emitter that puts event data on S3 file, and then puts the
|
// An implementation of Emitter that puts event data on S3 file, and then puts the
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue