Broke apart generic files into directories
* Added a new package name for each directory. * Updated tests to match the new package names.
This commit is contained in:
parent
b086b64d7b
commit
70c3b1bd79
13 changed files with 61 additions and 46 deletions
8
buffers/buffer.go
Normal file
8
buffers/buffer.go
Normal file
|
|
@ -0,0 +1,8 @@
|
||||||
|
package buffers
|
||||||
|
|
||||||
|
// Buffer is the contract for message buffers in package buffers. It exposes
// the buffered bytes, the sequence numbers at either end of the buffered
// range, and a count of the buffered messages.
type Buffer interface {
|
||||||
|
// Data returns the buffer's contents as a byte slice.
// NOTE(review): presumably the accumulated message payload — confirm
// against the concrete implementation.
Data() []byte
|
||||||
|
// FirstSequenceNumber returns a sequence number as a string; presumably
// that of the first message placed in the buffer — confirm.
FirstSequenceNumber() string
|
||||||
|
// LastSequenceNumber returns a sequence number as a string; presumably
// that of the most recent message placed in the buffer — confirm.
LastSequenceNumber() string
|
||||||
|
// NumMessagesInBuffer returns the message count as an int. The S3 emitter
// in this commit prints this value as the number of records it emitted.
NumMessagesInBuffer() int
|
||||||
|
}
|
||||||
|
|
@ -1,14 +1,7 @@
|
||||||
package etl
|
package buffers
|
||||||
|
|
||||||
import "bytes"
|
import "bytes"
|
||||||
|
|
||||||
type Buffer interface {
|
|
||||||
Data() []byte
|
|
||||||
FirstSequenceNumber() string
|
|
||||||
LastSequenceNumber() string
|
|
||||||
NumMessagesInBuffer() int
|
|
||||||
}
|
|
||||||
|
|
||||||
type MsgBuffer struct {
|
type MsgBuffer struct {
|
||||||
buffer bytes.Buffer
|
buffer bytes.Buffer
|
||||||
firstSequenceNumber string
|
firstSequenceNumber string
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
package etl
|
package buffers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
8
checkpoints/checkpoint.go
Normal file
8
checkpoints/checkpoint.go
Normal file
|
|
@ -0,0 +1,8 @@
|
||||||
|
package checkpoints
|
||||||
|
|
||||||
|
// Checkpoint is the contract for checkpoint stores in package checkpoints.
// Checkpoints are addressed by stream name and shard ID and carry a sequence
// number.
type Checkpoint interface {
|
||||||
|
// CheckpointExists reports, as a bool, on the checkpoint for the given
// stream name and shard ID; presumably true when one has been stored —
// confirm against the implementation.
CheckpointExists(streamName string, shardID string) bool
|
||||||
|
// SequenceNumber returns a sequence number as a string.
// NOTE(review): it takes no stream or shard argument, so which checkpoint
// it refers to depends on implementation state — confirm.
SequenceNumber() string
|
||||||
|
// SetCheckpoint takes a stream name, shard ID and sequence number;
// presumably it records that sequence number as the checkpoint for the
// shard — confirm. It returns nothing, so failures cannot be reported to
// the caller through this signature.
SetCheckpoint(streamName string, shardID string, sequenceNumber string)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
package etl
|
package checkpoints
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
@ -6,12 +6,6 @@ import (
|
||||||
"github.com/hoisie/redis"
|
"github.com/hoisie/redis"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Checkpoint interface {
|
|
||||||
CheckpointExists(streamName string, shardID string) bool
|
|
||||||
SequenceNumber() string
|
|
||||||
SetCheckpoint(streamName string, shardID string, sequenceNumber string)
|
|
||||||
}
|
|
||||||
|
|
||||||
type RedisCheckpoint struct {
|
type RedisCheckpoint struct {
|
||||||
AppName string
|
AppName string
|
||||||
client redis.Client
|
client redis.Client
|
||||||
|
|
@ -1,8 +1,9 @@
|
||||||
package etl
|
package checkpoints
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/hoisie/redis"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/hoisie/redis"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestKeyGen(t *testing.T) {
|
func TestKeyGen(t *testing.T) {
|
||||||
5
emitters/emitter.go
Normal file
5
emitters/emitter.go
Normal file
|
|
@ -0,0 +1,5 @@
|
||||||
|
package emitters
|
||||||
|
|
||||||
|
// Emitter is the contract for emitters in package emitters: a single Emit
// method that receives a path and the bytes to emit.
//
// NOTE(review): in this commit RedshiftEmitter.Emit matches this signature,
// but S3Emitter.Emit takes a buffers.Buffer instead, so S3Emitter does not
// satisfy Emitter as declared here.
type Emitter interface {
|
||||||
|
// Emit receives a destination path and the raw data. It returns nothing,
// so failures cannot be reported to the caller through this signature.
Emit(path string, data []byte)
|
||||||
|
}
|
||||||
16
emitters/redshift_emitter.go
Normal file
16
emitters/redshift_emitter.go
Normal file
|
|
@ -0,0 +1,16 @@
|
||||||
|
package emitters
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RedshiftEmitter is an emitter type with no fields. Its Emit method, declared
// alongside it, currently only prints debug output.
type RedshiftEmitter struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
// Emit is a placeholder: it prints a debug line naming path and then prints
// data converted to a string, both to standard output. No Redshift or S3 call
// is made; the commented-out lines below sketch the intended steps.
func (e RedshiftEmitter) Emit(path string, data []byte) {
|
||||||
|
// first call S3 bucket
|
||||||
|
// pg.query("COPY file_path TO table_name")
|
||||||
|
// pg.query("INSERT INTO imported_files VALUE file_path")
|
||||||
|
// Debug trace of the requested path.
fmt.Printf("debug: emitting %v to Redshift\n", path)
|
||||||
|
// Dump the payload as text; string(data) copies the byte slice.
fmt.Println(string(data))
|
||||||
|
}
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
package etl
|
package emitters
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
@ -6,12 +6,9 @@ import (
|
||||||
|
|
||||||
"github.com/crowdmob/goamz/aws"
|
"github.com/crowdmob/goamz/aws"
|
||||||
"github.com/crowdmob/goamz/s3"
|
"github.com/crowdmob/goamz/s3"
|
||||||
|
"github.com/harlow/go-etl/buffers"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Emitter interface {
|
|
||||||
Emit(path string, data []byte)
|
|
||||||
}
|
|
||||||
|
|
||||||
type S3Emitter struct {
|
type S3Emitter struct {
|
||||||
S3Bucket string
|
S3Bucket string
|
||||||
}
|
}
|
||||||
|
|
@ -21,7 +18,7 @@ func (e S3Emitter) s3FileName(firstSeq string, lastSeq string) string {
|
||||||
return fmt.Sprintf("/%v/%v-%v.txt", date, firstSeq, lastSeq)
|
return fmt.Sprintf("/%v/%v-%v.txt", date, firstSeq, lastSeq)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e S3Emitter) Emit(buffer Buffer) {
|
func (e S3Emitter) Emit(buffer buffers.Buffer) {
|
||||||
auth, _ := aws.EnvAuth()
|
auth, _ := aws.EnvAuth()
|
||||||
s := s3.New(auth, aws.USEast)
|
s := s3.New(auth, aws.USEast)
|
||||||
b := s.Bucket(e.S3Bucket)
|
b := s.Bucket(e.S3Bucket)
|
||||||
|
|
@ -30,14 +27,3 @@ func (e S3Emitter) Emit(buffer Buffer) {
|
||||||
fmt.Printf("Successfully emitted %v records to S3 in s3://%v/%v", buffer.NumMessagesInBuffer(), b, f)
|
fmt.Printf("Successfully emitted %v records to S3 in s3://%v/%v", buffer.NumMessagesInBuffer(), b, f)
|
||||||
fmt.Println(r)
|
fmt.Println(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
type RedshiftEmitter struct {
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e RedshiftEmitter) Emit(path string, data []byte) {
|
|
||||||
// first call S3 bucket
|
|
||||||
// pg.query("COPY file_path TO table_name")
|
|
||||||
// pg.query("INSERT INTO imported_files VALUE file_path")
|
|
||||||
fmt.Printf("debug: emitting %v to Redshift\n", path)
|
|
||||||
fmt.Println(string(data))
|
|
||||||
}
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
package etl
|
package emitters
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
@ -6,7 +6,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestPath(t *testing.T) {
|
func TestS3FileName(t *testing.T) {
|
||||||
d := time.Now().UTC().Format("2006-01-02")
|
d := time.Now().UTC().Format("2006-01-02")
|
||||||
n := fmt.Sprintf("/%v/a-b.txt", d)
|
n := fmt.Sprintf("/%v/a-b.txt", d)
|
||||||
e := S3Emitter{}
|
e := S3Emitter{}
|
||||||
|
|
@ -3,7 +3,9 @@ package main
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/harlow/go-etl"
|
"github.com/harlow/go-etl/checkpoints"
|
||||||
|
"github.com/harlow/go-etl/emitters"
|
||||||
|
"github.com/harlow/go-etl/utils"
|
||||||
"github.com/joho/godotenv"
|
"github.com/joho/godotenv"
|
||||||
"github.com/sendgridlabs/go-kinesis"
|
"github.com/sendgridlabs/go-kinesis"
|
||||||
)
|
)
|
||||||
|
|
@ -13,9 +15,9 @@ func main() {
|
||||||
|
|
||||||
s := "inputStream"
|
s := "inputStream"
|
||||||
k := kinesis.New("", "", kinesis.Region{})
|
k := kinesis.New("", "", kinesis.Region{})
|
||||||
c := etl.RedisCheckpoint{AppName: "sampleApp"}
|
c := checkpoints.RedisCheckpoint{AppName: "sampleApp"}
|
||||||
e := etl.S3Emitter{S3Bucket: "bucketName"}
|
e := emitters.S3Emitter{S3Bucket: "bucketName"}
|
||||||
// t := etl.EventTransformer{}
|
// t := transformers.EventTransformer{}
|
||||||
|
|
||||||
args := kinesis.NewArgs()
|
args := kinesis.NewArgs()
|
||||||
args.Add("StreamName", s)
|
args.Add("StreamName", s)
|
||||||
|
|
@ -27,7 +29,7 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, shard := range streamInfo.StreamDescription.Shards {
|
for _, shard := range streamInfo.StreamDescription.Shards {
|
||||||
go etl.GetRecords(k, &c, e, s, shard.ShardId)
|
go utils.GetRecords(k, &c, e, s, shard.ShardId)
|
||||||
}
|
}
|
||||||
|
|
||||||
select {}
|
select {}
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,8 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/harlow/go-etl"
|
|
||||||
|
"github.com/harlow/go-etl/utils"
|
||||||
"github.com/joho/godotenv"
|
"github.com/joho/godotenv"
|
||||||
"github.com/sendgridlabs/go-kinesis"
|
"github.com/sendgridlabs/go-kinesis"
|
||||||
)
|
)
|
||||||
|
|
@ -28,7 +29,7 @@ func main() {
|
||||||
streamName := "inputStream"
|
streamName := "inputStream"
|
||||||
ksis := kinesis.New("", "", kinesis.Region{})
|
ksis := kinesis.New("", "", kinesis.Region{})
|
||||||
|
|
||||||
etl.CreateAndWaitForStreamToBecomeAvailable(ksis, streamName, 2)
|
utils.CreateAndWaitForStreamToBecomeAvailable(ksis, streamName, 2)
|
||||||
putSampleDataOnStream(ksis, streamName, 50)
|
putSampleDataOnStream(ksis, streamName, 50)
|
||||||
// deleteStream(ksis, streamName)
|
// deleteStream(ksis, streamName)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,10 @@
|
||||||
package etl
|
package utils
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/sendgridlabs/go-kinesis"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/sendgridlabs/go-kinesis"
|
||||||
)
|
)
|
||||||
|
|
||||||
func CreateAndWaitForStreamToBecomeAvailable(ksis *kinesis.Kinesis, streamName string, shardCount int) {
|
func CreateAndWaitForStreamToBecomeAvailable(ksis *kinesis.Kinesis, streamName string, shardCount int) {
|
||||||
Loading…
Reference in a new issue