diff --git a/all_pass_filter.go b/all_pass_filter.go index 342cc86..9e18285 100644 --- a/all_pass_filter.go +++ b/all_pass_filter.go @@ -4,6 +4,6 @@ package connector type AllPassFilter struct{} // Returns true for all records. -func (b *AllPassFilter) KeepRecord(r Record) bool { +func (b *AllPassFilter) KeepRecord(r interface{}) bool { return true } diff --git a/buffer.go b/buffer.go index fc4e3c0..7209c27 100644 --- a/buffer.go +++ b/buffer.go @@ -6,11 +6,11 @@ package connector // time limit in seconds. The ShouldFlush() method may indicate that the buffer is full based on // these limits. type Buffer interface { - ProcessRecord(record Record, sequenceNumber string) + ProcessRecord(record interface{}, sequenceNumber string) FirstSequenceNumber() string Flush() LastSequenceNumber() string NumRecordsInBuffer() int - Records() []Record + Records() []interface{} ShouldFlush() bool } diff --git a/filter.go b/filter.go index 575b063..1b4b582 100644 --- a/filter.go +++ b/filter.go @@ -6,5 +6,5 @@ package connector // A method enabling the buffer to filter records. Return false if you don't want to hold on to // the record. type Filter interface { - KeepRecord(r Record) bool + KeepRecord(r interface{}) bool } diff --git a/record.go b/record.go deleted file mode 100644 index 3e8e9d7..0000000 --- a/record.go +++ /dev/null @@ -1,7 +0,0 @@ -package connector - -// Used to store the data being sent through the Kinesis stream -type Record interface { - ToDelimitedString() string - ToJson() []byte -} diff --git a/record_buffer.go b/record_buffer.go index 8d02d46..52b2a05 100644 --- a/record_buffer.go +++ b/record_buffer.go @@ -4,16 +4,16 @@ package connector // records that are periodically flushed. It is configured with an implementation of Filter that // decides whether a record will be added to the buffer to be emitted. type RecordBuffer struct { - NumRecordsToBuffer int + NumRecordsToBuffer int firstSequenceNumber string lastSequenceNumber string - recordsInBuffer []Record + recordsInBuffer []interface{} sequencesInBuffer []string } // Adds a message to the buffer. -func (b *RecordBuffer) ProcessRecord(record Record, sequenceNumber string) { +func (b *RecordBuffer) ProcessRecord(record interface{}, sequenceNumber string) { if len(b.sequencesInBuffer) == 0 { b.firstSequenceNumber = sequenceNumber } @@ -27,7 +27,7 @@ func (b *RecordBuffer) ProcessRecord(record Record, sequenceNumber string) { } // Returns the records in the buffer. -func (b *RecordBuffer) Records() []Record { +func (b *RecordBuffer) Records() []interface{} { return b.recordsInBuffer } diff --git a/string_to_string_transformer.go b/string_to_string_transformer.go new file mode 100644 index 0000000..9288895 --- /dev/null +++ b/string_to_string_transformer.go @@ -0,0 +1,11 @@ +package connector + +type StringToStringTransformer struct{} + +func (t StringToStringTransformer) ToRecord(data []byte) interface{} { + return string(data) +} + +func (t StringToStringTransformer) FromRecord(s interface{}) []byte { + return []byte(s.(string)) +} diff --git a/transformer.go b/transformer.go index 956c312..26adce4 100644 --- a/transformer.go +++ b/transformer.go @@ -3,6 +3,6 @@ package connector // Transformer is used to transform data (byte array) to a Record for // processing in the application. type Transformer interface { - ToRecord(data []byte) Record - FromRecord(r Record) []byte + ToRecord(data []byte) interface{} + FromRecord(r interface{}) []byte }