From 70c3b1bd7972caae4a437f4e0964e0b1c27e1b3c Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Tue, 29 Jul 2014 19:15:44 -0700 Subject: [PATCH] Broke apart generic files into directories * Added new package name for each directory. * Update tests to match new package names. --- buffers/buffer.go | 8 ++++++++ buffers.go => buffers/msg_buffer.go | 9 +-------- buffers_test.go => buffers/msg_buffer_test.go | 2 +- checkpoints/checkpoint.go | 8 ++++++++ .../redis_checkpoint.go | 8 +------- .../redis_checkpoint_test.go | 5 +++-- emitters/emitter.go | 5 +++++ emitters/redshift_emitter.go | 16 +++++++++++++++ emitters.go => emitters/s3_emitter.go | 20 +++---------------- .../s3_emitter_test.go | 4 ++-- examples/consumer.go | 12 ++++++----- examples/producer.go | 5 +++-- utils.go => utils/kinesis_utils.go | 5 +++-- 13 files changed, 61 insertions(+), 46 deletions(-) create mode 100644 buffers/buffer.go rename buffers.go => buffers/msg_buffer.go (90%) rename buffers_test.go => buffers/msg_buffer_test.go (99%) create mode 100644 checkpoints/checkpoint.go rename checkpoints.go => checkpoints/redis_checkpoint.go (81%) rename checkpoints_test.go => checkpoints/redis_checkpoint_test.go (97%) create mode 100644 emitters/emitter.go create mode 100644 emitters/redshift_emitter.go rename emitters.go => emitters/s3_emitter.go (61%) rename emitters_test.go => emitters/s3_emitter_test.go (81%) rename utils.go => utils/kinesis_utils.go (98%) diff --git a/buffers/buffer.go b/buffers/buffer.go new file mode 100644 index 0000000..e8852e0 --- /dev/null +++ b/buffers/buffer.go @@ -0,0 +1,8 @@ +package buffers + +type Buffer interface { + Data() []byte + FirstSequenceNumber() string + LastSequenceNumber() string + NumMessagesInBuffer() int +} diff --git a/buffers.go b/buffers/msg_buffer.go similarity index 90% rename from buffers.go rename to buffers/msg_buffer.go index 3f1ddc3..cf57c94 100644 --- a/buffers.go +++ b/buffers/msg_buffer.go @@ -1,14 +1,7 @@ -package etl +package buffers import "bytes" -type Buffer interface { - Data() []byte - FirstSequenceNumber() string - LastSequenceNumber() string - NumMessagesInBuffer() int -} - type MsgBuffer struct { buffer bytes.Buffer firstSequenceNumber string diff --git a/buffers_test.go b/buffers/msg_buffer_test.go similarity index 99% rename from buffers_test.go rename to buffers/msg_buffer_test.go index 8b32337..316cd28 100644 --- a/buffers_test.go +++ b/buffers/msg_buffer_test.go @@ -1,4 +1,4 @@ -package etl +package buffers import ( "bytes" diff --git a/checkpoints/checkpoint.go b/checkpoints/checkpoint.go new file mode 100644 index 0000000..1945358 --- /dev/null +++ b/checkpoints/checkpoint.go @@ -0,0 +1,8 @@ +package checkpoints + +type Checkpoint interface { + CheckpointExists(streamName string, shardID string) bool + SequenceNumber() string + SetCheckpoint(streamName string, shardID string, sequenceNumber string) +} + diff --git a/checkpoints.go b/checkpoints/redis_checkpoint.go similarity index 81% rename from checkpoints.go rename to checkpoints/redis_checkpoint.go index 17de850..1043c4e 100644 --- a/checkpoints.go +++ b/checkpoints/redis_checkpoint.go @@ -1,4 +1,4 @@ -package etl +package checkpoints import ( "fmt" @@ -6,12 +6,6 @@ import ( "github.com/hoisie/redis" ) -type Checkpoint interface { - CheckpointExists(streamName string, shardID string) bool - SequenceNumber() string - SetCheckpoint(streamName string, shardID string, sequenceNumber string) -} - type RedisCheckpoint struct { AppName string client redis.Client diff --git a/checkpoints_test.go b/checkpoints/redis_checkpoint_test.go similarity index 97% rename from checkpoints_test.go rename to checkpoints/redis_checkpoint_test.go index 46b9fea..c00fe95 100644 --- a/checkpoints_test.go +++ b/checkpoints/redis_checkpoint_test.go @@ -1,8 +1,9 @@ -package etl +package checkpoints import ( - "github.com/hoisie/redis" "testing" + + "github.com/hoisie/redis" ) func TestKeyGen(t *testing.T) { diff --git a/emitters/emitter.go b/emitters/emitter.go new file mode 100644 index 0000000..c0bf91a --- /dev/null +++ b/emitters/emitter.go @@ -0,0 +1,5 @@ +package emitters + +type Emitter interface { + Emit(path string, data []byte) +} diff --git a/emitters/redshift_emitter.go b/emitters/redshift_emitter.go new file mode 100644 index 0000000..2ae4641 --- /dev/null +++ b/emitters/redshift_emitter.go @@ -0,0 +1,16 @@ +package emitters + +import ( + "fmt" +) + +type RedshiftEmitter struct { +} + +func (e RedshiftEmitter) Emit(path string, data []byte) { + // first call S3 bucket + // pg.query("COPY file_path TO table_name") + // pg.query("INSERT INTO imported_files VALUE file_path") + fmt.Printf("debug: emitting %v to Redshift\n", path) + fmt.Println(string(data)) +} diff --git a/emitters.go b/emitters/s3_emitter.go similarity index 61% rename from emitters.go rename to emitters/s3_emitter.go index 54efdf9..d66c810 100644 --- a/emitters.go +++ b/emitters/s3_emitter.go @@ -1,4 +1,4 @@ -package etl +package emitters import ( "fmt" @@ -6,12 +6,9 @@ import ( "github.com/crowdmob/goamz/aws" "github.com/crowdmob/goamz/s3" + "github.com/harlow/go-etl/buffers" ) -type Emitter interface { - Emit(path string, data []byte) -} - type S3Emitter struct { S3Bucket string } @@ -21,7 +18,7 @@ func (e S3Emitter) s3FileName(firstSeq string, lastSeq string) string { return fmt.Sprintf("/%v/%v-%v.txt", date, firstSeq, lastSeq) } -func (e S3Emitter) Emit(buffer Buffer) { +func (e S3Emitter) Emit(buffer buffers.Buffer) { auth, _ := aws.EnvAuth() s := s3.New(auth, aws.USEast) b := s.Bucket(e.S3Bucket) @@ -30,14 +27,3 @@ func (e S3Emitter) Emit(buffer Buffer) { fmt.Printf("Successfully emitted %v records to S3 in s3://%v/%v", buffer.NumMessagesInBuffer(), b, f) fmt.Println(r) } - -type RedshiftEmitter struct { -} - -func (e RedshiftEmitter) Emit(path string, data []byte) { - // first call S3 bucket - // pg.query("COPY file_path TO table_name") - // pg.query("INSERT INTO imported_files VALUE file_path") - fmt.Printf("debug: emitting %v to Redshift\n", path) - fmt.Println(string(data)) -} diff --git a/emitters_test.go b/emitters/s3_emitter_test.go similarity index 81% rename from emitters_test.go rename to emitters/s3_emitter_test.go index 60977c5..881167c 100644 --- a/emitters_test.go +++ b/emitters/s3_emitter_test.go @@ -1,4 +1,4 @@ -package etl +package emitters import ( "fmt" @@ -6,7 +6,7 @@ import ( "time" ) -func TestPath(t *testing.T) { +func TestS3FileName(t *testing.T) { d := time.Now().UTC().Format("2006-01-02") n := fmt.Sprintf("/%v/a-b.txt", d) e := S3Emitter{} diff --git a/examples/consumer.go b/examples/consumer.go index aa3727f..2252fa1 100644 --- a/examples/consumer.go +++ b/examples/consumer.go @@ -3,7 +3,9 @@ package main import ( "fmt" - "github.com/harlow/go-etl" + "github.com/harlow/go-etl/checkpoints" + "github.com/harlow/go-etl/emitters" + "github.com/harlow/go-etl/utils" "github.com/joho/godotenv" "github.com/sendgridlabs/go-kinesis" ) @@ -13,9 +15,9 @@ func main() { s := "inputStream" k := kinesis.New("", "", kinesis.Region{}) - c := etl.RedisCheckpoint{AppName: "sampleApp"} - e := etl.S3Emitter{S3Bucket: "bucketName"} - // t := etl.EventTransformer{} + c := checkpoints.RedisCheckpoint{AppName: "sampleApp"} + e := emitters.S3Emitter{S3Bucket: "bucketName"} + // t := transformers.EventTransformer{} args := kinesis.NewArgs() args.Add("StreamName", s) @@ -27,7 +29,7 @@ func main() { } for _, shard := range streamInfo.StreamDescription.Shards { - go etl.GetRecords(k, &c, e, s, shard.ShardId) + go utils.GetRecords(k, &c, e, s, shard.ShardId) } select {} diff --git a/examples/producer.go b/examples/producer.go index 391fb05..7793b61 100644 --- a/examples/producer.go +++ b/examples/producer.go @@ -2,7 +2,8 @@ package main import ( "fmt" - "github.com/harlow/go-etl" + + "github.com/harlow/go-etl/utils" "github.com/joho/godotenv" "github.com/sendgridlabs/go-kinesis" ) @@ -28,7 +29,7 @@ func main() { streamName := "inputStream" ksis := kinesis.New("", "", kinesis.Region{}) - etl.CreateAndWaitForStreamToBecomeAvailable(ksis, streamName, 2) + utils.CreateAndWaitForStreamToBecomeAvailable(ksis, streamName, 2) putSampleDataOnStream(ksis, streamName, 50) // deleteStream(ksis, streamName) } diff --git a/utils.go b/utils/kinesis_utils.go similarity index 98% rename from utils.go rename to utils/kinesis_utils.go index 3b33750..7b5bd76 100644 --- a/utils.go +++ b/utils/kinesis_utils.go @@ -1,9 +1,10 @@ -package etl +package utils import ( "fmt" - "github.com/sendgridlabs/go-kinesis" "time" + + "github.com/sendgridlabs/go-kinesis" ) func CreateAndWaitForStreamToBecomeAvailable(ksis *kinesis.Kinesis, streamName string, shardCount int) {