From 7c631ba8c0f182a85220013495ee1bcd2eb2adf9 Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Sat, 15 Nov 2014 17:00:37 -0800 Subject: [PATCH] Add StringToString transformer In some cases we'll want to save the data from the stream directly with no transformation needed. This will allow us to return the raw data string from the stream * Add new StringToStringTransformer * Remove Record from codebase in favor of more generic interface --- all_pass_filter.go | 2 +- buffer.go | 4 ++-- filter.go | 2 +- record.go | 7 ------- record_buffer.go | 8 ++++---- string_to_string_transformer.go | 11 +++++++++++ transformer.go | 4 ++-- 7 files changed, 21 insertions(+), 17 deletions(-) delete mode 100644 record.go create mode 100644 string_to_string_transformer.go diff --git a/all_pass_filter.go b/all_pass_filter.go index 342cc86..9e18285 100644 --- a/all_pass_filter.go +++ b/all_pass_filter.go @@ -4,6 +4,6 @@ package connector type AllPassFilter struct{} // Returns true for all records. -func (b *AllPassFilter) KeepRecord(r Record) bool { +func (b *AllPassFilter) KeepRecord(r interface{}) bool { return true } diff --git a/buffer.go b/buffer.go index fc4e3c0..7209c27 100644 --- a/buffer.go +++ b/buffer.go @@ -6,11 +6,11 @@ package connector // time limit in seconds. The ShouldFlush() method may indicate that the buffer is full based on // these limits. type Buffer interface { - ProcessRecord(record Record, sequenceNumber string) + ProcessRecord(record interface{}, sequenceNumber string) FirstSequenceNumber() string Flush() LastSequenceNumber() string NumRecordsInBuffer() int - Records() []Record + Records() []interface{} ShouldFlush() bool } diff --git a/filter.go b/filter.go index 575b063..1b4b582 100644 --- a/filter.go +++ b/filter.go @@ -6,5 +6,5 @@ package connector // A method enabling the buffer to filter records. Return false if you don't want to hold on to // the record. type Filter interface { - KeepRecord(r Record) bool + KeepRecord(r interface{}) bool } diff --git a/record.go b/record.go deleted file mode 100644 index 3e8e9d7..0000000 --- a/record.go +++ /dev/null @@ -1,7 +0,0 @@ -package connector - -// Used to store the data being sent through the Kinesis stream -type Record interface { - ToDelimitedString() string - ToJson() []byte -} diff --git a/record_buffer.go b/record_buffer.go index 8d02d46..52b2a05 100644 --- a/record_buffer.go +++ b/record_buffer.go @@ -4,16 +4,16 @@ package connector // records that are periodically flushed. It is configured with an implementation of Filter that // decides whether a record will be added to the buffer to be emitted. type RecordBuffer struct { - NumRecordsToBuffer int + NumRecordsToBuffer int firstSequenceNumber string lastSequenceNumber string - recordsInBuffer []Record + recordsInBuffer []interface{} sequencesInBuffer []string } // Adds a message to the buffer. -func (b *RecordBuffer) ProcessRecord(record Record, sequenceNumber string) { +func (b *RecordBuffer) ProcessRecord(record interface{}, sequenceNumber string) { if len(b.sequencesInBuffer) == 0 { b.firstSequenceNumber = sequenceNumber } @@ -27,7 +27,7 @@ func (b *RecordBuffer) ProcessRecord(record Record, sequenceNumber string) { } // Returns the records in the buffer. -func (b *RecordBuffer) Records() []Record { +func (b *RecordBuffer) Records() []interface{} { return b.recordsInBuffer } diff --git a/string_to_string_transformer.go b/string_to_string_transformer.go new file mode 100644 index 0000000..9288895 --- /dev/null +++ b/string_to_string_transformer.go @@ -0,0 +1,11 @@ +package connector + +type StringToStringTransformer struct{} + +func (t StringToStringTransformer) ToRecord(data []byte) interface{} { + return string(data) +} + +func (t StringToStringTransformer) FromRecord(s interface{}) []byte { + return []byte(s.(string)) +} diff --git a/transformer.go b/transformer.go index 956c312..26adce4 100644 --- a/transformer.go +++ b/transformer.go @@ -3,6 +3,6 @@ package connector // Transformer is used to transform data (byte array) to a Record for // processing in the application. type Transformer interface { - ToRecord(data []byte) Record - FromRecord(r Record) []byte + ToRecord(data []byte) interface{} + FromRecord(r interface{}) []byte }