Merge branch 'hw-updates' into ez-recover-errors
Conflicts: pipeline.go redshift_basic_emitter.go
Commit 05330c6a9d
13 changed files with 362 additions and 29 deletions
awsbackoff.go (new file, 104 lines)
@@ -0,0 +1,104 @@
package connector

import (
	"log"
	"math"
	"net"
	"net/url"
	"reflect"
	"regexp"
	"time"

	"github.com/ezoic/go-kinesis"
	l4g "github.com/ezoic/log4go"
	"github.com/lib/pq"
)

type isRecoverableErrorFunc func(error) bool

func kinesisIsRecoverableError(err error) bool {
	recoverableErrorCodes := map[string]bool{
		"ProvisionedThroughputExceededException": true,
		"InternalFailure":                        true,
		"Throttling":                             true,
		"ServiceUnavailable":                     true,
		//"ExpiredIteratorException": true,
	}
	r := false
	cErr, ok := err.(*kinesis.Error)
	if ok && recoverableErrorCodes[cErr.Code] == true {
		r = true
	}
	return r
}

func urlIsRecoverableError(err error) bool {
	r := false
	_, ok := err.(*url.Error)
	if ok {
		r = true
	}
	return r
}

func netIsRecoverableError(err error) bool {
	recoverableErrors := map[string]bool{
		"connection reset by peer": true,
	}
	r := false
	cErr, ok := err.(*net.OpError)
	if ok && recoverableErrors[cErr.Err.Error()] == true {
		r = true
	}
	return r
}

var redshiftRecoverableErrors = []*regexp.Regexp{
	regexp.MustCompile("The specified S3 prefix '.*?' does not exist"),
}

func redshiftIsRecoverableError(err error) bool {
	r := false
	if cErr, ok := err.(pq.Error); ok {
		for _, re := range redshiftRecoverableErrors {
			if re.MatchString(cErr.Message) {
				r = true
				break
			}
		}
	}
	return r
}

var isRecoverableErrors = []isRecoverableErrorFunc{
	kinesisIsRecoverableError, netIsRecoverableError, urlIsRecoverableError, redshiftIsRecoverableError,
}

// this determines whether the error is recoverable
func isRecoverableError(err error) bool {
	r := false

	log.Printf("isRecoverableError, type %s, value (%#v)\n", reflect.TypeOf(err).String(), err)

	for _, errF := range isRecoverableErrors {
		r = errF(err)
		if r {
			break
		}
	}

	return r
}

// handle the aws exponential backoff
func handleAwsWaitTimeExp(attempts int) {

	// http://docs.aws.amazon.com/general/latest/gr/api-retries.html
	// wait up to 5 minutes based on the aws exponential backoff algorithm
	if attempts > 0 {
		waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
		l4g.Finest("handleAwsWaitTimeExp:%s", waitTime.String())
		time.Sleep(waitTime)
	}
}
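For reference, the wait times produced by handleAwsWaitTimeExp above follow the AWS guideline it links to: 100ms doubled on each attempt, capped at five minutes. A minimal standalone sketch (not part of this commit) that prints the resulting schedule:

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// Same formula as handleAwsWaitTimeExp: 100ms * 2^attempt, capped at 300000ms.
	for attempt := 1; attempt <= 12; attempt++ {
		wait := time.Duration(math.Min(100*math.Pow(2, float64(attempt)), 300000)) * time.Millisecond
		fmt.Printf("attempt %2d -> %s\n", attempt, wait)
	}
}

Attempt 1 sleeps 200ms, attempt 5 sleeps 3.2s, and from attempt 12 onward the 5m0s cap applies.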
awsbackoff_test.go (new file, 43 lines)
@@ -0,0 +1,43 @@
package connector

import (
	"fmt"
	"net"
	"testing"

	"github.com/ezoic/go-kinesis"
	"github.com/lib/pq"
)

func Test_isRecoverableError(t *testing.T) {

	testCases := []struct {
		err           error
		isRecoverable bool
	}{
		{err: &kinesis.Error{Code: "ProvisionedThroughputExceededException"}, isRecoverable: true},
		{err: &kinesis.Error{Code: "Throttling"}, isRecoverable: true},
		{err: &kinesis.Error{Code: "ServiceUnavailable"}, isRecoverable: true},
		{err: &kinesis.Error{Code: "ExpiredIteratorException"}, isRecoverable: false},
		{err: &net.OpError{Err: fmt.Errorf("connection reset by peer")}, isRecoverable: true},
		{err: &net.OpError{Err: fmt.Errorf("unexpected error")}, isRecoverable: false},
		{err: fmt.Errorf("an arbitrary error"), isRecoverable: false},
		{err: pq.Error{Message: "The specified S3 prefix 'somefilethatismissing' does not exist"}, isRecoverable: true},
		{err: pq.Error{Message: "Some other pq error"}, isRecoverable: false},

		//"InternalFailure": true,
		//"Throttling": true,
		//"ServiceUnavailable": true,
		////"ExpiredIteratorException": true,
		//{err: *kinesis.Error{Code:""}}
	}

	for idx, tc := range testCases {

		isRecoverable := isRecoverableError(tc.err)
		if isRecoverable != tc.isRecoverable {
			t.Errorf("test case %d: isRecoverable expected %t, actual %t, for error %+v", idx, tc.isRecoverable, isRecoverable, tc.err)
		}

	}
}
kinesis.go (10 changed lines)
@@ -1,10 +1,10 @@
 package connector

 import (
-	"fmt"
 	"time"

-	"github.com/sendgridlabs/go-kinesis"
+	"github.com/ezoic/go-kinesis"
+	l4g "github.com/ezoic/log4go"
 )

 // CreateStream creates a new Kinesis stream (uses existing stream if exists) and
@@ -14,7 +14,7 @@ func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) {
 	err := k.CreateStream(streamName, shardCount)

 	if err != nil {
-		fmt.Printf("CreateStream ERROR: %v\n", err)
+		l4g.Error("CreateStream ERROR: %v", err)
 		return
 	}
 }
@@ -27,7 +27,7 @@ func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) {
 	args.Add("StreamName", streamName)
 	resp, _ = k.DescribeStream(args)
 	streamStatus := resp.StreamDescription.StreamStatus
-	fmt.Printf("Stream [%v] is %v\n", streamName, streamStatus)
+	l4g.Info("Stream [%v] is %v", streamName, streamStatus)

 	if streamStatus != "ACTIVE" {
 		time.Sleep(4 * time.Second)
@@ -43,7 +43,7 @@ func StreamExists(k *kinesis.Kinesis, streamName string) bool {
 	args := kinesis.NewArgs()
 	resp, err := k.ListStreams(args)
 	if err != nil {
-		fmt.Printf("ListStream ERROR: %v\n", err)
+		l4g.Error("ListStream ERROR: %v", err)
 		return false
 	}
 	for _, s := range resp.StreamNames {
mysql_checkpoint.go (new file, 66 lines)
@@ -0,0 +1,66 @@
package connector

import (
	"database/sql"
	"fmt"

	l4g "github.com/ezoic/log4go"
	_ "github.com/go-sql-driver/mysql"
)

// MysqlCheckpoint implements the Checkpoint interface.
// It is used to enable Pipeline.ProcessShard to checkpoint its progress.
type MysqlCheckpoint struct {
	AppName    string
	StreamName string
	TableName  string
	Db         *sql.DB

	sequenceNumber string
}

// CheckpointExists determines if a checkpoint for a particular Shard exists.
// Typically used to determine whether we should start processing the shard with
// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
func (c *MysqlCheckpoint) CheckpointExists(shardID string) bool {

	l4g.Finest("SELECT sequence_number FROM " + c.TableName + " WHERE checkpoint_key = ?")

	row := c.Db.QueryRow("SELECT sequence_number FROM "+c.TableName+" WHERE checkpoint_key = ?", c.key(shardID))
	var val string
	err := row.Scan(&val)
	if err == nil {
		l4g.Finest("sequence:%s", val)
		c.sequenceNumber = val
		return true
	}

	if err == sql.ErrNoRows {
		return false
	}

	// something bad happened, better blow up the process
	panic(err)
}

// SequenceNumber returns the current checkpoint stored for the specified shard.
func (c *MysqlCheckpoint) SequenceNumber() string {
	return c.sequenceNumber
}

// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
// Upon failover, record processing is resumed from this point.
func (c *MysqlCheckpoint) SetCheckpoint(shardID string, sequenceNumber string) {

	_, err := c.Db.Exec("INSERT INTO "+c.TableName+" (sequence_number, checkpoint_key) VALUES (?, ?) ON DUPLICATE KEY UPDATE sequence_number = ?", sequenceNumber, c.key(shardID), sequenceNumber)
	if err != nil {
		panic(err)
	}
	c.sequenceNumber = sequenceNumber
}

// key generates a unique mysql key for storage of Checkpoint.
func (c *MysqlCheckpoint) key(shardID string) string {
	return fmt.Sprintf("%v:checkpoint:%v:%v", c.AppName, c.StreamName, shardID)
}
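A usage sketch of the checkpoint above (not part of this commit): the DSN environment variable, table name, and shard ID are placeholders, and the table is assumed to have a unique checkpoint_key column plus a sequence_number column, matching the queries in mysql_checkpoint.go.

package connector

import (
	"database/sql"
	"log"
	"os"

	_ "github.com/go-sql-driver/mysql"
)

// exampleMysqlCheckpointUsage shows how MysqlCheckpoint might be wired up and
// queried before a consumer starts processing a shard.
func exampleMysqlCheckpointUsage() {
	db, err := sql.Open("mysql", os.Getenv("CHECKPOINT_MYSQL_DSN"))
	if err != nil {
		log.Fatalf("sql.Open ERROR: %v", err)
	}

	c := &MysqlCheckpoint{
		AppName:    "app",
		StreamName: "stream",
		TableName:  "KinesisConnector.Checkpoint",
		Db:         db,
	}

	shardID := "shardId-000000000000"
	if c.CheckpointExists(shardID) {
		// resume after the stored sequence number
		log.Printf("resuming shard %s after sequence %s", shardID, c.SequenceNumber())
	} else {
		// no checkpoint yet; a consumer would start from TRIM_HORIZON
		log.Printf("no checkpoint for shard %s", shardID)
	}
}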
mysql_checkpoint_test.go (new file, 59 lines)
@@ -0,0 +1,59 @@
package connector

import (
	"database/sql"
	"os"
	"testing"
)

func Test_MysqlKey(t *testing.T) {
	k := "app:checkpoint:stream:shard"
	c := MysqlCheckpoint{AppName: "app", StreamName: "stream"}

	r := c.key("shard")

	if r != k {
		t.Errorf("key() = %v, want %v", k, r)
	}
}

func Test_MysqlCheckpointExists(t *testing.T) {
	rc, _ := sql.Open("mysql", os.Getenv("CHECKPOINT_MYSQL_DSN"))
	k := "app:checkpoint:stream:shard"

	_, err := rc.Exec("INSERT INTO KinesisConnector.TestCheckpoint (sequence_number, checkpoint_key) VALUES (?, ?) ON DUPLICATE KEY UPDATE sequence_number = ?", "fakeSeqNum", k, "fakeSeqNum")
	if err != nil {
		t.Fatalf("cannot insert checkpoint into db manually, %s", err)
	}
	c := MysqlCheckpoint{AppName: "app", StreamName: "stream", TableName: "KinesisConnector.TestCheckpoint", Db: rc}

	r := c.CheckpointExists("shard")

	if r != true {
		t.Errorf("CheckpointExists() = %v, want %v", false, r)
	}

	rc.Exec("DELETE FROM KinesisConnector.TestCheckpoint WHERE checkpoint_key = ?", k)
}

func Test_MysqlSetCheckpoint(t *testing.T) {
	k := "app:checkpoint:stream:shard"

	rc, _ := sql.Open("mysql", os.Getenv("CHECKPOINT_MYSQL_DSN"))

	c := MysqlCheckpoint{AppName: "app", StreamName: "stream", TableName: "KinesisConnector.TestCheckpoint", Db: rc}
	c.SetCheckpoint("shard", "fakeSeqNum")

	rslt := rc.QueryRow("SELECT sequence_number FROM KinesisConnector.TestCheckpoint WHERE checkpoint_key = ?", k)
	var sequenceNumber string
	err := rslt.Scan(&sequenceNumber)
	if err != nil {
		t.Fatalf("cannot scan row for checkpoint key, %s", err)
	}

	if sequenceNumber != "fakeSeqNum" {
		t.Errorf("SetCheckpoint() = %v, want %v", "fakeSeqNum", sequenceNumber)
	}

	rc.Exec("DELETE FROM KinesisConnector.TestCheckpoint WHERE checkpoint_key = ?", k)
}
pipeline.go (42 changed lines)
@@ -3,7 +3,8 @@ package connector
 import (
 	"time"

-	"github.com/sendgridlabs/go-kinesis"
+	"github.com/ezoic/go-kinesis"
+	l4g "github.com/ezoic/log4go"
 )

 // Pipeline is used as a record processor to configure a pipline.
@@ -12,12 +13,13 @@ import (
 // interface. It has a data type (Model) as Records come in as a byte[] and are transformed to a Model.
 // Then they are buffered in Model form and when the buffer is full, Models's are passed to the emitter.
 type Pipeline struct {
 	Buffer      Buffer
 	Checkpoint  Checkpoint
 	Emitter     Emitter
 	Filter      Filter
 	StreamName  string
 	Transformer Transformer
+	CheckpointFilteredRecords bool
 }

 // ProcessShard kicks off the process of a Kinesis Shard.
@@ -42,13 +44,31 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {

 	shardIterator := shardInfo.ShardIterator

+	consecutiveErrorAttempts := 0

 	for {
+
+		if consecutiveErrorAttempts > 50 {
+			log.Fatalln("Too many consecutive error attempts")
+		}
+
+		// handle the aws backoff stuff
+		handleAwsWaitTimeExp(consecutiveErrorAttempts)
+
 		args = kinesis.NewArgs()
 		args.Add("ShardIterator", shardIterator)
 		recordSet, err := ksis.GetRecords(args)

 		if err != nil {
-			logger.Fatalf("GetRecords ERROR: %v\n", err)
+			if isRecoverableError(err) {
+				p.Logger.Infof("recoverable error, %s", err)
+				consecutiveErrorAttempts++
+				continue
+			} else {
+				p.Logger.Fatalf("GetRecords ERROR: %v\n", err)
+			}
+		} else {
+			consecutiveErrorAttempts = 0
 		}

 		if len(recordSet.Records) > 0 {
@@ -64,6 +84,8 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {

 				if p.Filter.KeepRecord(r) {
 					p.Buffer.ProcessRecord(r, v.SequenceNumber)
+				} else if p.CheckpointFilteredRecords {
+					p.Buffer.ProcessRecord(nil, v.SequenceNumber)
 				}
 			}
 		} else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil {
@@ -74,7 +96,9 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
 		}

 		if p.Buffer.ShouldFlush() {
-			p.Emitter.Emit(p.Buffer, p.Transformer)
+			if p.Buffer.NumRecordsInBuffer() > 0 {
+				p.Emitter.Emit(p.Buffer, p.Transformer)
+			}
 			p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber())
 			p.Buffer.Flush()
 		}
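The retry logic added to ProcessShard above boils down to a simple pattern: back off, try, reset the counter on success, and bail out on unrecoverable errors. A generic sketch of that pattern (a hypothetical helper, not part of this commit) built on the isRecoverableError and handleAwsWaitTimeExp functions from awsbackoff.go:

package connector

// retryRecoverable retries op with AWS-style exponential backoff while the
// returned error is recoverable, giving up after maxAttempts tries. It returns
// nil on success, or the last error otherwise.
func retryRecoverable(maxAttempts int, op func() error) error {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		// no wait on the first attempt, exponential backoff afterwards
		handleAwsWaitTimeExp(attempt)

		err = op()
		if err == nil || !isRecoverableError(err) {
			return err
		}
	}
	return err
}

For example, the Redshift COPY retry loop further down could be expressed as retryRecoverable(10, func() error { _, err := e.Db.Exec(stmt); return err }).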
@@ -20,7 +20,9 @@ func (b *RecordBuffer) ProcessRecord(record interface{}, sequenceNumber string)
 	b.lastSequenceNumber = sequenceNumber

 	if !b.sequenceExists(sequenceNumber) {
-		b.recordsInBuffer = append(b.recordsInBuffer, record)
+		if record != nil {
+			b.recordsInBuffer = append(b.recordsInBuffer, record)
+		}
 		b.sequencesInBuffer = append(b.sequencesInBuffer, sequenceNumber)
 	}
 }
@@ -32,7 +34,7 @@ func (b *RecordBuffer) Records() []interface{} {

 // NumRecordsInBuffer returns the number of messages in the buffer.
 func (b RecordBuffer) NumRecordsInBuffer() int {
-	return len(b.sequencesInBuffer)
+	return len(b.recordsInBuffer)
 }

 // Flush empties the buffer and resets the sequence counter.
redshift_basic_emitter.go
@@ -7,6 +7,7 @@ import (
 	"os"

 	// Postgres package is used when sql.Open is called
+	l4g "github.com/ezoic/log4go"
 	_ "github.com/lib/pq"
 )

@@ -18,7 +19,9 @@ type RedshiftBasicEmitter struct {
 	Format    string
 	Jsonpaths string
 	S3Bucket  string
+	S3Prefix  string
 	TableName string
+	Db        *sql.DB
 }

 // Emit is invoked when the buffer is full. This method leverages the S3Emitter and
@@ -27,27 +30,42 @@ func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) {
 	s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
 	s3Emitter.Emit(b, t)
 	s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
-	db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))

-	if err != nil {
-		logger.Fatalf("sql.Open ERROR: %v\n", err)
+	stmt := e.copyStatement(s3File)
+
+	var err error
+	for i := 0; i < 10; i++ {
+
+		// handle aws backoff, this may be necessary if, for example, the
+		// s3 file has not appeared to the database yet
+		handleAwsWaitTimeExp(i)
+
+		// load into the database
+		_, err = e.Db.Exec(stmt)
+
+		// if the request succeeded, or its an unrecoverable error, break out of the loop
+		// because we are done
+		if err == nil || isRecoverableError(err) == false {
+			break
+		}
+
+		// recoverable error, lets warn
+		l4g.Warn(err)
+
 	}

-	_, err = db.Exec(e.copyStatement(s3File))

 	if err != nil {
 		logger.Fatalf("db.Exec ERROR: %v\n", err)
 	}

 	logger.Printf("Redshift load completed.\n")
-	db.Close()
 }

 // Creates the SQL copy statement issued to Redshift cluster.
 func (e RedshiftBasicEmitter) copyStatement(s3File string) string {
 	b := new(bytes.Buffer)
 	b.WriteString(fmt.Sprintf("COPY %v ", e.TableName))
-	b.WriteString(fmt.Sprintf("FROM 's3://%v%v' ", e.S3Bucket, s3File))
+	b.WriteString(fmt.Sprintf("FROM 's3://%v/%v' ", e.S3Bucket, s3File))
 	b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", os.Getenv("AWS_ACCESS_KEY")))
 	b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", os.Getenv("AWS_SECRET_KEY")))
 	switch e.Format {
@@ -59,5 +77,6 @@ func (e RedshiftBasicEmitter) copyStatement(s3File string) string {
 		b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter))
 	}
 	b.WriteString(";")
+	l4g.Debug(b.String())
 	return b.String()
 }
@@ -10,7 +10,7 @@ func TestCopyStatement(t *testing.T) {
 		S3Bucket:  "test_bucket",
 		TableName: "test_table",
 	}
-	f := e.copyStatement("/test.txt")
+	f := e.copyStatement("test.txt")

 	copyStatement := "COPY test_table FROM 's3://test_bucket/test.txt' CREDENTIALS 'aws_access_key_id=;aws_secret_access_key=' DELIMITER ',';"
@@ -11,6 +11,7 @@ import (
 	"github.com/crowdmob/goamz/aws"
 	"github.com/crowdmob/goamz/s3"
+	l4g "github.com/ezoic/log4go"
 	_ "github.com/lib/pq"
 )

@@ -118,7 +119,7 @@ func (e RedshiftManifestEmitter) writeManifestToS3(files []string, manifestFileN
 	content := e.generateManifestFile(files)
 	err := bucket.Put(manifestFileName, content, "text/plain", s3.Private, s3.Options{})
 	if err != nil {
-		fmt.Printf("Error occured while uploding to S3: %v\n", err)
+		l4g.Error("Error occured while uploding to S3: %v", err)
 	}
 }
@@ -7,6 +7,7 @@ import (
 	"github.com/crowdmob/goamz/aws"
 	"github.com/crowdmob/goamz/s3"
+	l4g "github.com/ezoic/log4go"
 )

 // S3Emitter is an implementation of Emitter used to store files from a Kinesis stream in S3.
@@ -17,13 +18,18 @@ import (
 // dash. This struct requires the configuration of an S3 bucket and endpoint.
 type S3Emitter struct {
 	S3Bucket string
+	S3Prefix string
 }

 // S3FileName generates a file name based on the First and Last sequence numbers from the buffer. The current
 // UTC date (YYYY-MM-DD) is base of the path to logically group days of batches.
 func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string {
 	date := time.Now().UTC().Format("2006/01/02")
-	return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq)
+	if e.S3Prefix == "" {
+		return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq)
+	} else {
+		return fmt.Sprintf("%v/%v/%v-%v", e.S3Prefix, date, firstSeq, lastSeq)
+	}
 }

 // Emit is invoked when the buffer is full. This method emits the set of filtered records.
@@ -8,12 +8,22 @@ import (
 func TestS3FileName(t *testing.T) {
 	d := time.Now().UTC().Format("2006/01/02")
-	e := S3Emitter{}
+	e := S3Emitter{S3Bucket: "bucket", S3Prefix: "prefix"}

-	expected := fmt.Sprintf("%v/a-b", d)
+	expected := fmt.Sprintf("prefix/%v/a-b", d)
 	result := e.S3FileName("a", "b")

 	if result != expected {
 		t.Errorf("S3FileName() = %v want %v", result, expected)
 	}
+
+	e.S3Prefix = ""
+
+	expected = fmt.Sprintf("%v/a-b", d)
+	result = e.S3FileName("a", "b")
+
+	if result != expected {
+		t.Errorf("S3FileName() = %v want %v", result, expected)
+	}
+
 }
@@ -1,7 +1,6 @@
 package connector

 import (
-	"github.com/sendgridlabs/go-kinesis"
 )

 // An implementation of Emitter that puts event data on S3 file, and then puts the