From f0e6461cb6a27a100e3f6707704093da10df4215 Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Tue, 2 Feb 2016 21:04:22 -0800 Subject: [PATCH] Refactor to use handler func The previous pipeline model required a lot of setup and abstracted away the processing of records. By passing a HandlerFunc to the consumer we keep the business logic of processing of records closer to the use of the consumer. * Add refactoring note and SHA to README --- README.md | 33 +++-- all_pass_filter.go | 9 -- awsbackoff_test.go => awsbackoff_test.go.bk | 8 +- buffer.go | 58 +++++++-- buffer_test.go | 47 +++++++ checkpoint.go | 52 ++++++-- checkpoint_test.go | 46 +++++++ consumer.go | 118 ++++++++++++++++++ emitter.go | 11 -- manifest.go => emitter/redshift/manifest.go | 0 .../redshift/redshift_emitter.go | 14 +-- .../redshift/redshift_manifest_emitter.go | 0 .../redshift_manifest_emitter_test.go | 0 .../{redshift-pipeline => redshift}/main.go | 3 - .../pipeline.cfg | 0 examples/s3-pipeline/main.go | 83 ------------ examples/s3-pipeline/pipeline.cfg | 8 -- examples/{s3-pipeline => s3}/README.md | 2 +- examples/s3/main.go | 38 ++++++ examples/{seed-stream => seed}/README.md | 2 +- examples/{seed-stream => seed}/main.go | 49 ++++---- filter.go | 7 -- handler.go | 18 +++ kinesis.go | 54 -------- logger.go | 2 +- pipeline.go | 112 ----------------- record_buffer.go | 51 -------- record_buffer_test.go | 98 --------------- redis_checkpoint.go | 48 ------- redis_checkpoint_test.go | 48 ------- redshift_emitter.go | 74 +++++++++++ ...mitter_test.go => redshift_emitter_test.go | 4 +- s3_emitter.go | 41 ++---- s3_emitter_test.go | 27 ---- s3_key.go | 16 +++ s3_key_test.go | 19 +++ s3_manifest_emitter.go | 35 +++--- string_to_string_transformer.go | 14 --- transformer.go | 8 -- 39 files changed, 559 insertions(+), 698 deletions(-) delete mode 100644 all_pass_filter.go rename awsbackoff_test.go => awsbackoff_test.go.bk (83%) create mode 100644 buffer_test.go create mode 100644 checkpoint_test.go create mode 100644 consumer.go delete mode 100644 emitter.go rename manifest.go => emitter/redshift/manifest.go (100%) rename redshift_basic_emitter.go => emitter/redshift/redshift_emitter.go (81%) rename redshift_manifest_emitter.go => emitter/redshift/redshift_manifest_emitter.go (100%) rename redshift_manifest_emitter_test.go => emitter/redshift/redshift_manifest_emitter_test.go (100%) rename examples/{redshift-pipeline => redshift}/main.go (94%) rename examples/{redshift-pipeline => redshift}/pipeline.cfg (100%) delete mode 100644 examples/s3-pipeline/main.go delete mode 100644 examples/s3-pipeline/pipeline.cfg rename examples/{s3-pipeline => s3}/README.md (92%) create mode 100644 examples/s3/main.go rename examples/{seed-stream => seed}/README.md (83%) rename examples/{seed-stream => seed}/main.go (59%) delete mode 100644 filter.go create mode 100644 handler.go delete mode 100644 kinesis.go delete mode 100644 pipeline.go delete mode 100644 record_buffer.go delete mode 100644 record_buffer_test.go delete mode 100644 redis_checkpoint.go delete mode 100644 redis_checkpoint_test.go create mode 100644 redshift_emitter.go rename redshift_basic_emitter_test.go => redshift_emitter_test.go (85%) delete mode 100644 s3_emitter_test.go create mode 100644 s3_key.go create mode 100644 s3_key_test.go delete mode 100644 string_to_string_transformer.go delete mode 100644 transformer.go diff --git a/README.md b/README.md index f1308e7..c71fe3b 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,9 @@ __Kinesis connector applications written in Go__ 
-Inspired by the [Amazon Kinesis Connector Library][1]. These components are used for extracting streaming event data -into S3, Redshift, DynamoDB, and more. See the [API Docs][2] for package documentation. +_Note: Repo is going under refactoring to use a handler func to process batch data. The previous stable version of connectors exist at SHA `509f68de89efb74aa8d79a733749208edaf56b4d`_ + +Inspired by the [Amazon Kinesis Connector Library][1]. This library is used for extracting streaming event data from Kinesis into S3, Redshift, DynamoDB, and more. See the [API Docs][2] for package documentation. [1]: https://github.com/awslabs/amazon-kinesis-connectors [2]: http://godoc.org/github.com/harlow/kinesis-connectors @@ -12,15 +13,25 @@ into S3, Redshift, DynamoDB, and more. See the [API Docs][2] for package documen ## Overview -Each Amazon Kinesis connector application is a pipeline that determines how records from an Amazon Kinesis stream will be handled. Records are retrieved from the stream, transformed according to a user-defined data model, buffered for batch processing, and then emitted to the appropriate AWS service. +The consumer expects a handler func that will process a buffer of incoming records. -A connector pipeline uses the following interfaces: +```golang +func main() { + var( + app = flag.String("app", "", "The app name") + stream = flag.String("stream", "", "The stream name") + ) + flag.Parse() -* __Pipeline:__ The pipeline implementation itself. -* __Transformer:__ Defines the transformation of records from the Amazon Kinesis stream in order to suit the user-defined data model. Includes methods for custom serializer/deserializers. -* __Filter:__ Defines a method for excluding irrelevant records from the processing. -* __Buffer:__ Defines a system for batching the set of records to be processed. The application can specify three thresholds: number of records, total byte count, and time. When one of these thresholds is crossed, the buffer is flushed and the data is emitted to the destination. -* __Emitter:__ Defines a method that makes client calls to other AWS services and persists the records stored in the buffer. The records can also be sent to another Amazon Kinesis stream. + c := connector.NewConsumer(*app, *stream) + c.Start(connector.HandlerFunc(func(b connector.Buffer) { + fmt.Println(b.GetRecords()) + // process the records + })) + + select {} +} +``` ### Installation @@ -32,8 +43,8 @@ Get the package source: Examples pipelines: -* [S3 Pipeline](https://github.com/harlow/kinesis-connectors/tree/master/examples/s3-pipeline) -* [Redshift Basic Pipeline](https://github.com/harlow/kinesis-connectors/tree/master/examples/redshift-pipeline) +* [S3](https://github.com/harlow/kinesis-connectors/tree/master/examples/s3) +* [Redshift](https://github.com/harlow/kinesis-connectors/tree/master/examples/redshift) ### Logging diff --git a/all_pass_filter.go b/all_pass_filter.go deleted file mode 100644 index f4252a2..0000000 --- a/all_pass_filter.go +++ /dev/null @@ -1,9 +0,0 @@ -package connector - -// AllPassFilter an implementation of the Filter interface that returns true for all records. -type AllPassFilter struct{} - -// KeepRecord returns true for all records. 
-func (b *AllPassFilter) KeepRecord(r interface{}) bool { - return true -} diff --git a/awsbackoff_test.go b/awsbackoff_test.go.bk similarity index 83% rename from awsbackoff_test.go rename to awsbackoff_test.go.bk index 5f52a2e..af3360e 100644 --- a/awsbackoff_test.go +++ b/awsbackoff_test.go.bk @@ -5,6 +5,7 @@ import ( "net" "testing" + "github.com/bmizerany/assert" "github.com/lib/pq" "github.com/sendgridlabs/go-kinesis" ) @@ -25,11 +26,8 @@ func Test_isRecoverableError(t *testing.T) { {err: pq.Error{Message: "Some other pq error"}, isRecoverable: false}, } - for idx, tc := range testCases { + for _, tc := range testCases { isRecoverable := isRecoverableError(tc.err) - - if isRecoverable != tc.isRecoverable { - t.Errorf("test case %d: isRecoverable expected %t, actual %t, for error %+v", idx, tc.isRecoverable, isRecoverable, tc.err) - } + assert.Equal(t, isRecoverable, tc.isRecoverable) } } diff --git a/buffer.go b/buffer.go index 9039c7e..4a0b689 100644 --- a/buffer.go +++ b/buffer.go @@ -1,16 +1,48 @@ package connector -// Buffer defines a buffer used to store records streamed through Kinesis. It is a part of the -// Pipeline utilized by the Pipeline.ProcessShard function. Records are stored in the buffer by calling -// the Add method. The buffer has two size limits defined: total total number of records and a -// time limit in seconds. The ShouldFlush() method may indicate that the buffer is full based on -// these limits. -type Buffer interface { - FirstSequenceNumber() string - Flush() - LastSequenceNumber() string - NumRecordsInBuffer() int - ProcessRecord(record interface{}, sequenceNumber string) - Records() []interface{} - ShouldFlush() bool +import "github.com/aws/aws-sdk-go/service/kinesis" + +// Buffer holds records and answers questions on when it +// should be periodically flushed. +type Buffer struct { + records []*kinesis.Record + firstSequenceNumber string + lastSequenceNumber string + + MaxBufferSize int +} + +// AddRecord adds a record to the buffer. +func (b *Buffer) AddRecord(r *kinesis.Record) { + if len(b.records) == 0 { + b.firstSequenceNumber = *r.SequenceNumber + } + + b.records = append(b.records, r) + b.lastSequenceNumber = *r.SequenceNumber +} + +// ShouldFlush determines if the buffer has reached its target size. +func (b *Buffer) ShouldFlush() bool { + return len(b.records) >= b.MaxBufferSize +} + +// Flush empties the buffer and resets the sequence counter. +func (b *Buffer) Flush() { + b.records = b.records[:0] +} + +// GetRecords returns the records in the buffer. +func (b *Buffer) GetRecords() []*kinesis.Record { + return b.records +} + +// FirstSequenceNumber returns the sequence number of the first record in the buffer. +func (b *Buffer) FirstSeq() string { + return b.firstSequenceNumber +} + +// LastSeq returns the sequence number of the last record in the buffer. 
+func (b *Buffer) LastSeq() string { + return b.lastSequenceNumber } diff --git a/buffer_test.go b/buffer_test.go new file mode 100644 index 0000000..d11da71 --- /dev/null +++ b/buffer_test.go @@ -0,0 +1,47 @@ +package connector + +import ( + "testing" + + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/bmizerany/assert" +) + +func Test_FirstSeq(t *testing.T) { + b := Buffer{} + s1, s2 := "1", "2" + r1 := &kinesis.Record{SequenceNumber: &s1} + r2 := &kinesis.Record{SequenceNumber: &s2} + + b.AddRecord(r1) + assert.Equal(t, b.FirstSeq(), "1") + + b.AddRecord(r2) + assert.Equal(t, b.FirstSeq(), "1") +} + +func Test_LastSeq(t *testing.T) { + b := Buffer{} + s1, s2 := "1", "2" + r1 := &kinesis.Record{SequenceNumber: &s1} + r2 := &kinesis.Record{SequenceNumber: &s2} + + b.AddRecord(r1) + assert.Equal(t, b.LastSeq(), "1") + + b.AddRecord(r2) + assert.Equal(t, b.LastSeq(), "2") +} + +func Test_ShouldFlush(t *testing.T) { + b := Buffer{MaxBufferSize: 2} + s1, s2 := "1", "2" + r1 := &kinesis.Record{SequenceNumber: &s1} + r2 := &kinesis.Record{SequenceNumber: &s2} + + b.AddRecord(r1) + assert.Equal(t, b.ShouldFlush(), false) + + b.AddRecord(r2) + assert.Equal(t, b.ShouldFlush(), true) +} diff --git a/checkpoint.go b/checkpoint.go index 62fd931..0ff948d 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -1,10 +1,48 @@ package connector -// Checkpoint is used by Pipeline.ProcessShard when they want to checkpoint their progress. -// The Kinesis Connector Library will pass an object implementing this interface to ProcessShard, -// so they can checkpoint their progress. -type Checkpoint interface { - CheckpointExists(shardID string) bool - SequenceNumber() string - SetCheckpoint(shardID string, sequenceNumber string) +import ( + "fmt" + + "github.com/hoisie/redis" +) + +// RedisCheckpoint implements the Checkpont interface. +// This class is used to enable the Pipeline.ProcessShard to checkpoint their progress. +type Checkpoint struct { + AppName string + StreamName string + + client redis.Client + sequenceNumber string +} + +// CheckpointExists determines if a checkpoint for a particular Shard exists. +// Typically used to determine whether we should start processing the shard with +// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). +func (c *Checkpoint) CheckpointExists(shardID string) bool { + val, _ := c.client.Get(c.key(shardID)) + + if val != nil && string(val) != "" { + c.sequenceNumber = string(val) + return true + } + + return false +} + +// SequenceNumber returns the current checkpoint stored for the specified shard. +func (c *Checkpoint) SequenceNumber() string { + return c.sequenceNumber +} + +// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). +// Upon failover, record processing is resumed from this point. +func (c *Checkpoint) SetCheckpoint(shardID string, sequenceNumber string) { + c.client.Set(c.key(shardID), []byte(sequenceNumber)) + c.sequenceNumber = sequenceNumber +} + +// key generates a unique Redis key for storage of Checkpoint. 
+func (c *Checkpoint) key(shardID string) string { + return fmt.Sprintf("%v:checkpoint:%v:%v", c.AppName, c.StreamName, shardID) } diff --git a/checkpoint_test.go b/checkpoint_test.go new file mode 100644 index 0000000..d06d7ee --- /dev/null +++ b/checkpoint_test.go @@ -0,0 +1,46 @@ +package connector + +import ( + "testing" + + "github.com/bmizerany/assert" + "github.com/hoisie/redis" +) + +func Test_key(t *testing.T) { + c := Checkpoint{ + AppName: "app", + StreamName: "stream", + } + + k := c.key("shard") + assert.Equal(t, k, "app:checkpoint:stream:shard") +} + +func Test_CheckpointExists(t *testing.T) { + var rc redis.Client + rc.Set("app:checkpoint:stream:shard", []byte("testSeqNum")) + c := Checkpoint{ + AppName: "app", + StreamName: "stream", + } + + r := c.CheckpointExists("shard") + assert.Equal(t, r, true) + + rc.Del("app:checkpoint:stream:shard") +} + +func Test_SetCheckpoint(t *testing.T) { + var rc redis.Client + c := Checkpoint{ + AppName: "app", + StreamName: "stream", + } + + c.SetCheckpoint("shard", "testSeqNum") + r, _ := rc.Get("app:checkpoint:stream:shard") + assert.Equal(t, string(r), "testSeqNum") + + rc.Del("app:checkpoint:stream:shard") +} diff --git a/consumer.go b/consumer.go new file mode 100644 index 0000000..396c345 --- /dev/null +++ b/consumer.go @@ -0,0 +1,118 @@ +package connector + +import ( + "os" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" +) + +const maxBufferSize = 1000 + +func NewConsumer(appName, streamName string) *Consumer { + svc := kinesis.New(session.New()) + + return &Consumer{ + appName: appName, + streamName: streamName, + svc: svc, + } +} + +type Consumer struct { + appName string + streamName string + svc *kinesis.Kinesis +} + +func (c *Consumer) Start(handler Handler) { + params := &kinesis.DescribeStreamInput{ + StreamName: aws.String(c.streamName), + } + + // describe stream + resp, err := c.svc.DescribeStream(params) + if err != nil { + logger.Log("fatal", "DescribeStream", "msg", err.Error()) + os.Exit(1) + } + + // handle shards + for _, shard := range resp.StreamDescription.Shards { + logger.Log("info", "processing", "stream", c.streamName, "shard", shard.ShardId) + go c.handlerLoop(*shard.ShardId, handler) + } +} + +func (c *Consumer) handlerLoop(shardID string, handler Handler) { + params := &kinesis.GetShardIteratorInput{ + ShardId: aws.String(shardID), + StreamName: aws.String(c.streamName), + } + + checkpoint := &Checkpoint{AppName: c.appName, StreamName: c.streamName} + if checkpoint.CheckpointExists(shardID) { + params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER") + params.StartingSequenceNumber = aws.String(checkpoint.SequenceNumber()) + } else { + params.ShardIteratorType = aws.String("TRIM_HORIZON") + } + + resp, err := c.svc.GetShardIterator(params) + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + logger.Log("fatal", "getShardIterator", "code", awsErr.Code(), "msg", awsErr.Message(), "origError", awsErr.OrigErr()) + os.Exit(1) + } + } + + shardIterator := resp.ShardIterator + b := &Buffer{MaxBufferSize: maxBufferSize} + errCount := 0 + + for { + // get records from stream + resp, err := c.svc.GetRecords(&kinesis.GetRecordsInput{ + ShardIterator: shardIterator, + }) + + // handle recoverable errors, else exit program + if err != nil { + awsErr, _ := err.(awserr.Error) + + if isRecoverableError(err) { + logger.Log("warn", "getRecords", "errorCount", errCount, "code", 
awsErr.Code()) + handleAwsWaitTimeExp(errCount) + errCount++ + } else { + logger.Log("fatal", "getRecords", awsErr.Code()) + os.Exit(1) + } + } else { + errCount = 0 + } + + // process records + if len(resp.Records) > 0 { + for _, r := range resp.Records { + b.AddRecord(r) + } + + if b.ShouldFlush() { + handler.HandleRecords(*b) + checkpoint.SetCheckpoint(shardID, b.LastSeq()) + b.Flush() + } + } else if resp.NextShardIterator == aws.String("") || shardIterator == resp.NextShardIterator { + logger.Log("fatal", "nextShardIterator", "msg", err.Error()) + os.Exit(1) + } else { + time.Sleep(1 * time.Second) + } + + shardIterator = resp.NextShardIterator + } +} diff --git a/emitter.go b/emitter.go deleted file mode 100644 index 6cb8851..0000000 --- a/emitter.go +++ /dev/null @@ -1,11 +0,0 @@ -package connector - -// Emitter takes a full buffer and processes the stored records. The Emitter is a member of the -// Pipeline that "emits" the objects that have been deserialized by the -// Transformer. The Emit() method is invoked when the buffer is full (possibly to persist the -// records or send them to another Kinesis stream). After emitting the records. -// Implementations may choose to fail the entire set of records in the buffer or to fail records -// individually. -type Emitter interface { - Emit(b Buffer, t Transformer) -} diff --git a/manifest.go b/emitter/redshift/manifest.go similarity index 100% rename from manifest.go rename to emitter/redshift/manifest.go diff --git a/redshift_basic_emitter.go b/emitter/redshift/redshift_emitter.go similarity index 81% rename from redshift_basic_emitter.go rename to emitter/redshift/redshift_emitter.go index 2746abf..cb21db3 100644 --- a/redshift_basic_emitter.go +++ b/emitter/redshift/redshift_emitter.go @@ -12,7 +12,7 @@ import ( // RedshiftEmitter is an implementation of Emitter that buffered batches of records into Redshift one by one. // It first emits records into S3 and then perfors the Redshift JSON COPY command. S3 storage of buffered // data achieved using the S3Emitter. A link to jsonpaths must be provided when configuring the struct. -type RedshiftBasicEmitter struct { +type RedshiftEmitter struct { AwsAccessKey string AwsSecretAccessKey string Delimiter string @@ -26,10 +26,10 @@ type RedshiftBasicEmitter struct { // Emit is invoked when the buffer is full. This method leverages the S3Emitter and // then issues a copy command to Redshift data store. -func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) { - s3Emitter := S3Emitter{S3Bucket: e.S3Bucket} +func (e RedshiftEmitter) Emit(b Buffer) { + s3Emitter := S3Emitter{Bucket: e.S3Bucket} s3Emitter.Emit(b, t) - s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber()) + s3File := s3Emitter.S3FileName(b.FirstSeq(), b.LastSeq()) for i := 0; i < 10; i++ { // execute copy statement @@ -37,7 +37,7 @@ func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) { // db command succeeded, break from loop if err == nil { - logger.Log("info", "RedshiftBasicEmitter", "file", s3File) + logger.Log("info", "RedshiftEmitter", "file", s3File) break } @@ -45,14 +45,14 @@ func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) { if isRecoverableError(err) { handleAwsWaitTimeExp(i) } else { - logger.Log("error", "RedshiftBasicEmitter", "msg", err.Error()) + logger.Log("error", "RedshiftEmitter", "msg", err.Error()) break } } } // Creates the SQL copy statement issued to Redshift cluster. 
-func (e RedshiftBasicEmitter) copyStatement(s3File string) string { +func (e RedshiftEmitter) copyStatement(s3File string) string { b := new(bytes.Buffer) b.WriteString(fmt.Sprintf("COPY %v ", e.TableName)) b.WriteString(fmt.Sprintf("FROM 's3://%v/%v' ", e.S3Bucket, s3File)) diff --git a/redshift_manifest_emitter.go b/emitter/redshift/redshift_manifest_emitter.go similarity index 100% rename from redshift_manifest_emitter.go rename to emitter/redshift/redshift_manifest_emitter.go diff --git a/redshift_manifest_emitter_test.go b/emitter/redshift/redshift_manifest_emitter_test.go similarity index 100% rename from redshift_manifest_emitter_test.go rename to emitter/redshift/redshift_manifest_emitter_test.go diff --git a/examples/redshift-pipeline/main.go b/examples/redshift/main.go similarity index 94% rename from examples/redshift-pipeline/main.go rename to examples/redshift/main.go index 94dd4a3..5ca7514 100644 --- a/examples/redshift-pipeline/main.go +++ b/examples/redshift/main.go @@ -64,9 +64,6 @@ func main() { auth := kinesis.NewAuth() ksis := kinesis.New(&auth, kinesis.Region{}) - // Create stream - connector.CreateStream(ksis, cfg.Kinesis.StreamName, cfg.Kinesis.ShardCount) - // Fetch stream info args := kinesis.NewArgs() args.Add("StreamName", cfg.Kinesis.StreamName) diff --git a/examples/redshift-pipeline/pipeline.cfg b/examples/redshift/pipeline.cfg similarity index 100% rename from examples/redshift-pipeline/pipeline.cfg rename to examples/redshift/pipeline.cfg diff --git a/examples/s3-pipeline/main.go b/examples/s3-pipeline/main.go deleted file mode 100644 index a889564..0000000 --- a/examples/s3-pipeline/main.go +++ /dev/null @@ -1,83 +0,0 @@ -package main - -import ( - "fmt" - - "code.google.com/p/gcfg" - "github.com/harlow/kinesis-connectors" - "github.com/sendgridlabs/go-kinesis" -) - -type Config struct { - Pipeline struct { - Name string - } - Kinesis struct { - BufferSize int - ShardCount int - StreamName string - } - S3 struct { - BucketName string - } -} - -func newS3Pipeline(cfg Config) *connector.Pipeline { - f := &connector.AllPassFilter{} - b := &connector.RecordBuffer{ - NumRecordsToBuffer: cfg.Kinesis.BufferSize, - } - t := &connector.StringToStringTransformer{} - c := &connector.RedisCheckpoint{ - AppName: cfg.Pipeline.Name, - StreamName: cfg.Kinesis.StreamName, - } - e := &connector.S3Emitter{ - S3Bucket: cfg.S3.BucketName, - } - return &connector.Pipeline{ - Buffer: b, - Checkpoint: c, - Emitter: e, - Filter: f, - StreamName: cfg.Kinesis.StreamName, - Transformer: t, - } -} - -func main() { - var cfg Config - var err error - - // Load config vars - err = gcfg.ReadFileInto(&cfg, "pipeline.cfg") - if err != nil { - fmt.Printf("Config ERROR: %v\n", err) - } - - // Initialize Kinesis client - auth := kinesis.NewAuth() - ksis := kinesis.New(&auth, kinesis.Region{}) - - // Create stream - connector.CreateStream(ksis, cfg.Kinesis.StreamName, cfg.Kinesis.ShardCount) - - // Fetch stream info - args := kinesis.NewArgs() - args.Add("StreamName", cfg.Kinesis.StreamName) - streamInfo, err := ksis.DescribeStream(args) - if err != nil { - fmt.Printf("Unable to connect to %s stream. 
Aborting.", cfg.Kinesis.StreamName) - return - } - - // Process kinesis shards - for _, shard := range streamInfo.StreamDescription.Shards { - fmt.Printf("Processing %s on %s\n", shard.ShardId, cfg.Kinesis.StreamName) - p := newS3Pipeline(cfg) - go p.ProcessShard(shard.ShardId) - } - - // Keep alive - <-make(chan int) -} diff --git a/examples/s3-pipeline/pipeline.cfg b/examples/s3-pipeline/pipeline.cfg deleted file mode 100644 index 3108849..0000000 --- a/examples/s3-pipeline/pipeline.cfg +++ /dev/null @@ -1,8 +0,0 @@ -[pipeline] - name = s3Pipeline -[s3] - bucketName = kinesis.test -[kinesis] - bufferSize = 100 - shardCount = 2 - streamName = userStream diff --git a/examples/s3-pipeline/README.md b/examples/s3/README.md similarity index 92% rename from examples/s3-pipeline/README.md rename to examples/s3/README.md index eac7cea..d50150d 100644 --- a/examples/s3-pipeline/README.md +++ b/examples/s3/README.md @@ -22,4 +22,4 @@ export AWS_SECRET_KEY= ### Running the code - $ go run main.go + $ go run main.go -a appName -s streamName diff --git a/examples/s3/main.go b/examples/s3/main.go new file mode 100644 index 0000000..c091241 --- /dev/null +++ b/examples/s3/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "bytes" + "flag" + + "github.com/harlow/kinesis-connectors" +) + +var ( + app = flag.String("a", "", "App name") + bucket = flag.String("b", "", "Bucket name") + stream = flag.String("s", "", "Stream name") +) + +func handler(b connector.Buffer) { + body := new(bytes.Buffer) + + // filter or transform data if needed + for _, r := range b.GetRecords() { + body.Write(r.Data) + } + + s3 := &connector.S3Emitter{Bucket: *bucket} + s3.Emit( + connector.S3Key("", b.FirstSeq(), b.LastSeq()), + bytes.NewReader(body.Bytes()), + ) +} + +func main() { + flag.Parse() + + c := connector.NewConsumer(*app, *stream) + c.Start(connector.HandlerFunc(handler)) + + select {} // run forever +} diff --git a/examples/seed-stream/README.md b/examples/seed/README.md similarity index 83% rename from examples/seed-stream/README.md rename to examples/seed/README.md index f326f5b..1ac51b8 100644 --- a/examples/seed-stream/README.md +++ b/examples/seed/README.md @@ -16,5 +16,5 @@ export AWS_SECRET_KEY= ### Running the code - $ curl https://s3.amazonaws.com/kinesis.test/users.txt > users.txt + $ curl https://s3.amazonaws.com/kinesis.test/users.txt > /tmp/users.txt $ go run main.go diff --git a/examples/seed-stream/main.go b/examples/seed/main.go similarity index 59% rename from examples/seed-stream/main.go rename to examples/seed/main.go index 4e14e05..e6807d1 100644 --- a/examples/seed-stream/main.go +++ b/examples/seed/main.go @@ -2,6 +2,8 @@ package main import ( "bufio" + "flag" + "fmt" "log" "os" "sync" @@ -13,55 +15,56 @@ import ( // Note: download file with test data // curl https://s3.amazonaws.com/kinesis.test/users.txt -o /tmp/users.txt -func putToS3(svc *kinesis.Kinesis, data string) { +var stream = flag.String("s", "", "Stream name") + +func putToS3(svc *kinesis.Kinesis, data string, partitionKey string) { params := &kinesis.PutRecordInput{ Data: []byte(data), - PartitionKey: aws.String("partitionKey"), - StreamName: aws.String("hw-test-stream"), + PartitionKey: aws.String(partitionKey), + StreamName: aws.String(*stream), } _, err := svc.PutRecord(params) - if err != nil { - log.Fatal(err.Error()) + fmt.Println(err.Error()) return } else { - log.Print(".") + fmt.Print(".") } } func main() { - wg := &sync.WaitGroup{} + flag.Parse() + jobCh := make(chan string) - - // read sample data - file, err := 
os.Open("/tmp/users.txt") - - if err != nil { - log.Fatal("Cannot open users.txt file") - } - - defer file.Close() - scanner := bufio.NewScanner(file) - - // initialize kinesis client svc := kinesis.New(session.New()) + wg := &sync.WaitGroup{} + // boot the workers for processing data for i := 0; i < 4; i++ { wg.Add(1) go func() { for data := range jobCh { - putToS3(svc, data) + putToS3(svc, data, string(i)) } wg.Done() }() } - for scanner.Scan() { - data := scanner.Text() + // open data file + f, err := os.Open("/tmp/users.txt") + if err != nil { + log.Fatal("Cannot open users.txt file") + } + defer f.Close() + + // put sample data on channel + b := bufio.NewScanner(f) + for b.Scan() { + data := b.Text() jobCh <- data } - log.Println(".") + fmt.Println(".") log.Println("Finished populating stream") } diff --git a/filter.go b/filter.go deleted file mode 100644 index 53f9be2..0000000 --- a/filter.go +++ /dev/null @@ -1,7 +0,0 @@ -package connector - -// Filter is an interface used for determinint whether to buffer records. -// Returns false if you don't want to hold on to the record. -type Filter interface { - KeepRecord(r interface{}) bool -} diff --git a/handler.go b/handler.go new file mode 100644 index 0000000..624907d --- /dev/null +++ b/handler.go @@ -0,0 +1,18 @@ +package connector + +type Handler interface { + HandleRecords(b Buffer) +} + +// HandlerFunc is a convenience type to avoid having to declare a struct +// to implement the Handler interface, it can be used like this: +// +// consumer.AddHandler(connector.HandlerFunc(func(b Buffer) { +// // ... +// })) +type HandlerFunc func(b Buffer) + +// HandleRecords implements the Handler interface +func (h HandlerFunc) HandleRecords(b Buffer) { + h(b) +} diff --git a/kinesis.go b/kinesis.go deleted file mode 100644 index 0505af3..0000000 --- a/kinesis.go +++ /dev/null @@ -1,54 +0,0 @@ -package connector - -import ( - "time" - - "github.com/sendgridlabs/go-kinesis" -) - -// CreateStream creates a new Kinesis stream (uses existing stream if exists) and -// waits for it to become available. -func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) { - if !StreamExists(k, streamName) { - err := k.CreateStream(streamName, shardCount) - - if err != nil { - logger.Log("error", "CreateStream", "msg", err.Error()) - return - } - } - - resp := &kinesis.DescribeStreamResp{} - timeout := make(chan bool, 30) - - for { - args := kinesis.NewArgs() - args.Add("StreamName", streamName) - resp, _ = k.DescribeStream(args) - streamStatus := resp.StreamDescription.StreamStatus - logger.Log("info", "DescribeStream", "stream", streamName, "status", streamStatus) - - if streamStatus != "ACTIVE" { - time.Sleep(4 * time.Second) - timeout <- true - } else { - break - } - } -} - -// StreamExists checks if a Kinesis stream exists. -func StreamExists(k *kinesis.Kinesis, streamName string) bool { - args := kinesis.NewArgs() - resp, err := k.ListStreams(args) - if err != nil { - logger.Log("error", "ListStream", "stream", streamName, "status", err.Error()) - return false - } - for _, s := range resp.StreamNames { - if s == streamName { - return true - } - } - return false -} diff --git a/logger.go b/logger.go index eaa6737..9286db3 100644 --- a/logger.go +++ b/logger.go @@ -13,4 +13,4 @@ func SetLogger(l log.Logger) { } // specify a default logger so that we don't end up with panics. 
-var logger log.Logger = log.NewPrefixLogger(os.Stderr) +var logger log.Logger = log.NewLogfmtLogger(os.Stderr) diff --git a/pipeline.go b/pipeline.go deleted file mode 100644 index 3f6a65f..0000000 --- a/pipeline.go +++ /dev/null @@ -1,112 +0,0 @@ -package connector - -import ( - "os" - "time" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/service/kinesis" -) - -// Pipeline is used as a record processor to configure a pipline. -// -// The user should implement this such that each method returns a configured implementation of each -// interface. It has a data type (Model) as Records come in as a byte[] and are transformed to a Model. -// Then they are buffered in Model form and when the buffer is full, Models's are passed to the emitter. -type Pipeline struct { - Buffer Buffer - Checkpoint Checkpoint - Emitter Emitter - Filter Filter - Kinesis *kinesis.Kinesis - StreamName string - Transformer Transformer - - checkpointSequenceNumber string -} - -// ProcessShard is a long running process that handles reading records from a Kinesis shard. -func (p Pipeline) ProcessShard(shardID string) { - svc := kinesis.New(&aws.Config{Region: "us-east-1"}) - - args := &kinesis.GetShardIteratorInput{ - ShardID: aws.String(shardID), - StreamName: aws.String(p.StreamName), - } - - if p.Checkpoint.CheckpointExists(shardID) { - args.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER") - args.StartingSequenceNumber = aws.String(p.Checkpoint.SequenceNumber()) - } else { - args.ShardIteratorType = aws.String("TRIM_HORIZON") - } - - resp, err := svc.GetShardIterator(args) - - if err != nil { - if awsErr, ok := err.(awserr.Error); ok { - logger.Log("fatal", "getShardIterator", "code", awsErr.Code(), "msg", awsErr.Message(), "origError", awsErr.OrigErr()) - os.Exit(1) - } - } - - errorCount := 0 - shardIterator := resp.ShardIterator - - for { - // exit program if error threshold is reached - if errorCount > 50 { - logger.Log("fatal", "getRecords", "msg", "Too many consecutive error attempts") - os.Exit(1) - } - - // get records from stream - args := &kinesis.GetRecordsInput{ShardIterator: shardIterator} - resp, err := svc.GetRecords(args) - - // handle recoverable errors, else exit program - if err != nil { - awsErr, _ := err.(awserr.Error) - errorCount++ - - if isRecoverableError(err) { - logger.Log("warn", "getRecords", "errorCount", errorCount, "code", awsErr.Code()) - handleAwsWaitTimeExp(errorCount) - continue - } else { - logger.Log("fatal", "getRecords", awsErr.Code()) - os.Exit(1) - } - } else { - errorCount = 0 - } - - // process records - if len(resp.Records) > 0 { - for _, r := range resp.Records { - transformedRecord := p.Transformer.ToRecord(r.Data) - - if p.Filter.KeepRecord(transformedRecord) { - p.Buffer.ProcessRecord(transformedRecord, *r.SequenceNumber) - } - - p.checkpointSequenceNumber = *r.SequenceNumber - } - - if p.Buffer.ShouldFlush() { - p.Emitter.Emit(p.Buffer, p.Transformer) - logger.Log("info", "emit", "shardID", shardID, "recordsEmitted", len(p.Buffer.Records())) - p.Checkpoint.SetCheckpoint(shardID, p.checkpointSequenceNumber) - p.Buffer.Flush() - } - } else if resp.NextShardIterator == aws.String("") || shardIterator == resp.NextShardIterator { - logger.Log("fatal", "nextShardIterator", "msg", err.Error()) - os.Exit(1) - } else { - time.Sleep(1 * time.Second) - } - - shardIterator = resp.NextShardIterator - } -} diff --git a/record_buffer.go b/record_buffer.go deleted file mode 100644 index 012e1ed..0000000 --- 
a/record_buffer.go +++ /dev/null @@ -1,51 +0,0 @@ -package connector - -// RecordBuffer is a basic implementation of the Buffer interface. -// It buffer's records and answers questions on when it should be periodically flushed. -type RecordBuffer struct { - NumRecordsToBuffer int - - firstSequenceNumber string - lastSequenceNumber string - recordsInBuffer []interface{} -} - -// ProcessRecord adds a message to the buffer. -func (b *RecordBuffer) ProcessRecord(record interface{}, sequenceNumber string) { - if b.NumRecordsInBuffer() == 0 { - b.firstSequenceNumber = sequenceNumber - } - - b.lastSequenceNumber = sequenceNumber - b.recordsInBuffer = append(b.recordsInBuffer, record) -} - -// Records returns the records in the buffer. -func (b *RecordBuffer) Records() []interface{} { - return b.recordsInBuffer -} - -// NumRecordsInBuffer returns the number of messages in the buffer. -func (b RecordBuffer) NumRecordsInBuffer() int { - return len(b.recordsInBuffer) -} - -// Flush empties the buffer and resets the sequence counter. -func (b *RecordBuffer) Flush() { - b.recordsInBuffer = b.recordsInBuffer[:0] -} - -// ShouldFlush determines if the buffer has reached its target size. -func (b *RecordBuffer) ShouldFlush() bool { - return len(b.recordsInBuffer) >= b.NumRecordsToBuffer -} - -// FirstSequenceNumber returns the sequence number of the first message in the buffer. -func (b *RecordBuffer) FirstSequenceNumber() string { - return b.firstSequenceNumber -} - -// LastSequenceNumber returns the sequence number of the last message in the buffer. -func (b *RecordBuffer) LastSequenceNumber() string { - return b.lastSequenceNumber -} diff --git a/record_buffer_test.go b/record_buffer_test.go deleted file mode 100644 index 74c0db7..0000000 --- a/record_buffer_test.go +++ /dev/null @@ -1,98 +0,0 @@ -package connector - -import "testing" - -type TestRecord struct{} - -func (r TestRecord) ToDelimitedString() string { - return "test" -} - -func (r TestRecord) ToJSON() []byte { - return []byte("test") -} - -func TestProcessRecord(t *testing.T) { - var r1, s1 = TestRecord{}, "Seq1" - var r2, s2 = TestRecord{}, "Seq2" - - b := RecordBuffer{} - b.ProcessRecord(r1, s1) - - if b.NumRecordsInBuffer() != 1 { - t.Errorf("NumRecordsInBuffer() want %v", 1) - } - - b.ProcessRecord(r2, s2) - - if b.NumRecordsInBuffer() != 2 { - t.Errorf("NumRecordsInBuffer() want %v", 2) - } -} - -func TestFlush(t *testing.T) { - var r1, s1 = TestRecord{}, "SeqNum" - b := RecordBuffer{} - b.ProcessRecord(r1, s1) - - b.Flush() - - if b.NumRecordsInBuffer() != 0 { - t.Errorf("Count() want %v", 0) - } -} - -func TestLastSequenceNumber(t *testing.T) { - var r1, s1 = TestRecord{}, "Seq1" - var r2, s2 = TestRecord{}, "Seq2" - - b := RecordBuffer{} - b.ProcessRecord(r1, s1) - - if b.LastSequenceNumber() != s1 { - t.Errorf("LastSequenceNumber() want %v", s1) - } - - b.ProcessRecord(r2, s2) - - if b.LastSequenceNumber() != s2 { - t.Errorf("LastSequenceNumber() want %v", s2) - } -} - -func TestFirstSequenceNumber(t *testing.T) { - var r1, s1 = TestRecord{}, "Seq1" - var r2, s2 = TestRecord{}, "Seq2" - - b := RecordBuffer{} - b.ProcessRecord(r1, s1) - - if b.FirstSequenceNumber() != s1 { - t.Errorf("FirstSequenceNumber() want %v", s1) - } - - b.ProcessRecord(r2, s2) - - if b.FirstSequenceNumber() != s1 { - t.Errorf("FirstSequenceNumber() want %v", s1) - } -} - -func TestShouldFlush(t *testing.T) { - const n = 2 - var r1, s1 = TestRecord{}, "Seq1" - var r2, s2 = TestRecord{}, "Seq2" - - b := RecordBuffer{NumRecordsToBuffer: n} - b.ProcessRecord(r1, 
s1) - - if b.ShouldFlush() != false { - t.Errorf("ShouldFlush() want %v", false) - } - - b.ProcessRecord(r2, s2) - - if b.ShouldFlush() != true { - t.Errorf("ShouldFlush() want %v", true) - } -} diff --git a/redis_checkpoint.go b/redis_checkpoint.go deleted file mode 100644 index 2bf0271..0000000 --- a/redis_checkpoint.go +++ /dev/null @@ -1,48 +0,0 @@ -package connector - -import ( - "fmt" - - "github.com/hoisie/redis" -) - -// RedisCheckpoint implements the Checkpont interface. -// This class is used to enable the Pipeline.ProcessShard to checkpoint their progress. -type RedisCheckpoint struct { - AppName string - StreamName string - - client redis.Client - sequenceNumber string -} - -// CheckpointExists determines if a checkpoint for a particular Shard exists. -// Typically used to determine whether we should start processing the shard with -// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). -func (c *RedisCheckpoint) CheckpointExists(shardID string) bool { - val, _ := c.client.Get(c.key(shardID)) - - if val != nil && string(val) != "" { - c.sequenceNumber = string(val) - return true - } - - return false -} - -// SequenceNumber returns the current checkpoint stored for the specified shard. -func (c *RedisCheckpoint) SequenceNumber() string { - return c.sequenceNumber -} - -// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). -// Upon failover, record processing is resumed from this point. -func (c *RedisCheckpoint) SetCheckpoint(shardID string, sequenceNumber string) { - c.client.Set(c.key(shardID), []byte(sequenceNumber)) - c.sequenceNumber = sequenceNumber -} - -// key generates a unique Redis key for storage of Checkpoint. -func (c *RedisCheckpoint) key(shardID string) string { - return fmt.Sprintf("%v:checkpoint:%v:%v", c.AppName, c.StreamName, shardID) -} diff --git a/redis_checkpoint_test.go b/redis_checkpoint_test.go deleted file mode 100644 index 860a36d..0000000 --- a/redis_checkpoint_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package connector - -import ( - "testing" - - "github.com/hoisie/redis" -) - -func TestKey(t *testing.T) { - k := "app:checkpoint:stream:shard" - c := RedisCheckpoint{AppName: "app", StreamName: "stream"} - - r := c.key("shard") - - if r != k { - t.Errorf("key() = %v, want %v", k, r) - } -} - -func TestCheckpointExists(t *testing.T) { - var rc redis.Client - k := "app:checkpoint:stream:shard" - rc.Set(k, []byte("fakeSeqNum")) - c := RedisCheckpoint{AppName: "app", StreamName: "stream"} - - r := c.CheckpointExists("shard") - - if r != true { - t.Errorf("CheckpointExists() = %v, want %v", false, r) - } - - rc.Del(k) -} - -func TestSetCheckpoint(t *testing.T) { - k := "app:checkpoint:stream:shard" - var rc redis.Client - c := RedisCheckpoint{AppName: "app", StreamName: "stream"} - c.SetCheckpoint("shard", "fakeSeqNum") - - r, _ := rc.Get(k) - - if string(r) != "fakeSeqNum" { - t.Errorf("SetCheckpoint() = %v, want %v", "fakeSeqNum", r) - } - - rc.Del(k) -} diff --git a/redshift_emitter.go b/redshift_emitter.go new file mode 100644 index 0000000..3db74d9 --- /dev/null +++ b/redshift_emitter.go @@ -0,0 +1,74 @@ +package connector + +import ( + "bytes" + "database/sql" + "fmt" + "io" + + // Postgres package is used when sql.Open is called + _ "github.com/lib/pq" +) + +// RedshiftEmitter is an implementation of Emitter that buffered batches of records into Redshift one by one. +// It first emits records into S3 and then perfors the Redshift JSON COPY command. 
S3 storage of buffered +// data achieved using the S3Emitter. A link to jsonpaths must be provided when configuring the struct. +type RedshiftEmitter struct { + AwsAccessKey string + AwsSecretAccessKey string + Delimiter string + Format string + Jsonpaths string + S3Bucket string + S3Prefix string + TableName string + Db *sql.DB +} + +// Emit is invoked when the buffer is full. This method leverages the S3Emitter and +// then issues a copy command to Redshift data store. +func (e RedshiftEmitter) Emit(s3Key string, b io.ReadSeeker) { + // put contents to S3 Bucket + s3 := &S3Emitter{Bucket: e.S3Bucket} + s3.Emit(s3Key, b) + + for i := 0; i < 10; i++ { + // execute copy statement + _, err := e.Db.Exec(e.copyStatement(s3Key)) + + // db command succeeded, break from loop + if err == nil { + logger.Log("info", "RedshiftEmitter", "file", s3Key) + break + } + + // handle recoverable errors, else break from loop + if isRecoverableError(err) { + handleAwsWaitTimeExp(i) + } else { + logger.Log("error", "RedshiftEmitter", "msg", err.Error()) + break + } + } +} + +// Creates the SQL copy statement issued to Redshift cluster. +func (e RedshiftEmitter) copyStatement(s3Key string) string { + b := new(bytes.Buffer) + b.WriteString(fmt.Sprintf("COPY %v ", e.TableName)) + b.WriteString(fmt.Sprintf("FROM 's3://%v/%v' ", e.S3Bucket, s3Key)) + b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", e.AwsAccessKey)) + b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", e.AwsSecretAccessKey)) + + switch e.Format { + case "json": + b.WriteString(fmt.Sprintf("json 'auto'")) + case "jsonpaths": + b.WriteString(fmt.Sprintf("json '%v'", e.Jsonpaths)) + default: + b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter)) + } + b.WriteString(";") + + return b.String() +} diff --git a/redshift_basic_emitter_test.go b/redshift_emitter_test.go similarity index 85% rename from redshift_basic_emitter_test.go rename to redshift_emitter_test.go index 4689be6..52f33ef 100644 --- a/redshift_basic_emitter_test.go +++ b/redshift_emitter_test.go @@ -4,8 +4,8 @@ import ( "testing" ) -func TestCopyStatement(t *testing.T) { - e := RedshiftBasicEmitter{ +func Test_CopyStatement(t *testing.T) { + e := RedshiftEmitter{ Delimiter: ",", S3Bucket: "test_bucket", TableName: "test_table", diff --git a/s3_emitter.go b/s3_emitter.go index 858c8c7..57cb419 100644 --- a/s3_emitter.go +++ b/s3_emitter.go @@ -1,43 +1,33 @@ package connector import ( - "bytes" - "fmt" - "time" + "io" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "gopkg.in/matryer/try.v1" ) -// S3Emitter is an implementation of Emitter used to store files from a Kinesis stream in S3. +// S3Emitter stores data in S3 bucket. // // The use of this struct requires the configuration of an S3 bucket/endpoint. When the buffer is full, this // struct's Emit method adds the contents of the buffer to S3 as one file. The filename is generated // from the first and last sequence numbers of the records contained in that file separated by a // dash. This struct requires the configuration of an S3 bucket and endpoint. type S3Emitter struct { - S3Bucket string - S3Prefix string + Bucket string } // Emit is invoked when the buffer is full. This method emits the set of filtered records. 
-func (e S3Emitter) Emit(b Buffer, t Transformer) { - var buffer bytes.Buffer - svc := s3.New(&aws.Config{Region: "us-east-1"}) - key := e.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber()) - - for _, r := range b.Records() { - var s = t.FromRecord(r) - buffer.Write(s) - } - +func (e S3Emitter) Emit(s3Key string, b io.ReadSeeker) { + svc := s3.New(session.New()) params := &s3.PutObjectInput{ - Body: bytes.NewReader(buffer.Bytes()), - Bucket: aws.String(e.S3Bucket), + Body: b, + Bucket: aws.String(e.Bucket), ContentType: aws.String("text/plain"), - Key: aws.String(key), + Key: aws.String(s3Key), } err := try.Do(func(attempt int) (bool, error) { @@ -48,18 +38,9 @@ func (e S3Emitter) Emit(b Buffer, t Transformer) { if err != nil { if awsErr, ok := err.(awserr.Error); ok { - logger.Log("error", "emit", "code", awsErr.Code()) + logger.Log("error", "s3.PutObject", "code", awsErr.Code()) } } -} -// S3FileName generates a file name based on the First and Last sequence numbers from the buffer. The current -// UTC date (YYYY-MM-DD) is base of the path to logically group days of batches. -func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string { - date := time.Now().UTC().Format("2006/01/02") - if e.S3Prefix == "" { - return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq) - } else { - return fmt.Sprintf("%v/%v/%v-%v", e.S3Prefix, date, firstSeq, lastSeq) - } + logger.Log("info", "S3Emitter", "msg", "success", "key", s3Key) } diff --git a/s3_emitter_test.go b/s3_emitter_test.go deleted file mode 100644 index 8e9229e..0000000 --- a/s3_emitter_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package connector - -import ( - "fmt" - "testing" - "time" -) - -func TestS3FileName(t *testing.T) { - d := time.Now().UTC().Format("2006/01/02") - e := S3Emitter{S3Bucket: "bucket", S3Prefix: "prefix"} - - expected := fmt.Sprintf("prefix/%v/a-b", d) - result := e.S3FileName("a", "b") - - if result != expected { - t.Errorf("S3FileName() = %v want %v", result, expected) - } - - e.S3Prefix = "" - expected = fmt.Sprintf("%v/a-b", d) - result = e.S3FileName("a", "b") - - if result != expected { - t.Errorf("S3FileName() = %v want %v", result, expected) - } -} diff --git a/s3_key.go b/s3_key.go new file mode 100644 index 0000000..64529f9 --- /dev/null +++ b/s3_key.go @@ -0,0 +1,16 @@ +package connector + +import ( + "fmt" + "time" +) + +func S3Key(prefix, firstSeq, lastSeq string) string { + date := time.Now().UTC().Format("2006/01/02") + + if prefix == "" { + return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq) + } else { + return fmt.Sprintf("%v/%v/%v-%v", prefix, date, firstSeq, lastSeq) + } +} diff --git a/s3_key_test.go b/s3_key_test.go new file mode 100644 index 0000000..976b8b0 --- /dev/null +++ b/s3_key_test.go @@ -0,0 +1,19 @@ +package connector + +import ( + "fmt" + "testing" + "time" + + "github.com/bmizerany/assert" +) + +func Test_S3Key(t *testing.T) { + d := time.Now().UTC().Format("2006/01/02") + + k := S3Key("", "a", "b") + assert.Equal(t, k, fmt.Sprintf("%v/a-b", d)) + + k = S3Key("prefix", "a", "b") + assert.Equal(t, k, fmt.Sprintf("prefix/%v/a-b", d)) +} diff --git a/s3_manifest_emitter.go b/s3_manifest_emitter.go index 9977dcb..cf9fbe5 100644 --- a/s3_manifest_emitter.go +++ b/s3_manifest_emitter.go @@ -1,38 +1,41 @@ package connector import ( + "io" "os" - "github.com/sendgridlabs/go-kinesis" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" ) // An implementation of Emitter that puts event data on S3 file, and 
then puts the // S3 file path onto the output stream for processing by manifest application. type S3ManifestEmitter struct { OutputStream string - S3Bucket string - Ksis *kinesis.Kinesis + Bucket string + Prefix string } -func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) { +func (e S3ManifestEmitter) Emit(s3Key string, b io.ReadSeeker) { + // put contents to S3 Bucket + s3 := &S3Emitter{Bucket: e.Bucket} + s3.Emit(s3Key, b) - // Emit buffer contents to S3 Bucket - s3Emitter := S3Emitter{S3Bucket: e.S3Bucket} - s3Emitter.Emit(b, t) - s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber()) + // put file path on Kinesis output stream + params := &kinesis.PutRecordInput{ + Data: []byte(s3Key), + PartitionKey: aws.String(s3Key), + StreamName: aws.String(e.OutputStream), + } - // Emit the file path to Kinesis Output stream - args := kinesis.NewArgs() - args.Add("StreamName", e.OutputStream) - args.Add("PartitionKey", s3File) - args.AddData([]byte(s3File)) - - _, err := e.Ksis.PutRecord(args) + svc := kinesis.New(session.New()) + _, err := svc.PutRecord(params) if err != nil { logger.Log("error", "PutRecord", "msg", err) os.Exit(1) } else { - logger.Log("info", "S3ManifestEmitter", "firstSequenceNumber", b.FirstSequenceNumber(), "stream", e.OutputStream) + logger.Log("info", "S3ManifestEmitter", "stream", e.OutputStream, "key", s3Key) } } diff --git a/string_to_string_transformer.go b/string_to_string_transformer.go deleted file mode 100644 index bc67544..0000000 --- a/string_to_string_transformer.go +++ /dev/null @@ -1,14 +0,0 @@ -package connector - -// StringToStringTransformer an implemenation of Transformer interface. -type StringToStringTransformer struct{} - -// ToRecord takes a byte array and returns a string. -func (t StringToStringTransformer) ToRecord(data []byte) interface{} { - return string(data) -} - -// FromRecord takes an string and returns a byte array. -func (t StringToStringTransformer) FromRecord(s interface{}) []byte { - return []byte(s.(string)) -} diff --git a/transformer.go b/transformer.go deleted file mode 100644 index 9aadca6..0000000 --- a/transformer.go +++ /dev/null @@ -1,8 +0,0 @@ -package connector - -// Transformer is used to transform data (byte array) to a Record for -// processing in the application. -type Transformer interface { - FromRecord(r interface{}) []byte - ToRecord(data []byte) interface{} -}
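
---

The patch updates the S3 example to the new handler API, but not the Redshift one. Below is a minimal sketch of what a Redshift pipeline might look like against the refactored `HandlerFunc`/`RedshiftEmitter` surface introduced in this commit. The flag names, the `-t` table flag, and the `REDSHIFT_URL` / `AWS_ACCESS_KEY` / `AWS_SECRET_KEY` environment variables are assumptions for illustration, not part of this patch.

```golang
package main

import (
	"bytes"
	"database/sql"
	"flag"
	"log"
	"os"

	"github.com/harlow/kinesis-connectors"
	_ "github.com/lib/pq" // Postgres driver used by the Redshift COPY connection
)

var (
	app    = flag.String("a", "", "App name")
	bucket = flag.String("b", "", "Bucket name")
	stream = flag.String("s", "", "Stream name")
	table  = flag.String("t", "", "Redshift table name") // hypothetical flag, not in the patch
)

func main() {
	flag.Parse()

	// Connection string via env var is an assumption for this sketch.
	db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
	if err != nil {
		log.Fatal(err)
	}

	emitter := connector.RedshiftEmitter{
		AwsAccessKey:       os.Getenv("AWS_ACCESS_KEY"),
		AwsSecretAccessKey: os.Getenv("AWS_SECRET_KEY"),
		Delimiter:          "|",
		S3Bucket:           *bucket,
		TableName:          *table,
		Db:                 db,
	}

	c := connector.NewConsumer(*app, *stream)
	c.Start(connector.HandlerFunc(func(b connector.Buffer) {
		// Concatenate the raw record data for the batch.
		body := new(bytes.Buffer)
		for _, r := range b.GetRecords() {
			body.Write(r.Data)
		}

		// Stage the batch in S3, then COPY it into the Redshift table.
		emitter.Emit(
			connector.S3Key("", b.FirstSeq(), b.LastSeq()),
			bytes.NewReader(body.Bytes()),
		)
	}))

	select {} // run forever
}
```

As with the S3 example, the consumer checkpoints the last sequence number to Redis after each flush, so restarting the application resumes from the last emitted batch.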