Use AWS SDK

Limit the number of library dependencies by using the official AWS SDK.

https://github.com/harlow/kinesis-connectors/issues/19
Harlow Ward 2015-08-15 22:20:34 -07:00
parent 8277ffc6be
commit 18173842fb
10 changed files with 96 additions and 145 deletions
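
For orientation, here is a minimal sketch (not part of the commit) of the aws-sdk-go calls the pipeline moves onto below: constructing a Kinesis client, requesting a shard iterator, and reading one batch of records. The region, stream name, and shard ID are placeholders, and error handling is reduced to a print.

package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

func main() {
	// Single dependency: the official AWS SDK replaces sendgridlabs/go-kinesis.
	svc := kinesis.New(&aws.Config{Region: "us-east-1"})

	// Start at the oldest available record in the shard (placeholder names).
	iterResp, err := svc.GetShardIterator(&kinesis.GetShardIteratorInput{
		ShardID:           aws.String("shardId-000000000000"),
		StreamName:        aws.String("userStream"),
		ShardIteratorType: aws.String("TRIM_HORIZON"),
	})
	if err != nil {
		fmt.Println("GetShardIterator:", err)
		return
	}

	// Fetch one batch of records and print their sequence numbers.
	recResp, err := svc.GetRecords(&kinesis.GetRecordsInput{ShardIterator: iterResp.ShardIterator})
	if err != nil {
		fmt.Println("GetRecords:", err)
		return
	}
	for _, r := range recResp.Records {
		fmt.Printf("%s: %s\n", *r.SequenceNumber, r.Data)
	}
}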

View file

@@ -6,11 +6,11 @@ package connector
// time limit in seconds. The ShouldFlush() method may indicate that the buffer is full based on
// these limits.
type Buffer interface {
ProcessRecord(record interface{}, sequenceNumber string)
FirstSequenceNumber() string
Flush()
LastSequenceNumber() string
NumRecordsInBuffer() int
ProcessRecord(record interface{}, sequenceNumber string)
Records() []interface{}
ShouldFlush() bool
}
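
As a rough illustration of this contract (hypothetical, not part of the commit), a caller buffers each record with its sequence number and checks ShouldFlush to decide when a batch is ready:

package connector

// fillBuffer is a hypothetical helper: it buffers records with their Kinesis
// sequence numbers and reports whether the buffer's size/time limits were hit.
func fillBuffer(b Buffer, records []interface{}, seqs []string) bool {
	for i, r := range records {
		b.ProcessRecord(r, seqs[i])
	}
	return b.ShouldFlush()
}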

View file

@@ -7,5 +7,5 @@ package connector
// Implementations may choose to fail the entire set of records in the buffer or to fail records
// individually.
type Emitter interface {
Emit(b Buffer, t Transformer, shardID string)
Emit(b Buffer, t Transformer)
}
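
A hypothetical emitter (not part of this commit) illustrating the new two-argument signature now that the shardID parameter is gone:

package connector

import "fmt"

// debugEmitter prints buffered records to stdout; useful only as an
// illustration of the Emit(Buffer, Transformer) contract.
type debugEmitter struct{}

func (debugEmitter) Emit(b Buffer, t Transformer) {
	fmt.Printf("emitting %d records (%s..%s)\n",
		b.NumRecordsInBuffer(), b.FirstSequenceNumber(), b.LastSequenceNumber())
	for _, r := range b.Records() {
		fmt.Printf("%+v\n", r)
	}
}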

View file

@@ -75,7 +75,7 @@ func main() {
for _, shard := range streamInfo.StreamDescription.Shards {
fmt.Printf("Processing %s on %s\n", shard.ShardId, cfg.Kinesis.StreamName)
p := newS3Pipeline(cfg)
go p.ProcessShard(ksis, shard.ShardId)
go p.ProcessShard(shard.ShardId)
}
// Keep alive

View file

@@ -22,7 +22,8 @@ func main() {
connector.CreateStream(ksis, "userStream", 2)
// read file
file, _ := os.Open("users.txt")
// https://s3.amazonaws.com/kinesis.test/users.txt
file, _ := os.Open("tmp/users.txt")
defer file.Close()
scanner := bufio.NewScanner(file)

View file

@@ -1,10 +1,11 @@
package connector
import (
"os"
"time"
"github.com/sendgridlabs/go-kinesis"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/kinesis"
)
// Pipeline is used as a record processor to configure a pipeline.
@@ -13,94 +14,84 @@ import (
// interface. It has a data type (Model), as records come in as []byte and are transformed to a Model.
// Then they are buffered in Model form and, when the buffer is full, Models are passed to the emitter.
type Pipeline struct {
Buffer Buffer
Checkpoint Checkpoint
Emitter Emitter
Filter Filter
StreamName string
Transformer Transformer
CheckpointFilteredRecords bool
Buffer Buffer
Checkpoint Checkpoint
Emitter Emitter
Filter Filter
Kinesis *kinesis.Kinesis
StreamName string
Transformer Transformer
checkpointSequenceNumber string
}
// ProcessShard kicks off the process of a Kinesis Shard.
// It is a long running process that will continue to read from the shard.
func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
args := kinesis.NewArgs()
args.Add("ShardId", shardID)
args.Add("StreamName", p.StreamName)
// ProcessShard is a long running process that handles reading records from a Kinesis shard.
func (p Pipeline) ProcessShard(shardID string) {
svc := kinesis.New(&aws.Config{Region: "us-east-1"})
args := &kinesis.GetShardIteratorInput{
ShardID: aws.String(shardID),
StreamName: aws.String(p.StreamName),
}
if p.Checkpoint.CheckpointExists(shardID) {
args.Add("ShardIteratorType", "AFTER_SEQUENCE_NUMBER")
args.Add("StartingSequenceNumber", p.Checkpoint.SequenceNumber())
args.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER")
args.StartingSequenceNumber = aws.String(p.Checkpoint.SequenceNumber())
} else {
args.Add("ShardIteratorType", "TRIM_HORIZON")
args.ShardIteratorType = aws.String("TRIM_HORIZON")
}
shardInfo, err := ksis.GetShardIterator(args)
resp, err := svc.GetShardIterator(args)
if err != nil {
logger.Log("error", "GetShardIterator", "msg", err.Error())
os.Exit(1)
if awsErr, ok := err.(awserr.Error); ok {
logger.Log("error", "GetShardIterator", "code", awsErr.Code(), "msg", awsErr.Message(), "origError", awsErr.OrigErr())
return
}
}
shardIterator := shardInfo.ShardIterator
consecutiveErrorAttempts := 0
shardIterator := resp.ShardIterator
for {
if consecutiveErrorAttempts > 50 {
logger.Log("error", "errorAttempts", "msg", "Too many consecutive error attempts")
os.Exit(1)
}
args = kinesis.NewArgs()
args.Add("ShardIterator", shardIterator)
recordSet, err := ksis.GetRecords(args)
args := &kinesis.GetRecordsInput{ShardIterator: shardIterator}
resp, err := svc.GetRecords(args)
if err != nil {
if isRecoverableError(err) {
consecutiveErrorAttempts++
handleAwsWaitTimeExp(consecutiveErrorAttempts)
continue
} else {
logger.Log("error", "GetRecords", "msg", err.Error())
os.Exit(1)
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == "ProvisionedThroughputExceededException" {
logger.Log("info", "GetRecords", "shardId", shardID, "msg", "rateLimit")
time.Sleep(5 * time.Second)
continue
} else {
logger.Log("error", "GetRecords", "shardId", shardID, "code", awsErr.Code(), "msg", awsErr.Message())
break
}
}
} else {
consecutiveErrorAttempts = 0
}
if len(recordSet.Records) > 0 {
for _, v := range recordSet.Records {
data := v.GetData()
if len(resp.Records) > 0 {
for _, r := range resp.Records {
transformedRecord := p.Transformer.ToRecord(r.Data)
if err != nil {
logger.Log("info", "GetData", "msg", err.Error())
continue
if p.Filter.KeepRecord(transformedRecord) {
p.Buffer.ProcessRecord(transformedRecord, *r.SequenceNumber)
}
r := p.Transformer.ToRecord(data)
if p.Filter.KeepRecord(r) {
p.Buffer.ProcessRecord(r, v.SequenceNumber)
} else if p.CheckpointFilteredRecords {
p.Buffer.ProcessRecord(nil, v.SequenceNumber)
}
p.checkpointSequenceNumber = *r.SequenceNumber
}
} else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil {
if p.Buffer.ShouldFlush() {
p.Emitter.Emit(p.Buffer, p.Transformer)
p.Checkpoint.SetCheckpoint(shardID, p.checkpointSequenceNumber)
p.Buffer.Flush()
}
} else if resp.NextShardIterator == nil || *resp.NextShardIterator == "" || shardIterator == resp.NextShardIterator {
logger.Log("error", "NextShardIterator", "msg", "no next shard iterator; stopping shard processing")
break
} else {
time.Sleep(5 * time.Second)
time.Sleep(1 * time.Second)
}
if p.Buffer.ShouldFlush() {
if p.Buffer.NumRecordsInBuffer() > 0 {
p.Emitter.Emit(p.Buffer, p.Transformer, shardID)
}
p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber())
p.Buffer.Flush()
}
shardIterator = recordSet.NextShardIterator
shardIterator = resp.NextShardIterator
}
}

View file

@@ -8,23 +8,16 @@ type RecordBuffer struct {
firstSequenceNumber string
lastSequenceNumber string
recordsInBuffer []interface{}
sequencesInBuffer []string
}
// ProcessRecord adds a message to the buffer.
func (b *RecordBuffer) ProcessRecord(record interface{}, sequenceNumber string) {
if len(b.sequencesInBuffer) == 0 {
if b.NumRecordsInBuffer() == 0 {
b.firstSequenceNumber = sequenceNumber
}
b.lastSequenceNumber = sequenceNumber
if !b.sequenceExists(sequenceNumber) {
if record != nil {
b.recordsInBuffer = append(b.recordsInBuffer, record)
}
b.sequencesInBuffer = append(b.sequencesInBuffer, sequenceNumber)
}
b.recordsInBuffer = append(b.recordsInBuffer, record)
}
// Records returns the records in the buffer.
@@ -40,22 +33,11 @@ func (b RecordBuffer) NumRecordsInBuffer() int {
// Flush empties the buffer and resets the sequence counter.
func (b *RecordBuffer) Flush() {
b.recordsInBuffer = b.recordsInBuffer[:0]
b.sequencesInBuffer = b.sequencesInBuffer[:0]
}
// Checks if the sequence already exists in the buffer.
func (b *RecordBuffer) sequenceExists(sequenceNumber string) bool {
for _, v := range b.sequencesInBuffer {
if v == sequenceNumber {
return true
}
}
return false
}
// ShouldFlush determines if the buffer has reached its target size.
func (b *RecordBuffer) ShouldFlush() bool {
return len(b.sequencesInBuffer) >= b.NumRecordsToBuffer
return len(b.recordsInBuffer) >= b.NumRecordsToBuffer
}
// FirstSequenceNumber returns the sequence number of the first message in the buffer.
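
A hypothetical test (not part of this commit) showing the effect of dropping the sequence-number deduplication: ShouldFlush is now driven purely by the number of buffered records.

package connector

import "testing"

// TestShouldFlushByCount is hypothetical and not part of this commit; it shows
// that ShouldFlush now depends only on the record count, since the
// sequence-number deduplication above was removed.
func TestShouldFlushByCount(t *testing.T) {
	b := RecordBuffer{NumRecordsToBuffer: 2}
	b.ProcessRecord(TestRecord{}, "Seq1")
	if b.ShouldFlush() {
		t.Errorf("ShouldFlush() want %v", false)
	}
	// Re-buffering the same sequence number is no longer skipped.
	b.ProcessRecord(TestRecord{}, "Seq1")
	if !b.ShouldFlush() {
		t.Errorf("ShouldFlush() want %v", true)
	}
}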

View file

@@ -28,30 +28,6 @@ func TestProcessRecord(t *testing.T) {
if b.NumRecordsInBuffer() != 2 {
t.Errorf("NumRecordsInBuffer() want %v", 2)
}
b.ProcessRecord(r2, s2)
if b.NumRecordsInBuffer() != 2 {
t.Errorf("NumRecordsInBuffer() want %v", 2)
}
}
func TestSequenceExists(t *testing.T) {
var r1, s1 = TestRecord{}, "Seq1"
var r2, s2 = TestRecord{}, "Seq2"
b := RecordBuffer{}
b.ProcessRecord(r1, s1)
if b.sequenceExists(s1) != true {
t.Errorf("sequenceExists() want %v", true)
}
b.ProcessRecord(r2, s2)
if b.sequenceExists(s2) != true {
t.Errorf("sequenceExists() want %v", true)
}
}
func TestFlush(t *testing.T) {

View file

@@ -4,7 +4,6 @@ import (
"bytes"
"database/sql"
"fmt"
"os"
// Postgres package is used when sql.Open is called
_ "github.com/lib/pq"
@@ -14,37 +13,39 @@ import (
// It first emits records into S3 and then performs the Redshift JSON COPY command. S3 storage of buffered
// data is achieved using the S3Emitter. A link to jsonpaths must be provided when configuring the struct.
type RedshiftBasicEmitter struct {
Delimiter string
Format string
Jsonpaths string
S3Bucket string
S3Prefix string
TableName string
Db *sql.DB
AwsAccessKey string
AwsSecretAccessKey string
Delimiter string
Format string
Jsonpaths string
S3Bucket string
S3Prefix string
TableName string
Db *sql.DB
}
// Emit is invoked when the buffer is full. This method leverages the S3Emitter and
// then issues a COPY command to the Redshift data store.
func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer, shardID string) {
func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) {
s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
s3Emitter.Emit(b, t, shardID)
s3Emitter.Emit(b, t)
s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
for i := 0; i < 10; i++ {
// execute copy statement
_, err := e.Db.Exec(e.copyStatement(s3File))
// if the request succeeded, or it's an unrecoverable error, break out of loop
// db command succeeded, break from loop
if err == nil {
logger.Log("info", "RedshiftBasicEmitter", "shard", shardID, "file", s3File)
break
}
// handle recoverable errors
// handle recoverable errors, else break from loop
if isRecoverableError(err) {
handleAwsWaitTimeExp(i)
} else {
logger.Log("error", "RedshiftBasicEmitter", "shard", shardID, "msg", err.Error())
logger.Log("error", "RedshiftBasicEmitter", "msg", err.Error())
break
}
}
@@ -55,8 +56,8 @@ func (e RedshiftBasicEmitter) copyStatement(s3File string) string {
b := new(bytes.Buffer)
b.WriteString(fmt.Sprintf("COPY %v ", e.TableName))
b.WriteString(fmt.Sprintf("FROM 's3://%v/%v' ", e.S3Bucket, s3File))
b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", os.Getenv("AWS_ACCESS_KEY")))
b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", os.Getenv("AWS_SECRET_KEY")))
b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", e.AwsAccessKey))
b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", e.AwsSecretAccessKey))
switch e.Format {
case "json":

View file

@@ -21,19 +21,8 @@ type S3Emitter struct {
S3Prefix string
}
// S3FileName generates a file name based on the First and Last sequence numbers from the buffer. The current
// UTC date (YYYY/MM/DD) is the base of the path to logically group days of batches.
func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string {
date := time.Now().UTC().Format("2006/01/02")
if e.S3Prefix == "" {
return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq)
} else {
return fmt.Sprintf("%v/%v/%v-%v", e.S3Prefix, date, firstSeq, lastSeq)
}
}
// Emit is invoked when the buffer is full. This method emits the set of filtered records.
func (e S3Emitter) Emit(b Buffer, t Transformer, shardID string) {
func (e S3Emitter) Emit(b Buffer, t Transformer) {
auth, _ := aws.EnvAuth()
s3Con := s3.New(auth, aws.USEast)
bucket := s3Con.Bucket(e.S3Bucket)
@@ -49,9 +38,20 @@ func (e S3Emitter) Emit(b Buffer, t Transformer, shardID string) {
err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{})
if err != nil {
logger.Log("error", "S3Put", "shard", shardID, "msg", err.Error())
logger.Log("error", "S3Put", "msg", err.Error())
os.Exit(1)
} else {
logger.Log("info", "S3Emitter", "shard", shardID, "bucket", e.S3Bucket, "numRecords", b.NumRecordsInBuffer())
logger.Log("info", "S3Put", "recordsEmitted", len(b.Records()))
}
}
// S3FileName generates a file name based on the First and Last sequence numbers from the buffer. The current
// UTC date (YYYY/MM/DD) is the base of the path to logically group days of batches.
func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string {
date := time.Now().UTC().Format("2006/01/02")
if e.S3Prefix == "" {
return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq)
} else {
return fmt.Sprintf("%v/%v/%v-%v", e.S3Prefix, date, firstSeq, lastSeq)
}
}
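
A hypothetical example (not part of this commit) of the keys S3FileName produces, assuming a bucket prefix of "events":

package connector

import "fmt"

// Hypothetical illustration: S3FileName groups a day's batches under the
// current UTC date and names each file by the first and last sequence
// numbers in the buffer.
func ExampleS3FileName() {
	e := S3Emitter{S3Bucket: "my-bucket", S3Prefix: "events"}
	fmt.Println(e.S3FileName("49540", "49545"))
	// On 2015-08-15 this prints: events/2015/08/15/49540-49545
}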

View file

@@ -14,11 +14,11 @@ type S3ManifestEmitter struct {
Ksis *kinesis.Kinesis
}
func (e S3ManifestEmitter) Emit(b Buffer, t Transformer, shardID string) {
func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) {
// Emit buffer contents to S3 Bucket
s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
s3Emitter.Emit(b, t, shardID)
s3Emitter.Emit(b, t)
s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
// Emit the file path to Kinesis Output stream
@@ -33,6 +33,6 @@ func (e S3ManifestEmitter) Emit(b Buffer, t Transformer, shardID string) {
logger.Log("error", "PutRecord", "msg", err)
os.Exit(1)
} else {
logger.Log("info", "S3ManifestEmitter", "shard", shardID, "firstSequenceNumber", b.FirstSequenceNumber(), "stream", e.OutputStream)
logger.Log("info", "S3ManifestEmitter", "firstSequenceNumber", b.FirstSequenceNumber(), "stream", e.OutputStream)
}
}