From 3aa0f72efeb7c0aeb3cd1951deb19931da9f1003 Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Sun, 1 May 2016 10:43:42 -0700 Subject: [PATCH] Add logging when records are emitted w/ record count --- README.md | 2 +- buffer.go | 11 ++++++++--- consumer.go | 12 +++++++----- consumer_test.go | 8 ++++---- examples/firehose/main.go | 3 +-- 5 files changed, 21 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 05ee7d9..6f1bd52 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ func main() { c := connector.NewConsumer(*app, *stream) // override default values - c.Set("maxBatchCount", 200) + c.Set("maxRecordCount", 200) // start consuming records from the queues c.Start(connector.HandlerFunc(func(b connector.Buffer) { diff --git a/buffer.go b/buffer.go index 819d863..202a022 100644 --- a/buffer.go +++ b/buffer.go @@ -9,12 +9,12 @@ type Buffer struct { firstSequenceNumber string lastSequenceNumber string - MaxBatchCount int + MaxRecordCount int } // AddRecord adds a record to the buffer. func (b *Buffer) AddRecord(r *kinesis.Record) { - if len(b.records) == 0 { + if b.RecordCount() == 0 { b.firstSequenceNumber = *r.SequenceNumber } @@ -24,7 +24,7 @@ func (b *Buffer) AddRecord(r *kinesis.Record) { // ShouldFlush determines if the buffer has reached its target size. func (b *Buffer) ShouldFlush() bool { - return len(b.records) >= b.MaxBatchCount + return b.RecordCount() >= b.MaxRecordCount } // Flush empties the buffer and resets the sequence counter. @@ -37,6 +37,11 @@ func (b *Buffer) GetRecords() []*kinesis.Record { return b.records } +// RecordCount returns the number of records in the buffer. +func (b *Buffer) RecordCount() int { + return len(b.records) +} + // FirstSequenceNumber returns the sequence number of the first record in the buffer. func (b *Buffer) FirstSeq() string { return b.firstSequenceNumber diff --git a/consumer.go b/consumer.go index 6fb8ec4..cb26989 100644 --- a/consumer.go +++ b/consumer.go @@ -11,7 +11,8 @@ import ( ) var ( - maxBatchCount = 1000 + maxRecordCount = 1000 + maxBufferTime = "30s" ) // NewConsumer creates a new kinesis connection and returns a @@ -41,8 +42,8 @@ type Consumer struct { // Set `option` to `value` func (c *Consumer) Set(option string, value interface{}) { switch option { - case "maxBatchCount": - maxBatchCount = value.(int) + case "maxRecordCount": + maxRecordCount = value.(int) default: log.Error("invalid option") os.Exit(1) @@ -81,7 +82,7 @@ func (c *Consumer) handlerLoop(shardID string, handler Handler) { }) buf := &Buffer{ - MaxBatchCount: maxBatchCount, + MaxRecordCount: maxRecordCount, } checkpoint := &Checkpoint{ @@ -108,7 +109,7 @@ func (c *Consumer) handlerLoop(shardID string, handler Handler) { } shardIterator := resp.ShardIterator - ctx.Info("started") + ctx.Info("processing") for { resp, err := c.svc.GetRecords( @@ -128,6 +129,7 @@ func (c *Consumer) handlerLoop(shardID string, handler Handler) { if buf.ShouldFlush() { handler.HandleRecords(*buf) + ctx.WithField("count", buf.RecordCount()).Info("emitted") checkpoint.SetCheckpoint(shardID, buf.LastSeq()) buf.Flush() } diff --git a/consumer_test.go b/consumer_test.go index 84ac83c..69d119e 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -7,11 +7,11 @@ import ( ) func Test_Set(t *testing.T) { - defaultMaxBatchCount := 1000 - assert.Equal(t, maxBatchCount, defaultMaxBatchCount) + defaultMaxRecordCount := 1000 + assert.Equal(t, maxRecordCount, defaultMaxRecordCount) c := NewConsumer("app", "stream") - c.Set("maxBatchCount", 100) + c.Set("maxRecordCount", 100) - assert.Equal(t, maxBatchCount, 100) + assert.Equal(t, maxRecordCount, 100) } diff --git a/examples/firehose/main.go b/examples/firehose/main.go index b16cbd9..7909837 100644 --- a/examples/firehose/main.go +++ b/examples/firehose/main.go @@ -32,8 +32,7 @@ func main() { svc := firehose.New(session.New()) c := connector.NewConsumer(*app, *stream) - c.Set("maxBatchCount", 400) - c.Set("pollInterval", "3s") + c.Set("maxRecordCount", 400) c.Start(connector.HandlerFunc(func(b connector.Buffer) { params := &firehose.PutRecordBatchInput{ DeliveryStreamName: aws.String(*delivery),