Add Logger interface
To allow for different logging endpoints we'll introduce a Logger interface that will be passed into the pipeline during initialization. * Add Logger interface * Use logger interface in pipeline and emitters
This commit is contained in:
parent
5cf67a3c85
commit
cd71fd41bc
8 changed files with 34 additions and 25 deletions
|
|
@ -110,6 +110,7 @@ func main() {
|
||||||
Checkpoint: &c,
|
Checkpoint: &c,
|
||||||
Emitter: &e,
|
Emitter: &e,
|
||||||
Filter: &f,
|
Filter: &f,
|
||||||
|
Logger: log.New(os.Stdout, "KINESIS: ", log.Ldate|log.Ltime|log.Lshortfile),
|
||||||
StreamName: cfg.Kinesis.InputStream,
|
StreamName: cfg.Kinesis.InputStream,
|
||||||
Transformer: &t,
|
Transformer: &t,
|
||||||
}
|
}
|
||||||
|
|
@ -144,6 +145,7 @@ func main() {
|
||||||
Checkpoint: &c,
|
Checkpoint: &c,
|
||||||
Emitter: &e,
|
Emitter: &e,
|
||||||
Filter: &f,
|
Filter: &f,
|
||||||
|
Logger: log.New(os.Stdout, "KINESIS: ", log.Ldate|log.Ltime|log.Lshortfile),
|
||||||
StreamName: cfg.Kinesis.OutputStream,
|
StreamName: cfg.Kinesis.OutputStream,
|
||||||
Transformer: &t,
|
Transformer: &t,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
8
logger.go
Normal file
8
logger.go
Normal file
|
|
@ -0,0 +1,8 @@
|
||||||
|
package connector
|
||||||
|
|
||||||
|
// Logger sends pipeline info and errors to logging endpoint. The logger could be
|
||||||
|
// used to send to STDOUT, Syslog, or any number of distributed log collecting platforms.
|
||||||
|
type Logger interface {
|
||||||
|
Fatalf(format string, v ...interface{})
|
||||||
|
Printf(format string, v ...interface{})
|
||||||
|
}
|
||||||
10
pipeline.go
10
pipeline.go
|
|
@ -1,7 +1,6 @@
|
||||||
package connector
|
package connector
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/sendgridlabs/go-kinesis"
|
"github.com/sendgridlabs/go-kinesis"
|
||||||
|
|
@ -17,6 +16,7 @@ type Pipeline struct {
|
||||||
Checkpoint Checkpoint
|
Checkpoint Checkpoint
|
||||||
Emitter Emitter
|
Emitter Emitter
|
||||||
Filter Filter
|
Filter Filter
|
||||||
|
Logger Logger
|
||||||
StreamName string
|
StreamName string
|
||||||
Transformer Transformer
|
Transformer Transformer
|
||||||
}
|
}
|
||||||
|
|
@ -38,7 +38,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
shardInfo, err := ksis.GetShardIterator(args)
|
shardInfo, err := ksis.GetShardIterator(args)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("GetShardIterator ERROR: %v\n", err)
|
p.Logger.Fatalf("GetShardIterator ERROR: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
shardIterator := shardInfo.ShardIterator
|
shardIterator := shardInfo.ShardIterator
|
||||||
|
|
@ -49,7 +49,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
recordSet, err := ksis.GetRecords(args)
|
recordSet, err := ksis.GetRecords(args)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("GetRecords ERROR: %v\n", err)
|
p.Logger.Fatalf("GetRecords ERROR: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(recordSet.Records) > 0 {
|
if len(recordSet.Records) > 0 {
|
||||||
|
|
@ -57,7 +57,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
data := v.GetData()
|
data := v.GetData()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("GetData ERROR: %v\n", err)
|
p.Logger.Printf("GetData ERROR: %v\n", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -68,7 +68,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil {
|
} else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil {
|
||||||
log.Printf("NextShardIterator ERROR: %v\n", err)
|
p.Logger.Printf("NextShardIterator ERROR: %v\n", err)
|
||||||
break
|
break
|
||||||
} else {
|
} else {
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
// Postgres package is used when sql.Open is called
|
// Postgres package is used when sql.Open is called
|
||||||
|
|
@ -14,38 +13,39 @@ import (
|
||||||
// RedshiftEmitter is an implementation of Emitter that buffered batches of records into Redshift one by one.
|
// RedshiftEmitter is an implementation of Emitter that buffered batches of records into Redshift one by one.
|
||||||
// It first emits records into S3 and then perfors the Redshift JSON COPY command. S3 storage of buffered
|
// It first emits records into S3 and then perfors the Redshift JSON COPY command. S3 storage of buffered
|
||||||
// data achieved using the S3Emitter. A link to jsonpaths must be provided when configuring the struct.
|
// data achieved using the S3Emitter. A link to jsonpaths must be provided when configuring the struct.
|
||||||
type RedshiftBasicEmtitter struct {
|
type RedshiftBasicEmitter struct {
|
||||||
Delimiter string
|
Delimiter string
|
||||||
Format string
|
Format string
|
||||||
Jsonpaths string
|
Jsonpaths string
|
||||||
|
Logger Logger
|
||||||
S3Bucket string
|
S3Bucket string
|
||||||
TableName string
|
TableName string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Emit is invoked when the buffer is full. This method leverages the S3Emitter and
|
// Emit is invoked when the buffer is full. This method leverages the S3Emitter and
|
||||||
// then issues a copy command to Redshift data store.
|
// then issues a copy command to Redshift data store.
|
||||||
func (e RedshiftBasicEmtitter) Emit(b Buffer, t Transformer) {
|
func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) {
|
||||||
s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
|
s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
|
||||||
s3Emitter.Emit(b, t)
|
s3Emitter.Emit(b, t)
|
||||||
s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
|
s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
|
||||||
db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
|
db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
e.Logger.Fatalf("sql.Open ERROR: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = db.Exec(e.copyStatement(s3File))
|
_, err = db.Exec(e.copyStatement(s3File))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
e.Logger.Fatalf("db.Exec ERROR: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("Redshift load completed.\n")
|
e.Logger.Printf("Redshift load completed.\n")
|
||||||
db.Close()
|
db.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Creates the SQL copy statement issued to Redshift cluster.
|
// Creates the SQL copy statement issued to Redshift cluster.
|
||||||
func (e RedshiftBasicEmtitter) copyStatement(s3File string) string {
|
func (e RedshiftBasicEmitter) copyStatement(s3File string) string {
|
||||||
b := new(bytes.Buffer)
|
b := new(bytes.Buffer)
|
||||||
b.WriteString(fmt.Sprintf("COPY %v ", e.TableName))
|
b.WriteString(fmt.Sprintf("COPY %v ", e.TableName))
|
||||||
b.WriteString(fmt.Sprintf("FROM 's3://%v%v' ", e.S3Bucket, s3File))
|
b.WriteString(fmt.Sprintf("FROM 's3://%v%v' ", e.S3Bucket, s3File))
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestCopyStatement(t *testing.T) {
|
func TestCopyStatement(t *testing.T) {
|
||||||
e := RedshiftBasicEmtitter{
|
e := RedshiftBasicEmitter{
|
||||||
Delimiter: ",",
|
Delimiter: ",",
|
||||||
S3Bucket: "test_bucket",
|
S3Bucket: "test_bucket",
|
||||||
TableName: "test_table",
|
TableName: "test_table",
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -25,6 +24,7 @@ type RedshiftManifestEmitter struct {
|
||||||
FileTable string
|
FileTable string
|
||||||
Format string
|
Format string
|
||||||
Jsonpaths string
|
Jsonpaths string
|
||||||
|
Logger Logger
|
||||||
S3Bucket string
|
S3Bucket string
|
||||||
SecretKey string
|
SecretKey string
|
||||||
}
|
}
|
||||||
|
|
@ -35,7 +35,7 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) {
|
||||||
db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
|
db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
e.Logger.Fatalf("sql.Open ERROR: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Aggregate file paths as strings
|
// Aggregate file paths as strings
|
||||||
|
|
@ -55,7 +55,7 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) {
|
||||||
_, err = db.Exec(c)
|
_, err = db.Exec(c)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
e.Logger.Fatalf("db.Exec ERROR: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Insert file paths into File Names table
|
// Insert file paths into File Names table
|
||||||
|
|
@ -63,10 +63,10 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) {
|
||||||
_, err = db.Exec(i)
|
_, err = db.Exec(i)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
e.Logger.Fatalf("db.Exec ERROR: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("[%v] copied to Redshift", manifestFileName)
|
e.Logger.Printf("[%v] copied to Redshift", manifestFileName)
|
||||||
db.Close()
|
db.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,6 @@ package connector
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/crowdmob/goamz/aws"
|
"github.com/crowdmob/goamz/aws"
|
||||||
|
|
@ -17,6 +16,7 @@ import (
|
||||||
// from the first and last sequence numbers of the records contained in that file separated by a
|
// from the first and last sequence numbers of the records contained in that file separated by a
|
||||||
// dash. This struct requires the configuration of an S3 bucket and endpoint.
|
// dash. This struct requires the configuration of an S3 bucket and endpoint.
|
||||||
type S3Emitter struct {
|
type S3Emitter struct {
|
||||||
|
Logger Logger
|
||||||
S3Bucket string
|
S3Bucket string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -44,8 +44,8 @@ func (e S3Emitter) Emit(b Buffer, t Transformer) {
|
||||||
err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{})
|
err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("S3Put ERROR: %v\n", err)
|
e.Logger.Fatalf("S3Put ERROR: %v\n", err.Error())
|
||||||
} else {
|
} else {
|
||||||
log.Printf("[%v] records emitted to [%s]\n", b.NumRecordsInBuffer(), e.S3Bucket)
|
e.Logger.Printf("[%v] records emitted to [%s]\n", b.NumRecordsInBuffer(), e.S3Bucket)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,6 @@
|
||||||
package connector
|
package connector
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
|
||||||
|
|
||||||
"github.com/sendgridlabs/go-kinesis"
|
"github.com/sendgridlabs/go-kinesis"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -12,6 +10,7 @@ type S3ManifestEmitter struct {
|
||||||
OutputStream string
|
OutputStream string
|
||||||
S3Bucket string
|
S3Bucket string
|
||||||
Ksis *kinesis.Kinesis
|
Ksis *kinesis.Kinesis
|
||||||
|
Logger Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) {
|
func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) {
|
||||||
|
|
@ -30,8 +29,8 @@ func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) {
|
||||||
_, err := e.Ksis.PutRecord(args)
|
_, err := e.Ksis.PutRecord(args)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("PutRecord ERROR: %v", err)
|
e.Logger.Printf("PutRecord ERROR: %v", err)
|
||||||
} else {
|
} else {
|
||||||
log.Printf("[%s] emitted to [%s]", b.FirstSequenceNumber(), e.OutputStream)
|
e.Logger.Printf("[%s] emitted to [%s]", b.FirstSequenceNumber(), e.OutputStream)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue