2014-07-25 06:03:41 +00:00
|
|
|
package connector
|
|
|
|
|
|
|
|
|
|
// This struct is a basic implementation of the Buffer interface. It is a wrapper on a buffer of
|
|
|
|
|
// records that are periodically flushed. It is configured with an implementation of Filter that
|
|
|
|
|
// decides whether a record will be added to the buffer to be emitted.
|
|
|
|
|
type RecordBuffer struct {
|
|
|
|
|
NumRecordsToBuffer int
|
2014-11-16 00:25:27 +00:00
|
|
|
|
2014-07-25 06:03:41 +00:00
|
|
|
firstSequenceNumber string
|
|
|
|
|
lastSequenceNumber string
|
2014-11-15 23:54:54 +00:00
|
|
|
recordsInBuffer []Record
|
2014-07-25 06:03:41 +00:00
|
|
|
sequencesInBuffer []string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Adds a message to the buffer.
|
2014-11-15 23:54:54 +00:00
|
|
|
func (b *RecordBuffer) ProcessRecord(record Record, sequenceNumber string) {
|
2014-07-25 06:03:41 +00:00
|
|
|
if len(b.sequencesInBuffer) == 0 {
|
|
|
|
|
b.firstSequenceNumber = sequenceNumber
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
b.lastSequenceNumber = sequenceNumber
|
|
|
|
|
|
|
|
|
|
if !b.sequenceExists(sequenceNumber) {
|
|
|
|
|
b.recordsInBuffer = append(b.recordsInBuffer, record)
|
|
|
|
|
b.sequencesInBuffer = append(b.sequencesInBuffer, sequenceNumber)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Returns the records in the buffer.
|
2014-11-15 23:54:54 +00:00
|
|
|
func (b *RecordBuffer) Records() []Record {
|
2014-07-25 06:03:41 +00:00
|
|
|
return b.recordsInBuffer
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Returns the number of messages in the buffer.
|
|
|
|
|
func (b RecordBuffer) NumRecordsInBuffer() int {
|
|
|
|
|
return len(b.sequencesInBuffer)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Flushes the content in the buffer and resets the sequence counter.
|
|
|
|
|
func (b *RecordBuffer) Flush() {
|
|
|
|
|
b.recordsInBuffer = b.recordsInBuffer[:0]
|
|
|
|
|
b.sequencesInBuffer = b.sequencesInBuffer[:0]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Checks if the sequence already exists in the buffer.
|
|
|
|
|
func (b *RecordBuffer) sequenceExists(sequenceNumber string) bool {
|
|
|
|
|
for _, v := range b.sequencesInBuffer {
|
|
|
|
|
if v == sequenceNumber {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Determines if the buffer has reached its target size.
|
|
|
|
|
func (b *RecordBuffer) ShouldFlush() bool {
|
|
|
|
|
return len(b.sequencesInBuffer) >= b.NumRecordsToBuffer
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Returns the sequence number of the first message in the buffer.
|
|
|
|
|
func (b *RecordBuffer) FirstSequenceNumber() string {
|
|
|
|
|
return b.firstSequenceNumber
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Returns the sequence number of the last message in the buffer.
|
|
|
|
|
func (b *RecordBuffer) LastSequenceNumber() string {
|
|
|
|
|
return b.lastSequenceNumber
|
|
|
|
|
}
|