diff --git a/batchconsumer/batcher/message_batcher.go b/batchconsumer/batcher/message_batcher.go index 1cf93e5..fbf25ba 100644 --- a/batchconsumer/batcher/message_batcher.go +++ b/batchconsumer/batcher/message_batcher.go @@ -76,8 +76,18 @@ type batcher struct { // - flushInterval - how often accumulated messages should be flushed (default 1 second). // - flushCount - number of messages that trigger a flush (default 10). // - flushSize - size of batch that triggers a flush (default 1024 * 1024 = 1 mb) -func New(sync Sync, flushInterval time.Duration, flushCount int, flushSize int) Batcher { - msgChan := make(chan msgPack, 100) +func New(sync Sync, flushInterval time.Duration, flushCount int, flushSize int) (Batcher, error) { + if flushSize == 0 { + return nil, fmt.Errorf("flush size must be non-zero") + } + if flushCount == 0 { + return nil, fmt.Errorf("flush count must be non-zero") + } + if flushInterval == 0 { + return nil, fmt.Errorf("flush interval must be non-zero") + } + + msgChan := make(chan msgPack) flushChan := make(chan struct{}) b := &batcher{ @@ -91,7 +101,7 @@ func New(sync Sync, flushInterval time.Duration, flushCount int, flushSize int) go b.startBatcher(msgChan, flushChan) - return b + return b, nil } func (b *batcher) SmallestSequencePair() SequencePair { diff --git a/batchconsumer/batcher/message_batcher_test.go b/batchconsumer/batcher/message_batcher_test.go index 23f24f9..8ab9039 100644 --- a/batchconsumer/batcher/message_batcher_test.go +++ b/batchconsumer/batcher/message_batcher_test.go @@ -40,11 +40,11 @@ func (m *MockSync) waitForFlush(timeout time.Duration) error { var mockSequence = SequencePair{big.NewInt(99999), 12345} func TestBatchingByCount(t *testing.T) { - var err error assert := assert.New(t) sync := NewMockSync() - batcher := New(sync, time.Hour, 2, 1024*1024) + batcher, err := New(sync, time.Hour, 2, 1024*1024) + assert.NoError(err) t.Log("Batcher respect count limit") assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence)) @@ -65,11 +65,11 @@ func TestBatchingByCount(t *testing.T) { } func TestBatchingByTime(t *testing.T) { - var err error assert := assert.New(t) sync := NewMockSync() - batcher := New(sync, time.Millisecond, 2000000, 1024*1024) + batcher, err := New(sync, time.Millisecond, 2000000, 1024*1024) + assert.NoError(err) t.Log("Batcher sends partial batches when time expires") assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence)) @@ -99,11 +99,11 @@ func TestBatchingByTime(t *testing.T) { } func TestBatchingBySize(t *testing.T) { - var err error assert := assert.New(t) sync := NewMockSync() - batcher := New(sync, time.Hour, 2000000, 8) + batcher, err := New(sync, time.Hour, 2000000, 8) + assert.NoError(err) t.Log("Large messages are sent immediately") assert.NoError(batcher.AddMessage([]byte("hellohello"), mockSequence)) @@ -145,11 +145,11 @@ func TestBatchingBySize(t *testing.T) { } func TestFlushing(t *testing.T) { - var err error assert := assert.New(t) sync := NewMockSync() - batcher := New(sync, time.Hour, 2000000, 1024*1024) + batcher, err := New(sync, time.Hour, 2000000, 1024*1024) + assert.NoError(err) t.Log("Calling flush sends pending messages") assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence)) @@ -168,11 +168,11 @@ func TestFlushing(t *testing.T) { } func TestSendingEmpty(t *testing.T) { - var err error assert := assert.New(t) sync := NewMockSync() - batcher := New(sync, time.Second, 10, 1024*1024) + batcher, err := New(sync, time.Second, 10, 1024*1024) + assert.NoError(err) t.Log("An error is returned when an empty message is sent") err = batcher.AddMessage([]byte{}, mockSequence) @@ -184,7 +184,10 @@ func TestUpdatingSequence(t *testing.T) { assert := assert.New(t) sync := NewMockSync() - batcher := New(sync, time.Second, 10, 1024*1024).(*batcher) + b, err := New(sync, time.Second, 10, 1024*1024) + assert.NoError(err) + + batcher := b.(*batcher) t.Log("Initally, smallestSeq is undefined") assert.Nil(batcher.SmallestSequencePair().Sequence) diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index f78d217..a3100cd 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -118,7 +118,12 @@ func (b *batchedWriter) createBatcher(tag string) batcher.Batcher { tag: tag, writer: b, } - return batcher.New(sync, b.config.BatchInterval, b.config.BatchCount, b.config.BatchSize) + batch, err := batcher.New(sync, b.config.BatchInterval, b.config.BatchCount, b.config.BatchSize) + if err != nil { + b.log.ErrorD("create-batcher", kv.M{"msg": err.Error(), "tag": tag}) + } + + return batch } func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) {