Message buffer
* Create a structure for collecting unique messages. * Keep track of first checkpoint, and last checkpoint through sequence numbers from Kinesis. * Add specs for core functionality.
This commit is contained in:
parent
0fe80b708b
commit
b5625c98a1
3 changed files with 227 additions and 1 deletions
|
|
@ -11,4 +11,3 @@ production ready._
|
||||||
Clone the repository.
|
Clone the repository.
|
||||||
|
|
||||||
$ git clone git@github.com:harlow/go-etl.git
|
$ git clone git@github.com:harlow/go-etl.git
|
||||||
|
|
||||||
|
|
|
||||||
71
msg_buffer.go
Normal file
71
msg_buffer.go
Normal file
|
|
@ -0,0 +1,71 @@
|
||||||
|
package etl
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MsgBuffer struct {
|
||||||
|
buffer bytes.Buffer
|
||||||
|
firstSequenceNumber string
|
||||||
|
lastSequenceNumber string
|
||||||
|
numMessagesToBuffer int
|
||||||
|
sequencesInBuffer []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b MsgBuffer) NumMessagesToBuffer() int {
|
||||||
|
return b.numMessagesToBuffer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *MsgBuffer) ConsumeRecord(data []byte, sequenceNumber string) {
|
||||||
|
if len(b.sequencesInBuffer) == 0 {
|
||||||
|
b.firstSequenceNumber = sequenceNumber
|
||||||
|
}
|
||||||
|
|
||||||
|
b.lastSequenceNumber = sequenceNumber
|
||||||
|
|
||||||
|
if !b.SequenceExists(sequenceNumber) {
|
||||||
|
b.buffer.Write(data)
|
||||||
|
b.sequencesInBuffer = append(b.sequencesInBuffer, sequenceNumber)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b MsgBuffer) SequenceExists(sequenceNumber string) bool {
|
||||||
|
for _, v := range b.sequencesInBuffer {
|
||||||
|
if v == sequenceNumber {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b MsgBuffer) FileName() string {
|
||||||
|
date := time.Now().UTC().Format("2006-01-02")
|
||||||
|
return fmt.Sprintf("/%v/%v-%v.txt", date, b.firstSequenceNumber, b.lastSequenceNumber)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b MsgBuffer) Data() []byte {
|
||||||
|
return b.buffer.Bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b MsgBuffer) NumMessagesInBuffer() int {
|
||||||
|
return len(b.sequencesInBuffer)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *MsgBuffer) FlushBuffer() {
|
||||||
|
b.buffer.Reset()
|
||||||
|
b.sequencesInBuffer = b.sequencesInBuffer[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b MsgBuffer) ShouldFlush() bool {
|
||||||
|
return len(b.sequencesInBuffer) >= b.NumMessagesToBuffer()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b MsgBuffer) LastSequenceNumber() string {
|
||||||
|
return b.lastSequenceNumber
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b MsgBuffer) FirstSequenceNumber() string {
|
||||||
|
return b.firstSequenceNumber
|
||||||
|
}
|
||||||
156
msg_buffer_test.go
Normal file
156
msg_buffer_test.go
Normal file
|
|
@ -0,0 +1,156 @@
|
||||||
|
package etl
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNumMessagesToBuffer(t *testing.T) {
|
||||||
|
const n = 25
|
||||||
|
b := MsgBuffer{numMessagesToBuffer: n}
|
||||||
|
r := b.NumMessagesToBuffer()
|
||||||
|
|
||||||
|
if r != n {
|
||||||
|
t.Errorf("NumMessagesToBuffer() = %v, want %v", r, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConsumeRecord(t *testing.T) {
|
||||||
|
var r1, s1 = []byte("Record1"), "Seq1"
|
||||||
|
var r2, s2 = []byte("Recrod2"), "Seq2"
|
||||||
|
|
||||||
|
b := MsgBuffer{}
|
||||||
|
b.ConsumeRecord(r1, s1)
|
||||||
|
|
||||||
|
if b.NumMessagesInBuffer() != 1 {
|
||||||
|
t.Errorf("NumRecordsInBuffer() want %v", 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
b.ConsumeRecord(r2, s2)
|
||||||
|
|
||||||
|
if b.NumMessagesInBuffer() != 2 {
|
||||||
|
t.Errorf("NumMessagesInBuffer() want %v", 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
b.ConsumeRecord(r2, s2)
|
||||||
|
|
||||||
|
if b.NumMessagesInBuffer() != 2 {
|
||||||
|
t.Errorf("NumMessagesInBuffer() want %v", 2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSequenceExists(t *testing.T) {
|
||||||
|
var r1, s1 = []byte("Record1"), "Seq1"
|
||||||
|
var r2, s2 = []byte("Recrod2"), "Seq2"
|
||||||
|
|
||||||
|
b := MsgBuffer{}
|
||||||
|
b.ConsumeRecord(r1, s1)
|
||||||
|
|
||||||
|
if b.SequenceExists(s1) != true {
|
||||||
|
t.Errorf("SequenceExists() want %v", true)
|
||||||
|
}
|
||||||
|
|
||||||
|
b.ConsumeRecord(r2, s2)
|
||||||
|
|
||||||
|
if b.SequenceExists(s2) != true {
|
||||||
|
t.Errorf("SequenceExists() want %v", true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFlushBuffer(t *testing.T) {
|
||||||
|
var r1, s1 = []byte("Record"), "SeqNum"
|
||||||
|
b := MsgBuffer{}
|
||||||
|
b.ConsumeRecord(r1, s1)
|
||||||
|
|
||||||
|
b.FlushBuffer()
|
||||||
|
|
||||||
|
if b.NumMessagesInBuffer() != 0 {
|
||||||
|
t.Errorf("NumMessagesInBuffer() want %v", 0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLastSequenceNumber(t *testing.T) {
|
||||||
|
var r1, s1 = []byte("Record1"), "Seq1"
|
||||||
|
var r2, s2 = []byte("Recrod2"), "Seq2"
|
||||||
|
|
||||||
|
b := MsgBuffer{}
|
||||||
|
b.ConsumeRecord(r1, s1)
|
||||||
|
|
||||||
|
if b.LastSequenceNumber() != s1 {
|
||||||
|
t.Errorf("LastSequenceNumber() want %v", s1)
|
||||||
|
}
|
||||||
|
|
||||||
|
b.ConsumeRecord(r2, s2)
|
||||||
|
|
||||||
|
if b.LastSequenceNumber() != s2 {
|
||||||
|
t.Errorf("LastSequenceNumber() want %v", s2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFirstSequenceNumber(t *testing.T) {
|
||||||
|
var r1, s1 = []byte("Record1"), "Seq1"
|
||||||
|
var r2, s2 = []byte("Recrod2"), "Seq2"
|
||||||
|
|
||||||
|
b := MsgBuffer{}
|
||||||
|
b.ConsumeRecord(r1, s1)
|
||||||
|
|
||||||
|
if b.FirstSequenceNumber() != s1 {
|
||||||
|
t.Errorf("FirstSequenceNumber() want %v", s1)
|
||||||
|
}
|
||||||
|
|
||||||
|
b.ConsumeRecord(r2, s2)
|
||||||
|
|
||||||
|
if b.FirstSequenceNumber() != s1 {
|
||||||
|
t.Errorf("FirstSequenceNumber() want %v", s1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestShouldFlush(t *testing.T) {
|
||||||
|
const n = 2
|
||||||
|
var r1, s1 = []byte("Record1"), "Seq1"
|
||||||
|
var r2, s2 = []byte("Recrod2"), "Seq2"
|
||||||
|
|
||||||
|
b := MsgBuffer{numMessagesToBuffer: n}
|
||||||
|
b.ConsumeRecord(r1, s1)
|
||||||
|
|
||||||
|
if b.ShouldFlush() != false {
|
||||||
|
t.Errorf("ShouldFlush() want %v", false)
|
||||||
|
}
|
||||||
|
|
||||||
|
b.ConsumeRecord(r2, s2)
|
||||||
|
|
||||||
|
if b.ShouldFlush() != true {
|
||||||
|
t.Errorf("ShouldFlush() want %v", true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestData(t *testing.T) {
|
||||||
|
var r1, s1 = []byte("Record1\n"), "Seq1"
|
||||||
|
var r2, s2 = []byte("Record2\n"), "Seq2"
|
||||||
|
var body = []byte("Record1\nRecord2\n")
|
||||||
|
|
||||||
|
b := MsgBuffer{}
|
||||||
|
b.ConsumeRecord(r1, s1)
|
||||||
|
b.ConsumeRecord(r2, s2)
|
||||||
|
|
||||||
|
if !bytes.Equal(b.Data(), body) {
|
||||||
|
t.Errorf("Data() want %v", body)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFileName(t *testing.T) {
|
||||||
|
var r1, s1 = []byte("Record1"), "Seq1"
|
||||||
|
var r2, s2 = []byte("Record2"), "Seq2"
|
||||||
|
date := time.Now().UTC().Format("2006-01-02")
|
||||||
|
name := fmt.Sprintf("/%v/Seq1-Seq2.txt", date)
|
||||||
|
|
||||||
|
b := MsgBuffer{}
|
||||||
|
b.ConsumeRecord(r1, s1)
|
||||||
|
b.ConsumeRecord(r2, s2)
|
||||||
|
|
||||||
|
if b.FileName() != name {
|
||||||
|
t.Errorf("FileName() = want %v", b.FileName(), name)
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue