Add Emitters
* Add first s3 Emitter along with Emitter Interface. * Replace setters with initializers.
This commit is contained in:
parent
f7d1202fe0
commit
b086b64d7b
8 changed files with 116 additions and 73 deletions
18
buffers.go
18
buffers.go
|
|
@ -1,10 +1,13 @@
|
||||||
package etl
|
package etl
|
||||||
|
|
||||||
import (
|
import "bytes"
|
||||||
"bytes"
|
|
||||||
"fmt"
|
type Buffer interface {
|
||||||
"time"
|
Data() []byte
|
||||||
)
|
FirstSequenceNumber() string
|
||||||
|
LastSequenceNumber() string
|
||||||
|
NumMessagesInBuffer() int
|
||||||
|
}
|
||||||
|
|
||||||
type MsgBuffer struct {
|
type MsgBuffer struct {
|
||||||
buffer bytes.Buffer
|
buffer bytes.Buffer
|
||||||
|
|
@ -40,11 +43,6 @@ func (b MsgBuffer) SequenceExists(sequenceNumber string) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b MsgBuffer) FileName() string {
|
|
||||||
date := time.Now().UTC().Format("2006-01-02")
|
|
||||||
return fmt.Sprintf("/%v/%v-%v.txt", date, b.firstSequenceNumber, b.lastSequenceNumber)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b MsgBuffer) Data() []byte {
|
func (b MsgBuffer) Data() []byte {
|
||||||
return b.buffer.Bytes()
|
return b.buffer.Bytes()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,9 +2,7 @@ package etl
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestNumMessagesToBuffer(t *testing.T) {
|
func TestNumMessagesToBuffer(t *testing.T) {
|
||||||
|
|
@ -139,18 +137,3 @@ func TestData(t *testing.T) {
|
||||||
t.Errorf("Data() want %v", body)
|
t.Errorf("Data() want %v", body)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFileName(t *testing.T) {
|
|
||||||
var r1, s1 = []byte("Record1"), "Seq1"
|
|
||||||
var r2, s2 = []byte("Record2"), "Seq2"
|
|
||||||
date := time.Now().UTC().Format("2006-01-02")
|
|
||||||
name := fmt.Sprintf("/%v/Seq1-Seq2.txt", date)
|
|
||||||
|
|
||||||
b := MsgBuffer{}
|
|
||||||
b.ConsumeRecord(r1, s1)
|
|
||||||
b.ConsumeRecord(r2, s2)
|
|
||||||
|
|
||||||
if b.FileName() != name {
|
|
||||||
t.Errorf("FileName() = want %v", b.FileName(), name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ type Checkpoint interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type RedisCheckpoint struct {
|
type RedisCheckpoint struct {
|
||||||
appName string
|
AppName string
|
||||||
client redis.Client
|
client redis.Client
|
||||||
sequenceNumber string
|
sequenceNumber string
|
||||||
}
|
}
|
||||||
|
|
@ -41,5 +41,5 @@ func (c *RedisCheckpoint) SetCheckpoint(streamName string, shardID string, seque
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c RedisCheckpoint) keyGen(streamName string, shardID string) string {
|
func (c RedisCheckpoint) keyGen(streamName string, shardID string) string {
|
||||||
return fmt.Sprintf("%v:checkpoint:%v:%v", c.appName, streamName, shardID)
|
return fmt.Sprintf("%v:checkpoint:%v:%v", c.AppName, streamName, shardID)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ import (
|
||||||
|
|
||||||
func TestKeyGen(t *testing.T) {
|
func TestKeyGen(t *testing.T) {
|
||||||
k := "app:checkpoint:stream:shard"
|
k := "app:checkpoint:stream:shard"
|
||||||
c := RedisCheckpoint{appName: "app"}
|
c := RedisCheckpoint{AppName: "app"}
|
||||||
|
|
||||||
r := c.keyGen("stream", "shard")
|
r := c.keyGen("stream", "shard")
|
||||||
|
|
||||||
|
|
@ -20,7 +20,7 @@ func TestCheckpointExists(t *testing.T) {
|
||||||
var rc redis.Client
|
var rc redis.Client
|
||||||
k := "app:checkpoint:stream:shard"
|
k := "app:checkpoint:stream:shard"
|
||||||
rc.Set(k, []byte("fakeSeqNum"))
|
rc.Set(k, []byte("fakeSeqNum"))
|
||||||
c := RedisCheckpoint{appName: "app"}
|
c := RedisCheckpoint{AppName: "app"}
|
||||||
|
|
||||||
r := c.CheckpointExists("stream", "shard")
|
r := c.CheckpointExists("stream", "shard")
|
||||||
|
|
||||||
|
|
@ -34,7 +34,7 @@ func TestCheckpointExists(t *testing.T) {
|
||||||
func TestSetCheckpoint(t *testing.T) {
|
func TestSetCheckpoint(t *testing.T) {
|
||||||
k := "app:checkpoint:stream:shard"
|
k := "app:checkpoint:stream:shard"
|
||||||
var rc redis.Client
|
var rc redis.Client
|
||||||
c := RedisCheckpoint{appName: "app"}
|
c := RedisCheckpoint{AppName: "app"}
|
||||||
c.SetCheckpoint("stream", "shard", "fakeSeqNum")
|
c.SetCheckpoint("stream", "shard", "fakeSeqNum")
|
||||||
|
|
||||||
r, _ := rc.Get(k)
|
r, _ := rc.Get(k)
|
||||||
|
|
|
||||||
43
emitters.go
Normal file
43
emitters.go
Normal file
|
|
@ -0,0 +1,43 @@
|
||||||
|
package etl
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/crowdmob/goamz/aws"
|
||||||
|
"github.com/crowdmob/goamz/s3"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Emitter interface {
|
||||||
|
Emit(path string, data []byte)
|
||||||
|
}
|
||||||
|
|
||||||
|
type S3Emitter struct {
|
||||||
|
S3Bucket string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e S3Emitter) s3FileName(firstSeq string, lastSeq string) string {
|
||||||
|
date := time.Now().UTC().Format("2006-01-02")
|
||||||
|
return fmt.Sprintf("/%v/%v-%v.txt", date, firstSeq, lastSeq)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e S3Emitter) Emit(buffer Buffer) {
|
||||||
|
auth, _ := aws.EnvAuth()
|
||||||
|
s := s3.New(auth, aws.USEast)
|
||||||
|
b := s.Bucket(e.S3Bucket)
|
||||||
|
f := e.s3FileName(buffer.FirstSequenceNumber(), buffer.LastSequenceNumber())
|
||||||
|
r := b.Put(f, buffer.Data(), "text/plain", s3.Private, s3.Options{})
|
||||||
|
fmt.Printf("Successfully emitted %v records to S3 in s3://%v/%v", buffer.NumMessagesInBuffer(), b, f)
|
||||||
|
fmt.Println(r)
|
||||||
|
}
|
||||||
|
|
||||||
|
type RedshiftEmitter struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e RedshiftEmitter) Emit(path string, data []byte) {
|
||||||
|
// first call S3 bucket
|
||||||
|
// pg.query("COPY file_path TO table_name")
|
||||||
|
// pg.query("INSERT INTO imported_files VALUE file_path")
|
||||||
|
fmt.Printf("debug: emitting %v to Redshift\n", path)
|
||||||
|
fmt.Println(string(data))
|
||||||
|
}
|
||||||
18
emitters_test.go
Normal file
18
emitters_test.go
Normal file
|
|
@ -0,0 +1,18 @@
|
||||||
|
package etl
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPath(t *testing.T) {
|
||||||
|
d := time.Now().UTC().Format("2006-01-02")
|
||||||
|
n := fmt.Sprintf("/%v/a-b.txt", d)
|
||||||
|
e := S3Emitter{}
|
||||||
|
f := e.s3FileName("a", "b")
|
||||||
|
|
||||||
|
if f != n {
|
||||||
|
t.Errorf("s3FileName() = want %v", f, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -2,6 +2,7 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/harlow/go-etl"
|
"github.com/harlow/go-etl"
|
||||||
"github.com/joho/godotenv"
|
"github.com/joho/godotenv"
|
||||||
"github.com/sendgridlabs/go-kinesis"
|
"github.com/sendgridlabs/go-kinesis"
|
||||||
|
|
@ -10,14 +11,10 @@ import (
|
||||||
func main() {
|
func main() {
|
||||||
godotenv.Load()
|
godotenv.Load()
|
||||||
|
|
||||||
k := kinesis.New("", "", kinesis.Region{})
|
|
||||||
s := "inputStream"
|
s := "inputStream"
|
||||||
|
k := kinesis.New("", "", kinesis.Region{})
|
||||||
c := etl.RedisCheckpoint{appName: "sampleApp"}
|
c := etl.RedisCheckpoint{AppName: "sampleApp"}
|
||||||
|
e := etl.S3Emitter{S3Bucket: "bucketName"}
|
||||||
e := etl.S3Emitter{}
|
|
||||||
e.SetBucketName("bucketName")
|
|
||||||
|
|
||||||
// t := etl.EventTransformer{}
|
// t := etl.EventTransformer{}
|
||||||
|
|
||||||
args := kinesis.NewArgs()
|
args := kinesis.NewArgs()
|
||||||
|
|
|
||||||
72
utils.go
72
utils.go
|
|
@ -1,54 +1,58 @@
|
||||||
package etl
|
package etl
|
||||||
|
|
||||||
import(
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/sendgridlabs/go-kinesis"
|
||||||
"time"
|
"time"
|
||||||
"github.com/sendgridlabs/go-kinesis"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func CreateAndWaitForStreamToBecomeAvailable(ksis *kinesis.Kinesis, streamName string, shardCount int) {
|
func CreateAndWaitForStreamToBecomeAvailable(ksis *kinesis.Kinesis, streamName string, shardCount int) {
|
||||||
if !StreamExists(ksis, streamName) {
|
if !StreamExists(ksis, streamName) {
|
||||||
err := ksis.CreateStream(streamName, shardCount)
|
err := ksis.CreateStream(streamName, shardCount)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("CreateStream ERROR: %v\n", err)
|
fmt.Printf("CreateStream ERROR: %v\n", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
resp := &kinesis.DescribeStreamResp{}
|
resp := &kinesis.DescribeStreamResp{}
|
||||||
timeout := make(chan bool, 30)
|
timeout := make(chan bool, 30)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
args := kinesis.NewArgs()
|
args := kinesis.NewArgs()
|
||||||
args.Add("StreamName", streamName)
|
args.Add("StreamName", streamName)
|
||||||
resp, _ = ksis.DescribeStream(args)
|
resp, _ = ksis.DescribeStream(args)
|
||||||
streamStatus := resp.StreamDescription.StreamStatus
|
streamStatus := resp.StreamDescription.StreamStatus
|
||||||
fmt.Printf("Stream [%v] is %v\n", streamName, streamStatus)
|
fmt.Printf("Stream [%v] is %v\n", streamName, streamStatus)
|
||||||
|
|
||||||
if streamStatus != "ACTIVE" {
|
if streamStatus != "ACTIVE" {
|
||||||
time.Sleep(4 * time.Second)
|
time.Sleep(4 * time.Second)
|
||||||
timeout <- true
|
timeout <- true
|
||||||
} else {
|
} else {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func StreamExists(ksis *kinesis.Kinesis, streamName string) bool {
|
func StreamExists(ksis *kinesis.Kinesis, streamName string) bool {
|
||||||
args := kinesis.NewArgs()
|
args := kinesis.NewArgs()
|
||||||
resp, _ := ksis.ListStreams(args)
|
resp, _ := ksis.ListStreams(args)
|
||||||
for _, name := range resp.StreamNames { if name == streamName { return true } }
|
for _, name := range resp.StreamNames {
|
||||||
return false
|
if name == streamName {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func DeleteStream(ksis *kinesis.Kinesis, streamName string) {
|
func DeleteStream(ksis *kinesis.Kinesis, streamName string) {
|
||||||
err := ksis.DeleteStream("test")
|
err := ksis.DeleteStream("test")
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("DeleteStream ERROR: %v\n", err)
|
fmt.Printf("DeleteStream ERROR: %v\n", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("Stream [%v] is DELETING\n", streamName)
|
fmt.Printf("Stream [%v] is DELETING\n", streamName)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue