diff --git a/batchconsumer/consumer.go b/batchconsumer/consumer.go index 4bac1e8..7d6eae5 100644 --- a/batchconsumer/consumer.go +++ b/batchconsumer/consumer.go @@ -18,12 +18,12 @@ type Config struct { // LogFile where consumer errors and failed log lines are saved LogFile string - // FlushInterval is how often accumulated messages should be bulk put to firehose - FlushInterval time.Duration - // FlushCount is the number of messages that triggers a push to firehose. Max batch size is 500, see: http://docs.aws.amazon.com/firehose/latest/dev/limits.html - FlushCount int - // FlushSize is the size of a batch in bytes that triggers a push to firehose. Max batch size is 4Mb (4*1024*1024), see: http://docs.aws.amazon.com/firehose/latest/dev/limits.html - FlushSize int + // BatchInterval the upper bound on how often SendBatch is called with accumulated messages + BatchInterval time.Duration + // BatchCount is the number of messages that triggers a SendBatch call + BatchCount int + // BatchSize is the size of a batch in bytes that triggers a SendBatch call + BatchSize int // ReadRateLimit the number of records read per seconds ReadRateLimit int @@ -49,14 +49,14 @@ func withDefaults(config Config) Config { config.LogFile = "/tmp/kcl-" + time.Now().Format(time.RFC3339) } - if config.FlushInterval == 0 { - config.FlushInterval = 10 * time.Second + if config.BatchInterval == 0 { + config.BatchInterval = 10 * time.Second } - if config.FlushCount == 0 { - config.FlushCount = 500 + if config.BatchCount == 0 { + config.BatchCount = 500 } - if config.FlushSize == 0 { - config.FlushSize = 4 * 1024 * 1024 + if config.BatchSize == 0 { + config.BatchSize = 4 * 1024 * 1024 } if config.DeployEnv == "" { diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index 6e3a3a7..f78d217 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -118,7 +118,7 @@ func (b *batchedWriter) createBatcher(tag string) batcher.Batcher { tag: tag, writer: b, } - return batcher.New(sync, b.config.FlushInterval, b.config.FlushCount, b.config.FlushSize) + return batcher.New(sync, b.config.BatchInterval, b.config.BatchCount, b.config.BatchSize) } func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) { diff --git a/cmd/batchconsumer/main.go b/cmd/batchconsumer/main.go index 5d35924..f97e574 100644 --- a/cmd/batchconsumer/main.go +++ b/cmd/batchconsumer/main.go @@ -25,9 +25,9 @@ func createDummyOutput() (logger.KayveeLogger, *os.File) { func main() { config := kbc.Config{ - FlushInterval: 10 * time.Second, - FlushCount: 500, - FlushSize: 4 * 1024 * 1024, // 4Mb + BatchInterval: 10 * time.Second, + BatchCount: 500, + BatchSize: 4 * 1024 * 1024, // 4Mb LogFile: "/tmp/example-kcl-consumer", DeployEnv: "test-env", }