From b5625c98a1f0b120bbed1d69a8e4c9e06c764281 Mon Sep 17 00:00:00 2001
From: Harlow Ward
Date: Thu, 24 Jul 2014 23:03:41 -0700
Subject: [PATCH] Message buffer

* Create a structure for collecting unique messages.
* Keep track of first checkpoint, and last checkpoint through sequence numbers from Kinesis.
* Add specs for core functionality.
---
 README.md          |   1 -
 msg_buffer.go      |  96 ++++++++++++++++++++++++++++
 msg_buffer_test.go | 156 +++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 252 insertions(+), 1 deletion(-)
 create mode 100644 msg_buffer.go
 create mode 100644 msg_buffer_test.go

diff --git a/README.md b/README.md
index 1a56969..d9944a6 100644
--- a/README.md
+++ b/README.md
@@ -11,4 +11,3 @@ production ready._
 Clone the repository.
 
     $ git clone git@github.com:harlow/go-etl.git
-
diff --git a/msg_buffer.go b/msg_buffer.go
new file mode 100644
index 0000000..99809a3
--- /dev/null
+++ b/msg_buffer.go
@@ -0,0 +1,96 @@
+package etl
+
+import (
+	"bytes"
+	"fmt"
+	"time"
+)
+
+// MsgBuffer accumulates record payloads in memory and remembers the sequence
+// number of every record it has accepted, so that a record whose sequence
+// number is already buffered is not written a second time.
+type MsgBuffer struct {
+	buffer              bytes.Buffer
+	firstSequenceNumber string
+	lastSequenceNumber  string
+	numMessagesToBuffer int
+	sequencesInBuffer   []string
+}
+
+// NumMessagesToBuffer returns the message count at which ShouldFlush starts
+// reporting true.
+func (b MsgBuffer) NumMessagesToBuffer() int {
+	return b.numMessagesToBuffer
+}
+
+// ConsumeRecord appends data to the buffer unless sequenceNumber is already
+// buffered. When no messages are buffered the call also records sequenceNumber
+// as the first sequence number; every call, duplicate or not, records it as
+// the last sequence number.
+func (b *MsgBuffer) ConsumeRecord(data []byte, sequenceNumber string) {
+	if len(b.sequencesInBuffer) == 0 {
+		b.firstSequenceNumber = sequenceNumber
+	}
+
+	b.lastSequenceNumber = sequenceNumber
+
+	if !b.SequenceExists(sequenceNumber) {
+		b.buffer.Write(data)
+		b.sequencesInBuffer = append(b.sequencesInBuffer, sequenceNumber)
+	}
+}
+
+// SequenceExists reports whether sequenceNumber is among the sequence numbers
+// currently held in the buffer.
+func (b MsgBuffer) SequenceExists(sequenceNumber string) bool {
+	for _, v := range b.sequencesInBuffer {
+		if v == sequenceNumber {
+			return true
+		}
+	}
+	return false
+}
+
+// FileName returns "/<date>/<first>-<last>.txt", where <date> is the current
+// UTC date formatted as YYYY-MM-DD and <first> and <last> are the first and
+// last sequence numbers.
+func (b MsgBuffer) FileName() string {
+	date := time.Now().UTC().Format("2006-01-02")
+	return fmt.Sprintf("/%v/%v-%v.txt", date, b.firstSequenceNumber, b.lastSequenceNumber)
+}
+
+// Data returns the bytes currently held in the buffer.
+func (b MsgBuffer) Data() []byte {
+	return b.buffer.Bytes()
+}
+
+// NumMessagesInBuffer returns how many distinct sequence numbers are currently
+// buffered.
+func (b MsgBuffer) NumMessagesInBuffer() int {
+	return len(b.sequencesInBuffer)
+}
+
+// FlushBuffer empties the buffer and forgets the buffered sequence numbers.
+// The first and last sequence numbers are left untouched.
+func (b *MsgBuffer) FlushBuffer() {
+	b.buffer.Reset()
+	b.sequencesInBuffer = b.sequencesInBuffer[:0]
+}
+
+// ShouldFlush reports whether the buffer holds at least NumMessagesToBuffer
+// messages.
+func (b MsgBuffer) ShouldFlush() bool {
+	return len(b.sequencesInBuffer) >= b.NumMessagesToBuffer()
+}
+
+// LastSequenceNumber returns the sequence number passed to the most recent
+// ConsumeRecord call.
+func (b MsgBuffer) LastSequenceNumber() string {
+	return b.lastSequenceNumber
+}
+
+// FirstSequenceNumber returns the sequence number of the most recent record
+// that was consumed while no messages were buffered.
+func (b MsgBuffer) FirstSequenceNumber() string {
+	return b.firstSequenceNumber
+}
diff --git a/msg_buffer_test.go b/msg_buffer_test.go
new file mode 100644
index 0000000..856a21f
--- /dev/null
+++ b/msg_buffer_test.go
@@ -0,0 +1,156 @@
+package etl
+
+import (
+	"bytes"
+	"fmt"
+	"testing"
+	"time"
+)
+
+func TestNumMessagesToBuffer(t *testing.T) {
+	const n = 25
+	b := MsgBuffer{numMessagesToBuffer: n}
+	r := b.NumMessagesToBuffer()
+
+	if r != n {
+		t.Errorf("NumMessagesToBuffer() = %v, want %v", r, n)
+	}
+}
+
+func TestConsumeRecord(t *testing.T) {
+	var r1, s1 = []byte("Record1"), "Seq1"
+	var r2, s2 = []byte("Recrod2"), "Seq2"
+
+	b := MsgBuffer{}
+	b.ConsumeRecord(r1, s1)
+
+	if b.NumMessagesInBuffer() != 1 {
+		t.Errorf("NumMessagesInBuffer() want %v", 1)
+	}
+
+	b.ConsumeRecord(r2, s2)
+
+	if b.NumMessagesInBuffer() != 2 {
+		t.Errorf("NumMessagesInBuffer() want %v", 2)
+	}
+
+	b.ConsumeRecord(r2, s2)
+
+	if b.NumMessagesInBuffer() != 2 {
+		t.Errorf("NumMessagesInBuffer() want %v", 2)
+	}
+}
+
+func TestSequenceExists(t *testing.T) {
+	var r1, s1 = []byte("Record1"), "Seq1"
+	var r2, s2 = []byte("Recrod2"), "Seq2"
+
+	b := MsgBuffer{}
+	b.ConsumeRecord(r1, s1)
+
+	if b.SequenceExists(s1) != true {
+		t.Errorf("SequenceExists() want %v", true)
+	}
+
+	b.ConsumeRecord(r2, s2)
+
+	if b.SequenceExists(s2) != true {
+		t.Errorf("SequenceExists() want %v", true)
+	}
+}
+
+func TestFlushBuffer(t *testing.T) {
+	var r1, s1 = []byte("Record"), "SeqNum"
+	b := MsgBuffer{}
+	b.ConsumeRecord(r1, s1)
+
+	b.FlushBuffer()
+
+	if b.NumMessagesInBuffer() != 0 {
+		t.Errorf("NumMessagesInBuffer() want %v", 0)
+	}
+}
+
+func TestLastSequenceNumber(t *testing.T) {
+	var r1, s1 = []byte("Record1"), "Seq1"
+	var r2, s2 = []byte("Recrod2"), "Seq2"
+
+	b := MsgBuffer{}
+	b.ConsumeRecord(r1, s1)
+
+	if b.LastSequenceNumber() != s1 {
+		t.Errorf("LastSequenceNumber() want %v", s1)
+	}
+
+	b.ConsumeRecord(r2, s2)
+
+	if b.LastSequenceNumber() != s2 {
+		t.Errorf("LastSequenceNumber() want %v", s2)
+	}
+}
+
+func TestFirstSequenceNumber(t *testing.T) {
+	var r1, s1 = []byte("Record1"), "Seq1"
+	var r2, s2 = []byte("Recrod2"), "Seq2"
+
+	b := MsgBuffer{}
+	b.ConsumeRecord(r1, s1)
+
+	if b.FirstSequenceNumber() != s1 {
+		t.Errorf("FirstSequenceNumber() want %v", s1)
+	}
+
+	b.ConsumeRecord(r2, s2)
+
+	if b.FirstSequenceNumber() != s1 {
+		t.Errorf("FirstSequenceNumber() want %v", s1)
+	}
+}
+
+func TestShouldFlush(t *testing.T) {
+	const n = 2
+	var r1, s1 = []byte("Record1"), "Seq1"
+	var r2, s2 = []byte("Recrod2"), "Seq2"
+
+	b := MsgBuffer{numMessagesToBuffer: n}
+	b.ConsumeRecord(r1, s1)
+
+	if b.ShouldFlush() != false {
+		t.Errorf("ShouldFlush() want %v", false)
+	}
+
+	b.ConsumeRecord(r2, s2)
+
+	if b.ShouldFlush() != true {
+		t.Errorf("ShouldFlush() want %v", true)
+	}
+}
+
+func TestData(t *testing.T) {
+	var r1, s1 = []byte("Record1\n"), "Seq1"
+	var r2, s2 = []byte("Record2\n"), "Seq2"
+	var body = []byte("Record1\nRecord2\n")
+
+	b := MsgBuffer{}
+	b.ConsumeRecord(r1, s1)
+	b.ConsumeRecord(r2, s2)
+
+	if !bytes.Equal(b.Data(), body) {
+		t.Errorf("Data() want %v", body)
+	}
+}
+
+func TestFileName(t *testing.T) {
+	var r1, s1 = []byte("Record1"), "Seq1"
+	var r2, s2 = []byte("Record2"), "Seq2"
+	date := time.Now().UTC().Format("2006-01-02")
+	name := fmt.Sprintf("/%v/Seq1-Seq2.txt", date)
+
+	b := MsgBuffer{}
+	b.ConsumeRecord(r1, s1)
+	b.ConsumeRecord(r2, s2)
+
+	if b.FileName() != name {
+		t.Errorf("FileName() = %v, want %v", b.FileName(), name)
+	}
+}