diff --git a/buffers.go b/buffers.go index 99809a3..3f1ddc3 100644 --- a/buffers.go +++ b/buffers.go @@ -1,10 +1,13 @@ package etl -import ( - "bytes" - "fmt" - "time" -) +import "bytes" + +type Buffer interface { + Data() []byte + FirstSequenceNumber() string + LastSequenceNumber() string + NumMessagesInBuffer() int +} type MsgBuffer struct { buffer bytes.Buffer @@ -40,11 +43,6 @@ func (b MsgBuffer) SequenceExists(sequenceNumber string) bool { return false } -func (b MsgBuffer) FileName() string { - date := time.Now().UTC().Format("2006-01-02") - return fmt.Sprintf("/%v/%v-%v.txt", date, b.firstSequenceNumber, b.lastSequenceNumber) -} - func (b MsgBuffer) Data() []byte { return b.buffer.Bytes() } diff --git a/buffers_test.go b/buffers_test.go index 856a21f..8b32337 100644 --- a/buffers_test.go +++ b/buffers_test.go @@ -2,9 +2,7 @@ package etl import ( "bytes" - "fmt" "testing" - "time" ) func TestNumMessagesToBuffer(t *testing.T) { @@ -139,18 +137,3 @@ func TestData(t *testing.T) { t.Errorf("Data() want %v", body) } } - -func TestFileName(t *testing.T) { - var r1, s1 = []byte("Record1"), "Seq1" - var r2, s2 = []byte("Record2"), "Seq2" - date := time.Now().UTC().Format("2006-01-02") - name := fmt.Sprintf("/%v/Seq1-Seq2.txt", date) - - b := MsgBuffer{} - b.ConsumeRecord(r1, s1) - b.ConsumeRecord(r2, s2) - - if b.FileName() != name { - t.Errorf("FileName() = want %v", b.FileName(), name) - } -} diff --git a/checkpoints.go b/checkpoints.go index e9de6fc..17de850 100644 --- a/checkpoints.go +++ b/checkpoints.go @@ -13,7 +13,7 @@ type Checkpoint interface { } type RedisCheckpoint struct { - appName string + AppName string client redis.Client sequenceNumber string } @@ -41,5 +41,5 @@ func (c *RedisCheckpoint) SetCheckpoint(streamName string, shardID string, seque } func (c RedisCheckpoint) keyGen(streamName string, shardID string) string { - return fmt.Sprintf("%v:checkpoint:%v:%v", c.appName, streamName, shardID) + return fmt.Sprintf("%v:checkpoint:%v:%v", c.AppName, streamName, shardID) } diff --git a/checkpoints_test.go b/checkpoints_test.go index 134d7d3..46b9fea 100644 --- a/checkpoints_test.go +++ b/checkpoints_test.go @@ -7,7 +7,7 @@ import ( func TestKeyGen(t *testing.T) { k := "app:checkpoint:stream:shard" - c := RedisCheckpoint{appName: "app"} + c := RedisCheckpoint{AppName: "app"} r := c.keyGen("stream", "shard") @@ -20,7 +20,7 @@ func TestCheckpointExists(t *testing.T) { var rc redis.Client k := "app:checkpoint:stream:shard" rc.Set(k, []byte("fakeSeqNum")) - c := RedisCheckpoint{appName: "app"} + c := RedisCheckpoint{AppName: "app"} r := c.CheckpointExists("stream", "shard") @@ -34,7 +34,7 @@ func TestCheckpointExists(t *testing.T) { func TestSetCheckpoint(t *testing.T) { k := "app:checkpoint:stream:shard" var rc redis.Client - c := RedisCheckpoint{appName: "app"} + c := RedisCheckpoint{AppName: "app"} c.SetCheckpoint("stream", "shard", "fakeSeqNum") r, _ := rc.Get(k) diff --git a/emitters.go b/emitters.go new file mode 100644 index 0000000..54efdf9 --- /dev/null +++ b/emitters.go @@ -0,0 +1,43 @@ +package etl + +import ( + "fmt" + "time" + + "github.com/crowdmob/goamz/aws" + "github.com/crowdmob/goamz/s3" +) + +type Emitter interface { + Emit(path string, data []byte) +} + +type S3Emitter struct { + S3Bucket string +} + +func (e S3Emitter) s3FileName(firstSeq string, lastSeq string) string { + date := time.Now().UTC().Format("2006-01-02") + return fmt.Sprintf("/%v/%v-%v.txt", date, firstSeq, lastSeq) +} + +func (e S3Emitter) Emit(buffer Buffer) { + auth, _ := aws.EnvAuth() + s := s3.New(auth, aws.USEast) + b := s.Bucket(e.S3Bucket) + f := e.s3FileName(buffer.FirstSequenceNumber(), buffer.LastSequenceNumber()) + r := b.Put(f, buffer.Data(), "text/plain", s3.Private, s3.Options{}) + fmt.Printf("Successfully emitted %v records to S3 in s3://%v/%v", buffer.NumMessagesInBuffer(), b, f) + fmt.Println(r) +} + +type RedshiftEmitter struct { +} + +func (e RedshiftEmitter) Emit(path string, data []byte) { + // first call S3 bucket + // pg.query("COPY file_path TO table_name") + // pg.query("INSERT INTO imported_files VALUE file_path") + fmt.Printf("debug: emitting %v to Redshift\n", path) + fmt.Println(string(data)) +} diff --git a/emitters_test.go b/emitters_test.go new file mode 100644 index 0000000..60977c5 --- /dev/null +++ b/emitters_test.go @@ -0,0 +1,18 @@ +package etl + +import ( + "fmt" + "testing" + "time" +) + +func TestPath(t *testing.T) { + d := time.Now().UTC().Format("2006-01-02") + n := fmt.Sprintf("/%v/a-b.txt", d) + e := S3Emitter{} + f := e.s3FileName("a", "b") + + if f != n { + t.Errorf("s3FileName() = want %v", f, n) + } +} diff --git a/examples/consumer.go b/examples/consumer.go index 9b84811..aa3727f 100644 --- a/examples/consumer.go +++ b/examples/consumer.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "github.com/harlow/go-etl" "github.com/joho/godotenv" "github.com/sendgridlabs/go-kinesis" @@ -10,14 +11,10 @@ import ( func main() { godotenv.Load() - k := kinesis.New("", "", kinesis.Region{}) s := "inputStream" - - c := etl.RedisCheckpoint{appName: "sampleApp"} - - e := etl.S3Emitter{} - e.SetBucketName("bucketName") - + k := kinesis.New("", "", kinesis.Region{}) + c := etl.RedisCheckpoint{AppName: "sampleApp"} + e := etl.S3Emitter{S3Bucket: "bucketName"} // t := etl.EventTransformer{} args := kinesis.NewArgs() diff --git a/utils.go b/utils.go index b293d81..3b33750 100644 --- a/utils.go +++ b/utils.go @@ -1,54 +1,58 @@ package etl -import( +import ( "fmt" + "github.com/sendgridlabs/go-kinesis" "time" - "github.com/sendgridlabs/go-kinesis" ) func CreateAndWaitForStreamToBecomeAvailable(ksis *kinesis.Kinesis, streamName string, shardCount int) { - if !StreamExists(ksis, streamName) { - err := ksis.CreateStream(streamName, shardCount) + if !StreamExists(ksis, streamName) { + err := ksis.CreateStream(streamName, shardCount) - if err != nil { - fmt.Printf("CreateStream ERROR: %v\n", err) - return - } - } + if err != nil { + fmt.Printf("CreateStream ERROR: %v\n", err) + return + } + } - resp := &kinesis.DescribeStreamResp{} - timeout := make(chan bool, 30) + resp := &kinesis.DescribeStreamResp{} + timeout := make(chan bool, 30) - for { - args := kinesis.NewArgs() - args.Add("StreamName", streamName) - resp, _ = ksis.DescribeStream(args) - streamStatus := resp.StreamDescription.StreamStatus - fmt.Printf("Stream [%v] is %v\n", streamName, streamStatus) + for { + args := kinesis.NewArgs() + args.Add("StreamName", streamName) + resp, _ = ksis.DescribeStream(args) + streamStatus := resp.StreamDescription.StreamStatus + fmt.Printf("Stream [%v] is %v\n", streamName, streamStatus) - if streamStatus != "ACTIVE" { - time.Sleep(4 * time.Second) - timeout <- true - } else { - break - } - } + if streamStatus != "ACTIVE" { + time.Sleep(4 * time.Second) + timeout <- true + } else { + break + } + } } func StreamExists(ksis *kinesis.Kinesis, streamName string) bool { - args := kinesis.NewArgs() - resp, _ := ksis.ListStreams(args) - for _, name := range resp.StreamNames { if name == streamName { return true } } - return false + args := kinesis.NewArgs() + resp, _ := ksis.ListStreams(args) + for _, name := range resp.StreamNames { + if name == streamName { + return true + } + } + return false } func DeleteStream(ksis *kinesis.Kinesis, streamName string) { - err := ksis.DeleteStream("test") + err := ksis.DeleteStream("test") - if err != nil { - fmt.Printf("DeleteStream ERROR: %v\n", err) - return - } + if err != nil { + fmt.Printf("DeleteStream ERROR: %v\n", err) + return + } - fmt.Printf("Stream [%v] is DELETING\n", streamName) + fmt.Printf("Stream [%v] is DELETING\n", streamName) }