Use go-kit Log interface
This commit is contained in:
parent
9371fb938c
commit
cd54569c61
13 changed files with 75 additions and 79 deletions
3
.gitignore
vendored
3
.gitignore
vendored
|
|
@ -6,6 +6,9 @@
|
||||||
# Environment vars
|
# Environment vars
|
||||||
.env
|
.env
|
||||||
|
|
||||||
|
# Seed data
|
||||||
|
users.txt
|
||||||
|
|
||||||
# Folders
|
# Folders
|
||||||
_obj
|
_obj
|
||||||
_test
|
_test
|
||||||
|
|
|
||||||
|
|
@ -16,26 +16,24 @@ type isRecoverableErrorFunc func(error) bool
|
||||||
|
|
||||||
func kinesisIsRecoverableError(err error) bool {
|
func kinesisIsRecoverableError(err error) bool {
|
||||||
recoverableErrorCodes := map[string]bool{
|
recoverableErrorCodes := map[string]bool{
|
||||||
"ProvisionedThroughputExceededException": true,
|
|
||||||
"InternalFailure": true,
|
"InternalFailure": true,
|
||||||
"Throttling": true,
|
"ProvisionedThroughputExceededException": true,
|
||||||
"ServiceUnavailable": true,
|
"ServiceUnavailable": true,
|
||||||
|
"Throttling": true,
|
||||||
}
|
}
|
||||||
r := false
|
|
||||||
cErr, ok := err.(*kinesis.Error)
|
cErr, ok := err.(*kinesis.Error)
|
||||||
if ok && recoverableErrorCodes[cErr.Code] == true {
|
if ok && recoverableErrorCodes[cErr.Code] == true {
|
||||||
r = true
|
return true
|
||||||
}
|
}
|
||||||
return r
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func urlIsRecoverableError(err error) bool {
|
func urlIsRecoverableError(err error) bool {
|
||||||
r := false
|
|
||||||
_, ok := err.(*url.Error)
|
_, ok := err.(*url.Error)
|
||||||
if ok {
|
if ok {
|
||||||
r = true
|
return true
|
||||||
}
|
}
|
||||||
return r
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func netIsRecoverableError(err error) bool {
|
func netIsRecoverableError(err error) bool {
|
||||||
|
|
@ -54,36 +52,32 @@ var redshiftRecoverableErrors = []*regexp.Regexp{
|
||||||
}
|
}
|
||||||
|
|
||||||
func redshiftIsRecoverableError(err error) bool {
|
func redshiftIsRecoverableError(err error) bool {
|
||||||
r := false
|
|
||||||
if cErr, ok := err.(pq.Error); ok {
|
if cErr, ok := err.(pq.Error); ok {
|
||||||
for _, re := range redshiftRecoverableErrors {
|
for _, re := range redshiftRecoverableErrors {
|
||||||
if re.MatchString(cErr.Message) {
|
if re.MatchString(cErr.Message) {
|
||||||
r = true
|
return true
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return r
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
var isRecoverableErrors = []isRecoverableErrorFunc{
|
var isRecoverableErrors = []isRecoverableErrorFunc{
|
||||||
kinesisIsRecoverableError, netIsRecoverableError, urlIsRecoverableError, redshiftIsRecoverableError,
|
kinesisIsRecoverableError,
|
||||||
|
netIsRecoverableError,
|
||||||
|
redshiftIsRecoverableError,
|
||||||
|
urlIsRecoverableError,
|
||||||
}
|
}
|
||||||
|
|
||||||
// this determines whether the error is recoverable
|
// this determines whether the error is recoverable
|
||||||
func isRecoverableError(err error) bool {
|
func isRecoverableError(err error) bool {
|
||||||
r := false
|
logger.Log("info", "isRecoverableError", "type", reflect.TypeOf(err).String(), "msg", err.Error())
|
||||||
|
|
||||||
logger.Printf("isRecoverableError, type %s, value (%#v)\n", reflect.TypeOf(err).String(), err)
|
|
||||||
|
|
||||||
for _, errF := range isRecoverableErrors {
|
for _, errF := range isRecoverableErrors {
|
||||||
r = errF(err)
|
if errF(err) {
|
||||||
if r {
|
return true
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return false
|
||||||
return r
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// handle the aws exponential backoff
|
// handle the aws exponential backoff
|
||||||
|
|
@ -92,7 +86,7 @@ func isRecoverableError(err error) bool {
|
||||||
func handleAwsWaitTimeExp(attempts int) {
|
func handleAwsWaitTimeExp(attempts int) {
|
||||||
if attempts > 0 {
|
if attempts > 0 {
|
||||||
waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
|
waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
|
||||||
logger.Printf("handleAwsWaitTimeExp: %s\n", waitTime.String())
|
logger.Log("info", "handleAwsWaitTimeExp", "attempts", attempts, "waitTime", waitTime.String())
|
||||||
time.Sleep(waitTime)
|
time.Sleep(waitTime)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,15 +0,0 @@
|
||||||
package connector
|
|
||||||
|
|
||||||
import "os"
|
|
||||||
|
|
||||||
// DiscardLogger is the an implementation of a Logger that does not
|
|
||||||
// send any output. It can be used in scenarios when logging is not desired.
|
|
||||||
type DiscardLogger struct{}
|
|
||||||
|
|
||||||
// Fatalf is equivalent to Printf() followed by a call to os.Exit(1).
|
|
||||||
func (l *DiscardLogger) Fatalf(format string, v ...interface{}) {
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Printf is noop and does not have any output.
|
|
||||||
func (l *DiscardLogger) Printf(format string, v ...interface{}) {}
|
|
||||||
|
|
@ -7,5 +7,5 @@ package connector
|
||||||
// Implementations may choose to fail the entire set of records in the buffer or to fail records
|
// Implementations may choose to fail the entire set of records in the buffer or to fail records
|
||||||
// individually.
|
// individually.
|
||||||
type Emitter interface {
|
type Emitter interface {
|
||||||
Emit(b Buffer, t Transformer)
|
Emit(b Buffer, t Transformer, shardID string)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ package main
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"code.google.com/p/gcfg"
|
||||||
"github.com/harlow/kinesis-connectors"
|
"github.com/harlow/kinesis-connectors"
|
||||||
"github.com/sendgridlabs/go-kinesis"
|
"github.com/sendgridlabs/go-kinesis"
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -16,4 +16,5 @@ export AWS_SECRET_KEY=
|
||||||
|
|
||||||
### Running the code
|
### Running the code
|
||||||
|
|
||||||
|
$ curl https://s3.amazonaws.com/kinesis.test/users.txt > users.txt
|
||||||
$ go run main.go
|
$ go run main.go
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) {
|
||||||
err := k.CreateStream(streamName, shardCount)
|
err := k.CreateStream(streamName, shardCount)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Printf("CreateStream ERROR: %v", err)
|
logger.Log("error", "CreateStream", "msg", err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -26,7 +26,7 @@ func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) {
|
||||||
args.Add("StreamName", streamName)
|
args.Add("StreamName", streamName)
|
||||||
resp, _ = k.DescribeStream(args)
|
resp, _ = k.DescribeStream(args)
|
||||||
streamStatus := resp.StreamDescription.StreamStatus
|
streamStatus := resp.StreamDescription.StreamStatus
|
||||||
logger.Printf("Stream [%v] is %v", streamName, streamStatus)
|
logger.Log("info", "DescribeStream", "stream", streamName, "status", streamStatus)
|
||||||
|
|
||||||
if streamStatus != "ACTIVE" {
|
if streamStatus != "ACTIVE" {
|
||||||
time.Sleep(4 * time.Second)
|
time.Sleep(4 * time.Second)
|
||||||
|
|
@ -42,7 +42,7 @@ func StreamExists(k *kinesis.Kinesis, streamName string) bool {
|
||||||
args := kinesis.NewArgs()
|
args := kinesis.NewArgs()
|
||||||
resp, err := k.ListStreams(args)
|
resp, err := k.ListStreams(args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Printf("ListStream ERROR: %v", err)
|
logger.Log("error", "ListStream", "stream", streamName, "status", err.Error())
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
for _, s := range resp.StreamNames {
|
for _, s := range resp.StreamNames {
|
||||||
|
|
|
||||||
12
logger.go
12
logger.go
|
|
@ -1,22 +1,22 @@
|
||||||
package connector
|
package connector
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
|
"github.com/go-kit/kit/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Logger sends pipeline info and errors to logging endpoint. The logger could be
|
// Logger sends pipeline info and errors to logging endpoint. The logger could be
|
||||||
// used to send to STDOUT, Syslog, or any number of distributed log collecting platforms.
|
// used to send to STDOUT, Syslog, or any number of distributed log collecting platforms.
|
||||||
type Logger interface {
|
type Logger interface {
|
||||||
Fatalf(format string, v ...interface{})
|
Log(keyvals ...interface{}) error
|
||||||
Printf(format string, v ...interface{})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// specify a default logger so that we don't end up with panics.
|
|
||||||
var logger Logger = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile)
|
|
||||||
|
|
||||||
// SetLogger adds the ability to change the logger so that external packages
|
// SetLogger adds the ability to change the logger so that external packages
|
||||||
// can control the logging for this package
|
// can control the logging for this package
|
||||||
func SetLogger(l Logger) {
|
func SetLogger(l Logger) {
|
||||||
logger = l
|
logger = l
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// specify a default logger so that we don't end up with panics.
|
||||||
|
var logger Logger = log.NewPrefixLogger(os.Stderr)
|
||||||
|
|
|
||||||
17
pipeline.go
17
pipeline.go
|
|
@ -1,6 +1,7 @@
|
||||||
package connector
|
package connector
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/sendgridlabs/go-kinesis"
|
"github.com/sendgridlabs/go-kinesis"
|
||||||
|
|
@ -38,7 +39,8 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
shardInfo, err := ksis.GetShardIterator(args)
|
shardInfo, err := ksis.GetShardIterator(args)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf("GetShardIterator ERROR: %v\n", err)
|
logger.Log("error", "GetShardIterator", "msg", err.Error())
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
shardIterator := shardInfo.ShardIterator
|
shardIterator := shardInfo.ShardIterator
|
||||||
|
|
@ -46,7 +48,8 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if consecutiveErrorAttempts > 50 {
|
if consecutiveErrorAttempts > 50 {
|
||||||
logger.Fatalf("Too many consecutive error attempts")
|
logger.Log("error", "errorAttempts", "msg", "Too many consecutive error attempts")
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
args = kinesis.NewArgs()
|
args = kinesis.NewArgs()
|
||||||
|
|
@ -55,12 +58,12 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if isRecoverableError(err) {
|
if isRecoverableError(err) {
|
||||||
logger.Printf("GetRecords RECOVERABLE_ERROR: %v\n", err)
|
|
||||||
consecutiveErrorAttempts++
|
consecutiveErrorAttempts++
|
||||||
handleAwsWaitTimeExp(consecutiveErrorAttempts)
|
handleAwsWaitTimeExp(consecutiveErrorAttempts)
|
||||||
continue
|
continue
|
||||||
} else {
|
} else {
|
||||||
logger.Fatalf("GetRecords ERROR: %v\n", err)
|
logger.Log("error", "GetRecords", "msg", err.Error())
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
consecutiveErrorAttempts = 0
|
consecutiveErrorAttempts = 0
|
||||||
|
|
@ -71,7 +74,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
data := v.GetData()
|
data := v.GetData()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Printf("GetData ERROR: %v\n", err)
|
logger.Log("info", "GetData", "msg", err.Error())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -84,7 +87,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil {
|
} else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil {
|
||||||
logger.Printf("NextShardIterator ERROR: %v\n", err)
|
logger.Log("error", "NextShardIterator", "msg", err.Error())
|
||||||
break
|
break
|
||||||
} else {
|
} else {
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
|
|
@ -92,7 +95,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
|
|
||||||
if p.Buffer.ShouldFlush() {
|
if p.Buffer.ShouldFlush() {
|
||||||
if p.Buffer.NumRecordsInBuffer() > 0 {
|
if p.Buffer.NumRecordsInBuffer() > 0 {
|
||||||
p.Emitter.Emit(p.Buffer, p.Transformer)
|
p.Emitter.Emit(p.Buffer, p.Transformer, shardID)
|
||||||
}
|
}
|
||||||
p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber())
|
p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber())
|
||||||
p.Buffer.Flush()
|
p.Buffer.Flush()
|
||||||
|
|
|
||||||
|
|
@ -25,28 +25,29 @@ type RedshiftBasicEmitter struct {
|
||||||
|
|
||||||
// Emit is invoked when the buffer is full. This method leverages the S3Emitter and
|
// Emit is invoked when the buffer is full. This method leverages the S3Emitter and
|
||||||
// then issues a copy command to Redshift data store.
|
// then issues a copy command to Redshift data store.
|
||||||
func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) {
|
func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer, shardID string) {
|
||||||
s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
|
s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
|
||||||
s3Emitter.Emit(b, t)
|
s3Emitter.Emit(b, t, shardID)
|
||||||
s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
|
s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
|
||||||
|
|
||||||
stmt := e.copyStatement(s3File)
|
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
// execute copy statement
|
// execute copy statement
|
||||||
_, err := e.Db.Exec(stmt)
|
_, err := e.Db.Exec(e.copyStatement(s3File))
|
||||||
|
|
||||||
// if the request succeeded, or its an unrecoverable error, break out of loop
|
// if the request succeeded, or its an unrecoverable error, break out of loop
|
||||||
if err == nil || isRecoverableError(err) == false {
|
if err == nil {
|
||||||
|
logger.Log("info", "RedshiftBasicEmitter", "shard", shardID, "file", s3File)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// handle aws backoff, this may be necessary if, for example, the
|
// handle recoverable errors
|
||||||
// s3 file has not appeared to the database yet
|
if isRecoverableError(err) {
|
||||||
handleAwsWaitTimeExp(i)
|
handleAwsWaitTimeExp(i)
|
||||||
|
} else {
|
||||||
|
logger.Log("error", "RedshiftBasicEmitter", "shard", shardID, "msg", err.Error())
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Printf("Redshift load completed.\n")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Creates the SQL copy statement issued to Redshift cluster.
|
// Creates the SQL copy statement issued to Redshift cluster.
|
||||||
|
|
|
||||||
|
|
@ -30,11 +30,12 @@ type RedshiftManifestEmitter struct {
|
||||||
|
|
||||||
// Invoked when the buffer is full.
|
// Invoked when the buffer is full.
|
||||||
// Emits a Manifest file to S3 and then performs the Redshift copy command.
|
// Emits a Manifest file to S3 and then performs the Redshift copy command.
|
||||||
func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) {
|
func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer, shardID string) {
|
||||||
db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
|
db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf("sql.Open ERROR: %v\n", err)
|
logger.Log("error", "sql.Open", "msg", err.Error())
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Aggregate file paths as strings
|
// Aggregate file paths as strings
|
||||||
|
|
@ -54,7 +55,8 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) {
|
||||||
_, err = db.Exec(c)
|
_, err = db.Exec(c)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf("db.Exec ERROR: %v\n", err)
|
logger.Log("error", "db.Exec", "msg", err.Error())
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Insert file paths into File Names table
|
// Insert file paths into File Names table
|
||||||
|
|
@ -62,10 +64,11 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) {
|
||||||
_, err = db.Exec(i)
|
_, err = db.Exec(i)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf("db.Exec ERROR: %v\n", err)
|
logger.Log("error", "db.Exec", "shard", shardID, "msg", err.Error())
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Printf("[%v] copied to Redshift", manifestFileName)
|
logger.Log("info", "Redshfit COPY", "shard", shardID, "manifest", manifestFileName)
|
||||||
db.Close()
|
db.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -118,7 +121,7 @@ func (e RedshiftManifestEmitter) writeManifestToS3(files []string, manifestFileN
|
||||||
content := e.generateManifestFile(files)
|
content := e.generateManifestFile(files)
|
||||||
err := bucket.Put(manifestFileName, content, "text/plain", s3.Private, s3.Options{})
|
err := bucket.Put(manifestFileName, content, "text/plain", s3.Private, s3.Options{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Printf("Error occured while uploding to S3: %v", err)
|
logger.Log("error", "writeManifestToS3", "msg", err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ package connector
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/crowdmob/goamz/aws"
|
"github.com/crowdmob/goamz/aws"
|
||||||
|
|
@ -32,7 +33,7 @@ func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Emit is invoked when the buffer is full. This method emits the set of filtered records.
|
// Emit is invoked when the buffer is full. This method emits the set of filtered records.
|
||||||
func (e S3Emitter) Emit(b Buffer, t Transformer) {
|
func (e S3Emitter) Emit(b Buffer, t Transformer, shardID string) {
|
||||||
auth, _ := aws.EnvAuth()
|
auth, _ := aws.EnvAuth()
|
||||||
s3Con := s3.New(auth, aws.USEast)
|
s3Con := s3.New(auth, aws.USEast)
|
||||||
bucket := s3Con.Bucket(e.S3Bucket)
|
bucket := s3Con.Bucket(e.S3Bucket)
|
||||||
|
|
@ -48,8 +49,9 @@ func (e S3Emitter) Emit(b Buffer, t Transformer) {
|
||||||
err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{})
|
err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf("S3Put ERROR: %v\n", err.Error())
|
logger.Log("error", "S3Put", "shard", shardID, "msg", err.Error())
|
||||||
|
os.Exit(1)
|
||||||
} else {
|
} else {
|
||||||
logger.Printf("[%v] records emitted to [%s]\n", b.NumRecordsInBuffer(), e.S3Bucket)
|
logger.Log("info", "S3Emitter", "shard", shardID, "bucket", e.S3Bucket, "numRecords", b.NumRecordsInBuffer())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,8 @@
|
||||||
package connector
|
package connector
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"os"
|
||||||
|
|
||||||
"github.com/sendgridlabs/go-kinesis"
|
"github.com/sendgridlabs/go-kinesis"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -12,11 +14,11 @@ type S3ManifestEmitter struct {
|
||||||
Ksis *kinesis.Kinesis
|
Ksis *kinesis.Kinesis
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) {
|
func (e S3ManifestEmitter) Emit(b Buffer, t Transformer, shardID string) {
|
||||||
|
|
||||||
// Emit buffer contents to S3 Bucket
|
// Emit buffer contents to S3 Bucket
|
||||||
s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
|
s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
|
||||||
s3Emitter.Emit(b, t)
|
s3Emitter.Emit(b, t, shardID)
|
||||||
s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
|
s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
|
||||||
|
|
||||||
// Emit the file path to Kinesis Output stream
|
// Emit the file path to Kinesis Output stream
|
||||||
|
|
@ -28,8 +30,9 @@ func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) {
|
||||||
_, err := e.Ksis.PutRecord(args)
|
_, err := e.Ksis.PutRecord(args)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Printf("PutRecord ERROR: %v", err)
|
logger.Log("error", "PutRecord", "msg", err)
|
||||||
|
os.Exit(1)
|
||||||
} else {
|
} else {
|
||||||
logger.Printf("[%s] emitted to [%s]", b.FirstSequenceNumber(), e.OutputStream)
|
logger.Log("info", "S3ManifestEmitter", "shard", shardID, "firstSequenceNumber", b.FirstSequenceNumber(), "stream", e.OutputStream)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue