diff --git a/buffer.go b/buffer.go
index 7209c27..9039c7e 100644
--- a/buffer.go
+++ b/buffer.go
@@ -6,11 +6,11 @@ package connector
 // time limit in seconds. The ShouldFlush() method may indicate that the buffer is full based on
 // these limits.
 type Buffer interface {
-	ProcessRecord(record interface{}, sequenceNumber string)
 	FirstSequenceNumber() string
 	Flush()
 	LastSequenceNumber() string
 	NumRecordsInBuffer() int
+	ProcessRecord(record interface{}, sequenceNumber string)
 	Records() []interface{}
 	ShouldFlush() bool
 }
diff --git a/emitter.go b/emitter.go
index a28c591..6cb8851 100644
--- a/emitter.go
+++ b/emitter.go
@@ -7,5 +7,5 @@ package connector
 // Implementations may choose to fail the entire set of records in the buffer or to fail records
 // individually.
 type Emitter interface {
-	Emit(b Buffer, t Transformer, shardID string)
+	Emit(b Buffer, t Transformer)
 }
diff --git a/examples/s3-pipeline/main.go b/examples/s3-pipeline/main.go
index 61557c0..a889564 100644
--- a/examples/s3-pipeline/main.go
+++ b/examples/s3-pipeline/main.go
@@ -75,7 +75,7 @@ func main() {
 	for _, shard := range streamInfo.StreamDescription.Shards {
 		fmt.Printf("Processing %s on %s\n", shard.ShardId, cfg.Kinesis.StreamName)
 		p := newS3Pipeline(cfg)
-		go p.ProcessShard(ksis, shard.ShardId)
+		go p.ProcessShard(shard.ShardId)
 	}
 
 	// Keep alive
diff --git a/examples/seed-stream/main.go b/examples/seed-stream/main.go
index aebec28..1c29888 100644
--- a/examples/seed-stream/main.go
+++ b/examples/seed-stream/main.go
@@ -22,7 +22,8 @@ func main() {
 	connector.CreateStream(ksis, "userStream", 2)
 
 	// read file
-	file, _ := os.Open("users.txt")
+	// https://s3.amazonaws.com/kinesis.test/users.txt
+	file, _ := os.Open("tmp/users.txt")
 	defer file.Close()
 
 	scanner := bufio.NewScanner(file)
diff --git a/pipeline.go b/pipeline.go
index 7b05d98..6970564 100644
--- a/pipeline.go
+++ b/pipeline.go
@@ -1,10 +1,11 @@
 package connector
 
 import (
-	"os"
 	"time"
 
-	"github.com/sendgridlabs/go-kinesis"
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/awserr"
+	"github.com/aws/aws-sdk-go/service/kinesis"
 )
 
 // Pipeline is used as a record processor to configure a pipline.
@@ -13,94 +14,84 @@ import (
 // interface. It has a data type (Model) as Records come in as a byte[] and are transformed to a Model.
 // Then they are buffered in Model form and when the buffer is full, Models's are passed to the emitter.
 type Pipeline struct {
-	Buffer                    Buffer
-	Checkpoint                Checkpoint
-	Emitter                   Emitter
-	Filter                    Filter
-	StreamName                string
-	Transformer               Transformer
-	CheckpointFilteredRecords bool
+	Buffer      Buffer
+	Checkpoint  Checkpoint
+	Emitter     Emitter
+	Filter      Filter
+	Kinesis     *kinesis.Kinesis
+	StreamName  string
+	Transformer Transformer
+
+	checkpointSequenceNumber string
 }
 
-// ProcessShard kicks off the process of a Kinesis Shard.
-// It is a long running process that will continue to read from the shard.
-func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
-	args := kinesis.NewArgs()
-	args.Add("ShardId", shardID)
-	args.Add("StreamName", p.StreamName)
+// ProcessShard is a long running process that handles reading records from a Kinesis shard.
+func (p Pipeline) ProcessShard(shardID string) {
+	svc := kinesis.New(&aws.Config{Region: "us-east-1"})
+
+	args := &kinesis.GetShardIteratorInput{
+		ShardID:    aws.String(shardID),
+		StreamName: aws.String(p.StreamName),
+	}
 
 	if p.Checkpoint.CheckpointExists(shardID) {
-		args.Add("ShardIteratorType", "AFTER_SEQUENCE_NUMBER")
-		args.Add("StartingSequenceNumber", p.Checkpoint.SequenceNumber())
+		args.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER")
+		args.StartingSequenceNumber = aws.String(p.Checkpoint.SequenceNumber())
 	} else {
-		args.Add("ShardIteratorType", "TRIM_HORIZON")
+		args.ShardIteratorType = aws.String("TRIM_HORIZON")
 	}
 
-	shardInfo, err := ksis.GetShardIterator(args)
+	resp, err := svc.GetShardIterator(args)
 
 	if err != nil {
-		logger.Log("error", "GetShardIterator", "msg", err.Error())
-		os.Exit(1)
+		if awsErr, ok := err.(awserr.Error); ok {
+			logger.Log("error", "GetShardIterator", "code", awsErr.Code(), "msg", awsErr.Message(), "origError", awsErr.OrigErr())
+			return
+		}
 	}
 
-	shardIterator := shardInfo.ShardIterator
-	consecutiveErrorAttempts := 0
+	shardIterator := resp.ShardIterator
 
 	for {
-		if consecutiveErrorAttempts > 50 {
-			logger.Log("error", "errorAttempts", "msg", "Too many consecutive error attempts")
-			os.Exit(1)
-		}
-
-		args = kinesis.NewArgs()
-		args.Add("ShardIterator", shardIterator)
-		recordSet, err := ksis.GetRecords(args)
+		args := &kinesis.GetRecordsInput{ShardIterator: shardIterator}
+		resp, err := svc.GetRecords(args)
 
 		if err != nil {
-			if isRecoverableError(err) {
-				consecutiveErrorAttempts++
-				handleAwsWaitTimeExp(consecutiveErrorAttempts)
-				continue
-			} else {
-				logger.Log("error", "GetRecords", "msg", err.Error())
-				os.Exit(1)
+			if awsErr, ok := err.(awserr.Error); ok {
+				if awsErr.Code() == "ProvisionedThroughputExceededException" {
+					logger.Log("info", "GetRecords", "shardId", shardID, "msg", "rateLimit")
+					time.Sleep(5 * time.Second)
+					continue
+				} else {
+					logger.Log("error", "GetRecords", "shardId", shardID, "code", awsErr.Code(), "msg", awsErr.Message())
+					break
+				}
 			}
-		} else {
-			consecutiveErrorAttempts = 0
 		}
 
-		if len(recordSet.Records) > 0 {
-			for _, v := range recordSet.Records {
-				data := v.GetData()
+		if len(resp.Records) > 0 {
+			for _, r := range resp.Records {
+				transformedRecord := p.Transformer.ToRecord(r.Data)
 
-				if err != nil {
-					logger.Log("info", "GetData", "msg", err.Error())
-					continue
+				if p.Filter.KeepRecord(transformedRecord) {
+					p.Buffer.ProcessRecord(transformedRecord, *r.SequenceNumber)
 				}
 
-				r := p.Transformer.ToRecord(data)
-
-				if p.Filter.KeepRecord(r) {
-					p.Buffer.ProcessRecord(r, v.SequenceNumber)
-				} else if p.CheckpointFilteredRecords {
-					p.Buffer.ProcessRecord(nil, v.SequenceNumber)
-				}
+				p.checkpointSequenceNumber = *r.SequenceNumber
 			}
-		} else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil {
+
+			if p.Buffer.ShouldFlush() {
+				p.Emitter.Emit(p.Buffer, p.Transformer)
+				p.Checkpoint.SetCheckpoint(shardID, p.checkpointSequenceNumber)
+				p.Buffer.Flush()
+			}
+		} else if resp.NextShardIterator == aws.String("") || shardIterator == resp.NextShardIterator {
 			logger.Log("error", "NextShardIterator", "msg", err.Error())
 			break
 		} else {
-			time.Sleep(5 * time.Second)
+			time.Sleep(1 * time.Second)
 		}
 
-		if p.Buffer.ShouldFlush() {
-			if p.Buffer.NumRecordsInBuffer() > 0 {
-				p.Emitter.Emit(p.Buffer, p.Transformer, shardID)
-			}
-			p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber())
-			p.Buffer.Flush()
-		}
-
-		shardIterator = recordSet.NextShardIterator
+		shardIterator = resp.NextShardIterator
 	}
 }
diff --git a/record_buffer.go b/record_buffer.go
index 8f01f58..012e1ed 100644
--- a/record_buffer.go
+++ b/record_buffer.go
@@ -8,23 +8,16 @@ type RecordBuffer struct {
 	firstSequenceNumber string
 	lastSequenceNumber  string
 	recordsInBuffer     []interface{}
-	sequencesInBuffer   []string
 }
 
 // ProcessRecord adds a message to the buffer.
 func (b *RecordBuffer) ProcessRecord(record interface{}, sequenceNumber string) {
-	if len(b.sequencesInBuffer) == 0 {
+	if b.NumRecordsInBuffer() == 0 {
 		b.firstSequenceNumber = sequenceNumber
 	}
 
 	b.lastSequenceNumber = sequenceNumber
-
-	if !b.sequenceExists(sequenceNumber) {
-		if record != nil {
-			b.recordsInBuffer = append(b.recordsInBuffer, record)
-		}
-		b.sequencesInBuffer = append(b.sequencesInBuffer, sequenceNumber)
-	}
+	b.recordsInBuffer = append(b.recordsInBuffer, record)
 }
 
 // Records returns the records in the buffer.
@@ -40,22 +33,11 @@ func (b RecordBuffer) NumRecordsInBuffer() int {
 // Flush empties the buffer and resets the sequence counter.
 func (b *RecordBuffer) Flush() {
 	b.recordsInBuffer = b.recordsInBuffer[:0]
-	b.sequencesInBuffer = b.sequencesInBuffer[:0]
-}
-
-// Checks if the sequence already exists in the buffer.
-func (b *RecordBuffer) sequenceExists(sequenceNumber string) bool {
-	for _, v := range b.sequencesInBuffer {
-		if v == sequenceNumber {
-			return true
-		}
-	}
-	return false
 }
 
 // ShouldFlush determines if the buffer has reached its target size.
 func (b *RecordBuffer) ShouldFlush() bool {
-	return len(b.sequencesInBuffer) >= b.NumRecordsToBuffer
+	return len(b.recordsInBuffer) >= b.NumRecordsToBuffer
 }
 
 // FirstSequenceNumber returns the sequence number of the first message in the buffer.
diff --git a/record_buffer_test.go b/record_buffer_test.go
index 7840e4b..74c0db7 100644
--- a/record_buffer_test.go
+++ b/record_buffer_test.go
@@ -28,30 +28,6 @@ func TestProcessRecord(t *testing.T) {
 	if b.NumRecordsInBuffer() != 2 {
 		t.Errorf("NumRecordsInBuffer() want %v", 2)
 	}
-
-	b.ProcessRecord(r2, s2)
-
-	if b.NumRecordsInBuffer() != 2 {
-		t.Errorf("NumRecordsInBuffer() want %v", 2)
-	}
-}
-
-func TestSequenceExists(t *testing.T) {
-	var r1, s1 = TestRecord{}, "Seq1"
-	var r2, s2 = TestRecord{}, "Seq2"
-
-	b := RecordBuffer{}
-	b.ProcessRecord(r1, s1)
-
-	if b.sequenceExists(s1) != true {
-		t.Errorf("sequenceExists() want %v", true)
-	}
-
-	b.ProcessRecord(r2, s2)
-
-	if b.sequenceExists(s2) != true {
-		t.Errorf("sequenceExists() want %v", true)
-	}
 }
 
 func TestFlush(t *testing.T) {
diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go
index 82370a9..28be827 100644
--- a/redshift_basic_emitter.go
+++ b/redshift_basic_emitter.go
@@ -4,7 +4,6 @@ import (
 	"bytes"
 	"database/sql"
 	"fmt"
-	"os"
 
 	// Postgres package is used when sql.Open is called
 	_ "github.com/lib/pq"
@@ -14,37 +13,39 @@ import (
 // It first emits records into S3 and then perfors the Redshift JSON COPY command. S3 storage of buffered
 // data achieved using the S3Emitter. A link to jsonpaths must be provided when configuring the struct.
 type RedshiftBasicEmitter struct {
-	Delimiter string
-	Format    string
-	Jsonpaths string
-	S3Bucket  string
-	S3Prefix  string
-	TableName string
-	Db        *sql.DB
+	AwsAccessKey       string
+	AwsSecretAccessKey string
+	Delimiter          string
+	Format             string
+	Jsonpaths          string
+	S3Bucket           string
+	S3Prefix           string
+	TableName          string
+	Db                 *sql.DB
 }
 
 // Emit is invoked when the buffer is full. This method leverages the S3Emitter and
 // then issues a copy command to Redshift data store.
-func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer, shardID string) {
+func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) {
 	s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
-	s3Emitter.Emit(b, t, shardID)
+	s3Emitter.Emit(b, t)
 	s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
 
 	for i := 0; i < 10; i++ {
 		// execute copy statement
 		_, err := e.Db.Exec(e.copyStatement(s3File))
 
-		// if the request succeeded, or its an unrecoverable error, break out of loop
+		// db command succeeded, break from loop
 		if err == nil {
-			logger.Log("info", "RedshiftBasicEmitter", "shard", shardID, "file", s3File)
+			logger.Log("info", "RedshiftBasicEmitter", "file", s3File)
 			break
 		}
 
-		// handle recoverable errors
+		// handle recoverable errors, else break from loop
 		if isRecoverableError(err) {
 			handleAwsWaitTimeExp(i)
 		} else {
-			logger.Log("error", "RedshiftBasicEmitter", "shard", shardID, "msg", err.Error())
+			logger.Log("error", "RedshiftBasicEmitter", "msg", err.Error())
 			break
 		}
 	}
@@ -55,8 +56,8 @@ func (e RedshiftBasicEmitter) copyStatement(s3File string) string {
 	b := new(bytes.Buffer)
 	b.WriteString(fmt.Sprintf("COPY %v ", e.TableName))
 	b.WriteString(fmt.Sprintf("FROM 's3://%v/%v' ", e.S3Bucket, s3File))
-	b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", os.Getenv("AWS_ACCESS_KEY")))
-	b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", os.Getenv("AWS_SECRET_KEY")))
+	b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", e.AwsAccessKey))
+	b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", e.AwsSecretAccessKey))
 
 	switch e.Format {
 	case "json":
diff --git a/s3_emitter.go b/s3_emitter.go
index c6dde5c..823a134 100644
--- a/s3_emitter.go
+++ b/s3_emitter.go
@@ -21,19 +21,8 @@ type S3Emitter struct {
 	S3Prefix string
 }
 
-// S3FileName generates a file name based on the First and Last sequence numbers from the buffer. The current
-// UTC date (YYYY-MM-DD) is base of the path to logically group days of batches.
-func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string {
-	date := time.Now().UTC().Format("2006/01/02")
-	if e.S3Prefix == "" {
-		return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq)
-	} else {
-		return fmt.Sprintf("%v/%v/%v-%v", e.S3Prefix, date, firstSeq, lastSeq)
-	}
-}
-
 // Emit is invoked when the buffer is full. This method emits the set of filtered records.
-func (e S3Emitter) Emit(b Buffer, t Transformer, shardID string) {
+func (e S3Emitter) Emit(b Buffer, t Transformer) {
 	auth, _ := aws.EnvAuth()
 	s3Con := s3.New(auth, aws.USEast)
 	bucket := s3Con.Bucket(e.S3Bucket)
@@ -49,9 +38,20 @@ func (e S3Emitter) Emit(b Buffer, t Transformer, shardID string) {
 	err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{})
 
 	if err != nil {
-		logger.Log("error", "S3Put", "shard", shardID, "msg", err.Error())
+		logger.Log("error", "S3Put", "msg", err.Error())
 		os.Exit(1)
 	} else {
-		logger.Log("info", "S3Emitter", "shard", shardID, "bucket", e.S3Bucket, "numRecords", b.NumRecordsInBuffer())
+		logger.Log("info", "S3Put", "recordsEmitted", len(b.Records()))
+	}
+}
+
+// S3FileName generates a file name based on the First and Last sequence numbers from the buffer. The current
+// UTC date (YYYY-MM-DD) is the base of the path to logically group days of batches.
+func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string {
+	date := time.Now().UTC().Format("2006/01/02")
+	if e.S3Prefix == "" {
+		return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq)
+	} else {
+		return fmt.Sprintf("%v/%v/%v-%v", e.S3Prefix, date, firstSeq, lastSeq)
 	}
 }
diff --git a/s3_manifest_emitter.go b/s3_manifest_emitter.go
index 00a1b0b..9977dcb 100644
--- a/s3_manifest_emitter.go
+++ b/s3_manifest_emitter.go
@@ -14,11 +14,11 @@ type S3ManifestEmitter struct {
 	Ksis         *kinesis.Kinesis
 }
 
-func (e S3ManifestEmitter) Emit(b Buffer, t Transformer, shardID string) {
+func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) {
 	// Emit buffer contents to S3 Bucket
 	s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
-	s3Emitter.Emit(b, t, shardID)
+	s3Emitter.Emit(b, t)
 	s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
 
 	// Emit the file path to Kinesis Output stream
@@ -33,6 +33,6 @@ func (e S3ManifestEmitter) Emit(b Buffer, t Transformer, shardID string) {
 		logger.Log("error", "PutRecord", "msg", err)
 		os.Exit(1)
 	} else {
-		logger.Log("info", "S3ManifestEmitter", "shard", shardID, "firstSequenceNumber", b.FirstSequenceNumber(), "stream", e.OutputStream)
+		logger.Log("info", "S3ManifestEmitter", "firstSequenceNumber", b.FirstSequenceNumber(), "stream", e.OutputStream)
 	}
 }
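For reference, a minimal sketch of what a custom emitter looks like against the new two-argument Emit signature. StdoutEmitter and its use of Transformer.FromRecord are illustrative assumptions and are not part of the change set above:

```go
package connector

import "fmt"

// StdoutEmitter is a hypothetical emitter used only to illustrate the new
// Emit(b Buffer, t Transformer) interface; the shardID argument is gone, so an
// emitter that still needs it must carry it in its own fields.
type StdoutEmitter struct{}

// Emit writes every buffered record to stdout.
func (e StdoutEmitter) Emit(b Buffer, t Transformer) {
	for _, r := range b.Records() {
		// FromRecord is assumed to serialize a record back to bytes, as the S3Emitter does.
		fmt.Printf("%s\n", t.FromRecord(r))
	}
}
```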