From b98adcf659de0f0a53abdb05bdd3521071ad15e9 Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Sat, 15 Nov 2014 15:54:54 -0800 Subject: [PATCH] Rename Model to Record To match the DSL of the Kinesis library rename the Model interface to Record. --- all_pass_filter.go | 2 +- buffer.go | 4 +-- emitter.go | 2 +- filter.go | 5 +++- model.go | 6 ----- pipeline.go | 8 +++--- record.go | 7 ++++++ record_buffer.go | 6 ++--- record_buffer_test.go | 58 +++++++++++++++++++++++-------------------- redshift_emitter.go | 4 +-- s3_emitter.go | 11 ++++---- transformer.go | 5 ++-- 12 files changed, 64 insertions(+), 54 deletions(-) delete mode 100644 model.go create mode 100644 record.go diff --git a/all_pass_filter.go b/all_pass_filter.go index 9fc68b6..342cc86 100644 --- a/all_pass_filter.go +++ b/all_pass_filter.go @@ -4,6 +4,6 @@ package connector type AllPassFilter struct{} // Returns true for all records. -func (b *AllPassFilter) KeepRecord(m Model) bool { +func (b *AllPassFilter) KeepRecord(r Record) bool { return true } diff --git a/buffer.go b/buffer.go index ce09ac4..fc4e3c0 100644 --- a/buffer.go +++ b/buffer.go @@ -6,11 +6,11 @@ package connector // time limit in seconds. The ShouldFlush() method may indicate that the buffer is full based on // these limits. type Buffer interface { - Add(data Model, sequenceNumber string) + ProcessRecord(record Record, sequenceNumber string) FirstSequenceNumber() string Flush() LastSequenceNumber() string NumRecordsInBuffer() int - Records() []Model + Records() []Record ShouldFlush() bool } diff --git a/emitter.go b/emitter.go index 7fb120e..6cb8851 100644 --- a/emitter.go +++ b/emitter.go @@ -7,5 +7,5 @@ package connector // Implementations may choose to fail the entire set of records in the buffer or to fail records // individually. type Emitter interface { - Emit(buffer Buffer) + Emit(b Buffer, t Transformer) } diff --git a/filter.go b/filter.go index b7f8d05..575b063 100644 --- a/filter.go +++ b/filter.go @@ -2,6 +2,9 @@ package connector // The Filter is associated with an Buffer. The Buffer may use the result of calling the // KeepRecord() method to decide whether to store a record or discard it. + +// A method enabling the buffer to filter records. Return false if you don't want to hold on to +// the record. type Filter interface { - KeepRecord(m Model) bool + KeepRecord(r Record) bool } diff --git a/model.go b/model.go deleted file mode 100644 index 723fdad..0000000 --- a/model.go +++ /dev/null @@ -1,6 +0,0 @@ -package connector - -// Used to map the attributres of the data being sent through the Kinesis stream -type Model interface { - ToString() string -} diff --git a/pipeline.go b/pipeline.go index 4d5ed4d..4f960c7 100644 --- a/pipeline.go +++ b/pipeline.go @@ -61,10 +61,10 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { continue } - var m = p.Transformer.ToModel(data) + r := p.Transformer.ToRecord(data) - if p.Filter.KeepRecord(m) { - p.Buffer.Add(m, v.SequenceNumber) + if p.Filter.KeepRecord(r) { + p.Buffer.ProcessRecord(r, v.SequenceNumber) } } } else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil { @@ -77,7 +77,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { if p.Buffer.ShouldFlush() { fmt.Printf("Emitting to Shard: %v\n", shardID) - p.Emitter.Emit(p.Buffer) + p.Emitter.Emit(p.Buffer, p.Transformer) p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber()) p.Buffer.Flush() } diff --git a/record.go b/record.go new file mode 100644 index 0000000..3e8e9d7 --- /dev/null +++ b/record.go @@ -0,0 +1,7 @@ +package connector + +// Used to store the data being sent through the Kinesis stream +type Record interface { + ToDelimitedString() string + ToJson() []byte +} diff --git a/record_buffer.go b/record_buffer.go index 8bfb287..d106a6f 100644 --- a/record_buffer.go +++ b/record_buffer.go @@ -7,12 +7,12 @@ type RecordBuffer struct { NumRecordsToBuffer int firstSequenceNumber string lastSequenceNumber string - recordsInBuffer []Model + recordsInBuffer []Record sequencesInBuffer []string } // Adds a message to the buffer. -func (b *RecordBuffer) Add(record Model, sequenceNumber string) { +func (b *RecordBuffer) ProcessRecord(record Record, sequenceNumber string) { if len(b.sequencesInBuffer) == 0 { b.firstSequenceNumber = sequenceNumber } @@ -26,7 +26,7 @@ func (b *RecordBuffer) Add(record Model, sequenceNumber string) { } // Returns the records in the buffer. -func (b *RecordBuffer) Records() []Model { +func (b *RecordBuffer) Records() []Record { return b.recordsInBuffer } diff --git a/record_buffer_test.go b/record_buffer_test.go index 7346abb..f35723e 100644 --- a/record_buffer_test.go +++ b/record_buffer_test.go @@ -2,30 +2,34 @@ package connector import "testing" -type TestModel struct{} +type TestRecord struct{} -func (u TestModel) ToString() string { - return "ok" +func (r TestRecord) ToDelimitedString() string { + return "test" } -func TestAdd(t *testing.T) { - var r1, s1 = TestModel{}, "Seq1" - var r2, s2 = TestModel{}, "Seq2" +func (r TestRecord) ToJson() []byte { + return []byte("test") +} + +func TestProcessRecord(t *testing.T) { + var r1, s1 = TestRecord{}, "Seq1" + var r2, s2 = TestRecord{}, "Seq2" b := RecordBuffer{} - b.Add(r1, s1) + b.ProcessRecord(r1, s1) if b.NumRecordsInBuffer() != 1 { t.Errorf("NumRecordsInBuffer() want %v", 1) } - b.Add(r2, s2) + b.ProcessRecord(r2, s2) if b.NumRecordsInBuffer() != 2 { t.Errorf("NumRecordsInBuffer() want %v", 2) } - b.Add(r2, s2) + b.ProcessRecord(r2, s2) if b.NumRecordsInBuffer() != 2 { t.Errorf("NumRecordsInBuffer() want %v", 2) @@ -33,17 +37,17 @@ func TestAdd(t *testing.T) { } func TestSequenceExists(t *testing.T) { - var r1, s1 = TestModel{}, "Seq1" - var r2, s2 = TestModel{}, "Seq2" + var r1, s1 = TestRecord{}, "Seq1" + var r2, s2 = TestRecord{}, "Seq2" b := RecordBuffer{} - b.Add(r1, s1) + b.ProcessRecord(r1, s1) if b.sequenceExists(s1) != true { t.Errorf("sequenceExists() want %v", true) } - b.Add(r2, s2) + b.ProcessRecord(r2, s2) if b.sequenceExists(s2) != true { t.Errorf("sequenceExists() want %v", true) @@ -51,9 +55,9 @@ func TestSequenceExists(t *testing.T) { } func TestFlush(t *testing.T) { - var r1, s1 = TestModel{}, "SeqNum" + var r1, s1 = TestRecord{}, "SeqNum" b := RecordBuffer{} - b.Add(r1, s1) + b.ProcessRecord(r1, s1) b.Flush() @@ -63,17 +67,17 @@ func TestFlush(t *testing.T) { } func TestLastSequenceNumber(t *testing.T) { - var r1, s1 = TestModel{}, "Seq1" - var r2, s2 = TestModel{}, "Seq2" + var r1, s1 = TestRecord{}, "Seq1" + var r2, s2 = TestRecord{}, "Seq2" b := RecordBuffer{} - b.Add(r1, s1) + b.ProcessRecord(r1, s1) if b.LastSequenceNumber() != s1 { t.Errorf("LastSequenceNumber() want %v", s1) } - b.Add(r2, s2) + b.ProcessRecord(r2, s2) if b.LastSequenceNumber() != s2 { t.Errorf("LastSequenceNumber() want %v", s2) @@ -81,17 +85,17 @@ func TestLastSequenceNumber(t *testing.T) { } func TestFirstSequenceNumber(t *testing.T) { - var r1, s1 = TestModel{}, "Seq1" - var r2, s2 = TestModel{}, "Seq2" + var r1, s1 = TestRecord{}, "Seq1" + var r2, s2 = TestRecord{}, "Seq2" b := RecordBuffer{} - b.Add(r1, s1) + b.ProcessRecord(r1, s1) if b.FirstSequenceNumber() != s1 { t.Errorf("FirstSequenceNumber() want %v", s1) } - b.Add(r2, s2) + b.ProcessRecord(r2, s2) if b.FirstSequenceNumber() != s1 { t.Errorf("FirstSequenceNumber() want %v", s1) @@ -100,17 +104,17 @@ func TestFirstSequenceNumber(t *testing.T) { func TestShouldFlush(t *testing.T) { const n = 2 - var r1, s1 = TestModel{}, "Seq1" - var r2, s2 = TestModel{}, "Seq2" + var r1, s1 = TestRecord{}, "Seq1" + var r2, s2 = TestRecord{}, "Seq2" b := RecordBuffer{NumRecordsToBuffer: n} - b.Add(r1, s1) + b.ProcessRecord(r1, s1) if b.ShouldFlush() != false { t.Errorf("ShouldFlush() want %v", false) } - b.Add(r2, s2) + b.ProcessRecord(r2, s2) if b.ShouldFlush() != true { t.Errorf("ShouldFlush() want %v", true) diff --git a/redshift_emitter.go b/redshift_emitter.go index f6639c8..88e4be8 100644 --- a/redshift_emitter.go +++ b/redshift_emitter.go @@ -23,9 +23,9 @@ type RedshiftEmitter struct { // Invoked when the buffer is full. This method leverages the S3Emitter and then issues a copy command to // Redshift data store. -func (e RedshiftEmitter) Emit(b Buffer) { +func (e RedshiftEmitter) Emit(b Buffer, t Transformer) { s3Emitter := S3Emitter{S3Bucket: e.S3Bucket} - s3Emitter.Emit(b) + s3Emitter.Emit(b, t) s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber()) db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL")) diff --git a/s3_emitter.go b/s3_emitter.go index 5b863ec..6298ece 100644 --- a/s3_emitter.go +++ b/s3_emitter.go @@ -26,16 +26,17 @@ func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string { } // Invoked when the buffer is full. This method emits the set of filtered records. -func (e S3Emitter) Emit(buf Buffer) { +func (e S3Emitter) Emit(b Buffer, t Transformer) { auth, _ := aws.EnvAuth() s3Con := s3.New(auth, aws.USEast) bucket := s3Con.Bucket(e.S3Bucket) - s3File := e.S3FileName(buf.FirstSequenceNumber(), buf.LastSequenceNumber()) + s3File := e.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber()) var buffer bytes.Buffer - for _, r := range buf.Records() { - buffer.WriteString(r.ToString()) + for _, r := range b.Records() { + var s = t.FromRecord(r) + buffer.Write(s) } err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{}) @@ -43,6 +44,6 @@ func (e S3Emitter) Emit(buf Buffer) { if err != nil { fmt.Printf("Error occured while uploding to S3: %v\n", err) } else { - fmt.Printf("Emitted %v records to S3 in s3://%v%v\n", buf.NumRecordsInBuffer(), e.S3Bucket, s3File) + fmt.Printf("Emitted %v records to S3 in s3://%v%v\n", b.NumRecordsInBuffer(), e.S3Bucket, s3File) } } diff --git a/transformer.go b/transformer.go index ad895bb..956c312 100644 --- a/transformer.go +++ b/transformer.go @@ -1,7 +1,8 @@ package connector -// Transformer is used to transform data from a Record (byte array) to the data model for +// Transformer is used to transform data (byte array) to a Record for // processing in the application. type Transformer interface { - ToModel(data []byte) Model + ToRecord(data []byte) Record + FromRecord(r Record) []byte }