Rename Model to Record

To match the DSL of the Kinesis library rename the Model interface to
Record.
This commit is contained in:
Harlow Ward 2014-11-15 15:54:54 -08:00
parent ad57271712
commit b98adcf659
12 changed files with 64 additions and 54 deletions

View file

@ -4,6 +4,6 @@ package connector
type AllPassFilter struct{} type AllPassFilter struct{}
// Returns true for all records. // Returns true for all records.
func (b *AllPassFilter) KeepRecord(m Model) bool { func (b *AllPassFilter) KeepRecord(r Record) bool {
return true return true
} }

View file

@ -6,11 +6,11 @@ package connector
// time limit in seconds. The ShouldFlush() method may indicate that the buffer is full based on // time limit in seconds. The ShouldFlush() method may indicate that the buffer is full based on
// these limits. // these limits.
type Buffer interface { type Buffer interface {
Add(data Model, sequenceNumber string) ProcessRecord(record Record, sequenceNumber string)
FirstSequenceNumber() string FirstSequenceNumber() string
Flush() Flush()
LastSequenceNumber() string LastSequenceNumber() string
NumRecordsInBuffer() int NumRecordsInBuffer() int
Records() []Model Records() []Record
ShouldFlush() bool ShouldFlush() bool
} }

View file

@ -7,5 +7,5 @@ package connector
// Implementations may choose to fail the entire set of records in the buffer or to fail records // Implementations may choose to fail the entire set of records in the buffer or to fail records
// individually. // individually.
type Emitter interface { type Emitter interface {
Emit(buffer Buffer) Emit(b Buffer, t Transformer)
} }

View file

@ -2,6 +2,9 @@ package connector
// The Filter is associated with an Buffer. The Buffer may use the result of calling the // The Filter is associated with an Buffer. The Buffer may use the result of calling the
// KeepRecord() method to decide whether to store a record or discard it. // KeepRecord() method to decide whether to store a record or discard it.
// A method enabling the buffer to filter records. Return false if you don't want to hold on to
// the record.
type Filter interface { type Filter interface {
KeepRecord(m Model) bool KeepRecord(r Record) bool
} }

View file

@ -1,6 +0,0 @@
package connector
// Used to map the attributres of the data being sent through the Kinesis stream
type Model interface {
ToString() string
}

View file

@ -61,10 +61,10 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
continue continue
} }
var m = p.Transformer.ToModel(data) r := p.Transformer.ToRecord(data)
if p.Filter.KeepRecord(m) { if p.Filter.KeepRecord(r) {
p.Buffer.Add(m, v.SequenceNumber) p.Buffer.ProcessRecord(r, v.SequenceNumber)
} }
} }
} else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil { } else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil {
@ -77,7 +77,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
if p.Buffer.ShouldFlush() { if p.Buffer.ShouldFlush() {
fmt.Printf("Emitting to Shard: %v\n", shardID) fmt.Printf("Emitting to Shard: %v\n", shardID)
p.Emitter.Emit(p.Buffer) p.Emitter.Emit(p.Buffer, p.Transformer)
p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber()) p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber())
p.Buffer.Flush() p.Buffer.Flush()
} }

7
record.go Normal file
View file

@ -0,0 +1,7 @@
package connector
// Used to store the data being sent through the Kinesis stream
type Record interface {
ToDelimitedString() string
ToJson() []byte
}

View file

@ -7,12 +7,12 @@ type RecordBuffer struct {
NumRecordsToBuffer int NumRecordsToBuffer int
firstSequenceNumber string firstSequenceNumber string
lastSequenceNumber string lastSequenceNumber string
recordsInBuffer []Model recordsInBuffer []Record
sequencesInBuffer []string sequencesInBuffer []string
} }
// Adds a message to the buffer. // Adds a message to the buffer.
func (b *RecordBuffer) Add(record Model, sequenceNumber string) { func (b *RecordBuffer) ProcessRecord(record Record, sequenceNumber string) {
if len(b.sequencesInBuffer) == 0 { if len(b.sequencesInBuffer) == 0 {
b.firstSequenceNumber = sequenceNumber b.firstSequenceNumber = sequenceNumber
} }
@ -26,7 +26,7 @@ func (b *RecordBuffer) Add(record Model, sequenceNumber string) {
} }
// Returns the records in the buffer. // Returns the records in the buffer.
func (b *RecordBuffer) Records() []Model { func (b *RecordBuffer) Records() []Record {
return b.recordsInBuffer return b.recordsInBuffer
} }

View file

@ -2,30 +2,34 @@ package connector
import "testing" import "testing"
type TestModel struct{} type TestRecord struct{}
func (u TestModel) ToString() string { func (r TestRecord) ToDelimitedString() string {
return "ok" return "test"
} }
func TestAdd(t *testing.T) { func (r TestRecord) ToJson() []byte {
var r1, s1 = TestModel{}, "Seq1" return []byte("test")
var r2, s2 = TestModel{}, "Seq2" }
func TestProcessRecord(t *testing.T) {
var r1, s1 = TestRecord{}, "Seq1"
var r2, s2 = TestRecord{}, "Seq2"
b := RecordBuffer{} b := RecordBuffer{}
b.Add(r1, s1) b.ProcessRecord(r1, s1)
if b.NumRecordsInBuffer() != 1 { if b.NumRecordsInBuffer() != 1 {
t.Errorf("NumRecordsInBuffer() want %v", 1) t.Errorf("NumRecordsInBuffer() want %v", 1)
} }
b.Add(r2, s2) b.ProcessRecord(r2, s2)
if b.NumRecordsInBuffer() != 2 { if b.NumRecordsInBuffer() != 2 {
t.Errorf("NumRecordsInBuffer() want %v", 2) t.Errorf("NumRecordsInBuffer() want %v", 2)
} }
b.Add(r2, s2) b.ProcessRecord(r2, s2)
if b.NumRecordsInBuffer() != 2 { if b.NumRecordsInBuffer() != 2 {
t.Errorf("NumRecordsInBuffer() want %v", 2) t.Errorf("NumRecordsInBuffer() want %v", 2)
@ -33,17 +37,17 @@ func TestAdd(t *testing.T) {
} }
func TestSequenceExists(t *testing.T) { func TestSequenceExists(t *testing.T) {
var r1, s1 = TestModel{}, "Seq1" var r1, s1 = TestRecord{}, "Seq1"
var r2, s2 = TestModel{}, "Seq2" var r2, s2 = TestRecord{}, "Seq2"
b := RecordBuffer{} b := RecordBuffer{}
b.Add(r1, s1) b.ProcessRecord(r1, s1)
if b.sequenceExists(s1) != true { if b.sequenceExists(s1) != true {
t.Errorf("sequenceExists() want %v", true) t.Errorf("sequenceExists() want %v", true)
} }
b.Add(r2, s2) b.ProcessRecord(r2, s2)
if b.sequenceExists(s2) != true { if b.sequenceExists(s2) != true {
t.Errorf("sequenceExists() want %v", true) t.Errorf("sequenceExists() want %v", true)
@ -51,9 +55,9 @@ func TestSequenceExists(t *testing.T) {
} }
func TestFlush(t *testing.T) { func TestFlush(t *testing.T) {
var r1, s1 = TestModel{}, "SeqNum" var r1, s1 = TestRecord{}, "SeqNum"
b := RecordBuffer{} b := RecordBuffer{}
b.Add(r1, s1) b.ProcessRecord(r1, s1)
b.Flush() b.Flush()
@ -63,17 +67,17 @@ func TestFlush(t *testing.T) {
} }
func TestLastSequenceNumber(t *testing.T) { func TestLastSequenceNumber(t *testing.T) {
var r1, s1 = TestModel{}, "Seq1" var r1, s1 = TestRecord{}, "Seq1"
var r2, s2 = TestModel{}, "Seq2" var r2, s2 = TestRecord{}, "Seq2"
b := RecordBuffer{} b := RecordBuffer{}
b.Add(r1, s1) b.ProcessRecord(r1, s1)
if b.LastSequenceNumber() != s1 { if b.LastSequenceNumber() != s1 {
t.Errorf("LastSequenceNumber() want %v", s1) t.Errorf("LastSequenceNumber() want %v", s1)
} }
b.Add(r2, s2) b.ProcessRecord(r2, s2)
if b.LastSequenceNumber() != s2 { if b.LastSequenceNumber() != s2 {
t.Errorf("LastSequenceNumber() want %v", s2) t.Errorf("LastSequenceNumber() want %v", s2)
@ -81,17 +85,17 @@ func TestLastSequenceNumber(t *testing.T) {
} }
func TestFirstSequenceNumber(t *testing.T) { func TestFirstSequenceNumber(t *testing.T) {
var r1, s1 = TestModel{}, "Seq1" var r1, s1 = TestRecord{}, "Seq1"
var r2, s2 = TestModel{}, "Seq2" var r2, s2 = TestRecord{}, "Seq2"
b := RecordBuffer{} b := RecordBuffer{}
b.Add(r1, s1) b.ProcessRecord(r1, s1)
if b.FirstSequenceNumber() != s1 { if b.FirstSequenceNumber() != s1 {
t.Errorf("FirstSequenceNumber() want %v", s1) t.Errorf("FirstSequenceNumber() want %v", s1)
} }
b.Add(r2, s2) b.ProcessRecord(r2, s2)
if b.FirstSequenceNumber() != s1 { if b.FirstSequenceNumber() != s1 {
t.Errorf("FirstSequenceNumber() want %v", s1) t.Errorf("FirstSequenceNumber() want %v", s1)
@ -100,17 +104,17 @@ func TestFirstSequenceNumber(t *testing.T) {
func TestShouldFlush(t *testing.T) { func TestShouldFlush(t *testing.T) {
const n = 2 const n = 2
var r1, s1 = TestModel{}, "Seq1" var r1, s1 = TestRecord{}, "Seq1"
var r2, s2 = TestModel{}, "Seq2" var r2, s2 = TestRecord{}, "Seq2"
b := RecordBuffer{NumRecordsToBuffer: n} b := RecordBuffer{NumRecordsToBuffer: n}
b.Add(r1, s1) b.ProcessRecord(r1, s1)
if b.ShouldFlush() != false { if b.ShouldFlush() != false {
t.Errorf("ShouldFlush() want %v", false) t.Errorf("ShouldFlush() want %v", false)
} }
b.Add(r2, s2) b.ProcessRecord(r2, s2)
if b.ShouldFlush() != true { if b.ShouldFlush() != true {
t.Errorf("ShouldFlush() want %v", true) t.Errorf("ShouldFlush() want %v", true)

View file

@ -23,9 +23,9 @@ type RedshiftEmitter struct {
// Invoked when the buffer is full. This method leverages the S3Emitter and then issues a copy command to // Invoked when the buffer is full. This method leverages the S3Emitter and then issues a copy command to
// Redshift data store. // Redshift data store.
func (e RedshiftEmitter) Emit(b Buffer) { func (e RedshiftEmitter) Emit(b Buffer, t Transformer) {
s3Emitter := S3Emitter{S3Bucket: e.S3Bucket} s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
s3Emitter.Emit(b) s3Emitter.Emit(b, t)
s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber()) s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL")) db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))

View file

@ -26,16 +26,17 @@ func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string {
} }
// Invoked when the buffer is full. This method emits the set of filtered records. // Invoked when the buffer is full. This method emits the set of filtered records.
func (e S3Emitter) Emit(buf Buffer) { func (e S3Emitter) Emit(b Buffer, t Transformer) {
auth, _ := aws.EnvAuth() auth, _ := aws.EnvAuth()
s3Con := s3.New(auth, aws.USEast) s3Con := s3.New(auth, aws.USEast)
bucket := s3Con.Bucket(e.S3Bucket) bucket := s3Con.Bucket(e.S3Bucket)
s3File := e.S3FileName(buf.FirstSequenceNumber(), buf.LastSequenceNumber()) s3File := e.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
var buffer bytes.Buffer var buffer bytes.Buffer
for _, r := range buf.Records() { for _, r := range b.Records() {
buffer.WriteString(r.ToString()) var s = t.FromRecord(r)
buffer.Write(s)
} }
err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{}) err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{})
@ -43,6 +44,6 @@ func (e S3Emitter) Emit(buf Buffer) {
if err != nil { if err != nil {
fmt.Printf("Error occured while uploding to S3: %v\n", err) fmt.Printf("Error occured while uploding to S3: %v\n", err)
} else { } else {
fmt.Printf("Emitted %v records to S3 in s3://%v%v\n", buf.NumRecordsInBuffer(), e.S3Bucket, s3File) fmt.Printf("Emitted %v records to S3 in s3://%v%v\n", b.NumRecordsInBuffer(), e.S3Bucket, s3File)
} }
} }

View file

@ -1,7 +1,8 @@
package connector package connector
// Transformer is used to transform data from a Record (byte array) to the data model for // Transformer is used to transform data (byte array) to a Record for
// processing in the application. // processing in the application.
type Transformer interface { type Transformer interface {
ToModel(data []byte) Model ToRecord(data []byte) Record
FromRecord(r Record) []byte
} }