diff --git a/README.md b/README.md
index 6f1bd52..4dc3981 100644
--- a/README.md
+++ b/README.md
@@ -20,22 +20,42 @@ func main() {
 	)
 	flag.Parse()
 
+	// override library defaults
+	cfg := connector.Config{
+		MaxBatchCount: 400,
+	}
+
 	// create new consumer
-	c := connector.NewConsumer(*app, *stream)
+	c := connector.NewConsumer(*app, *stream, cfg)
 
-	// override default values
-	c.Set("maxRecordCount", 200)
-
-	// start consuming records from the queues
+	// process records from the stream
 	c.Start(connector.HandlerFunc(func(b connector.Buffer) {
 		fmt.Println(b.GetRecords())
-		// process the records
 	}))
 
 	select {}
 }
 ```
 
+### Logging
+
+[Apex Log](https://medium.com/@tjholowaychuk/apex-log-e8d9627f4a9a#.5x1uo1767) is used to log info and errors from within the library. The default handler is "text" and can be overridden with other [LogHandlers](https://github.com/apex/log/tree/master/_examples) via the Config struct:
+
+```go
+import(
+	"github.com/apex/log"
+	"github.com/apex/log/handlers/json"
+)
+
+func main() {
+	// ...
+
+	cfg := connector.Config{
+		LogHandler: json.New(os.Stderr),
+	}
+}
+```
+
 ### Installation
 
 Get the package source:
@@ -60,24 +80,6 @@ Use the [seed stream](https://github.com/harlow/kinesis-connectors/tree/master/e
 * [Firehose](https://github.com/harlow/kinesis-connectors/tree/master/examples/firehose)
 * [S3](https://github.com/harlow/kinesis-connectors/tree/master/examples/s3)
 
-### Logging
-
-Default logging is handled by [go-kit package log](https://github.com/go-kit/kit/tree/master/log). Applications can override the default loging behaviour by implementing the [Logger interface][log_interface].
-
-```go
-import(
-	"os"
-
-	"github.com/apex/log"
-	"github.com/apex/log/handlers/json"
-)
-
-func main() {
-	c := connector.NewConsumer("signupAgg", "signups")
-	c.SetLogHandler(json.New(os.Stderr))
-	// ...
-}
-```
 
 ## Contributing
diff --git a/buffer.go b/buffer.go
index 202a022..0b95a44 100644
--- a/buffer.go
+++ b/buffer.go
@@ -9,7 +9,7 @@ type Buffer struct {
 	firstSequenceNumber string
 	lastSequenceNumber  string
 
-	MaxRecordCount int
+	MaxBatchCount int
 }
 
 // AddRecord adds a record to the buffer.
@@ -24,7 +24,7 @@ func (b *Buffer) AddRecord(r *kinesis.Record) {
 
 // ShouldFlush determines if the buffer has reached its target size.
 func (b *Buffer) ShouldFlush() bool {
-	return b.RecordCount() >= b.MaxRecordCount
+	return b.RecordCount() >= b.MaxBatchCount
 }
 
 // Flush empties the buffer and resets the sequence counter.
diff --git a/buffer_test.go b/buffer_test.go
index 6870d21..10c3c6e 100644
--- a/buffer_test.go
+++ b/buffer_test.go
@@ -8,7 +8,7 @@ import (
 )
 
 func BenchmarkBufferLifecycle(b *testing.B) {
-	buf := Buffer{MaxRecordCount: 1000}
+	buf := Buffer{MaxBatchCount: 1000}
 	seq := "1"
 	rec := &kinesis.Record{SequenceNumber: &seq}
 
@@ -48,7 +48,7 @@ func Test_LastSeq(t *testing.T) {
 }
 
 func Test_ShouldFlush(t *testing.T) {
-	b := Buffer{MaxRecordCount: 2}
+	b := Buffer{MaxBatchCount: 2}
 	s1, s2 := "1", "2"
 	r1 := &kinesis.Record{SequenceNumber: &s1}
 	r2 := &kinesis.Record{SequenceNumber: &s2}
diff --git a/config.go b/config.go
new file mode 100644
index 0000000..12dac97
--- /dev/null
+++ b/config.go
@@ -0,0 +1,10 @@
+package connector
+
+import (
+	"github.com/apex/log"
+)
+
+type Config struct {
+	MaxBatchCount int
+	LogHandler    log.Handler
+}
diff --git a/consumer.go b/consumer.go
index cb26989..cb720a8 100644
--- a/consumer.go
+++ b/consumer.go
@@ -10,15 +10,19 @@ import (
 	"github.com/aws/aws-sdk-go/service/kinesis"
 )
 
-var (
-	maxRecordCount = 1000
-	maxBufferTime  = "30s"
+const (
+	defaultMaxBatchCount = 1000
 )
 
-// NewConsumer creates a new kinesis connection and returns a
-// new consumer initialized with app and stream name
-func NewConsumer(appName, streamName string) *Consumer {
-	log.SetHandler(text.New(os.Stderr))
+// NewConsumer creates a new consumer with an initialized kinesis connection
+func NewConsumer(appName, streamName string, cfg Config) *Consumer {
+	if cfg.LogHandler == nil {
+		cfg.LogHandler = text.New(os.Stderr)
+	}
+
+	if cfg.MaxBatchCount == 0 {
+		cfg.MaxBatchCount = defaultMaxBatchCount
+	}
 
 	svc := kinesis.New(
 		session.New(
@@ -30,6 +34,7 @@
 		appName:    appName,
 		streamName: streamName,
 		svc:        svc,
+		cfg:        cfg,
 	}
 }
 
@@ -37,27 +42,14 @@ type Consumer struct {
 	appName    string
 	streamName string
 	svc        *kinesis.Kinesis
-}
-
-// Set `option` to `value`
-func (c *Consumer) Set(option string, value interface{}) {
-	switch option {
-	case "maxRecordCount":
-		maxRecordCount = value.(int)
-	default:
-		log.Error("invalid option")
-		os.Exit(1)
-	}
-}
-
-// SetLogHandler allows users override logger
-func (c *Consumer) SetLogHandler(handler log.Handler) {
-	log.SetHandler(handler)
+	cfg        Config
 }
 
 // Start takes a handler and then loops over each of the shards
 // processing each one with the handler.
 func (c *Consumer) Start(handler Handler) {
+	log.SetHandler(c.cfg.LogHandler)
+
 	resp, err := c.svc.DescribeStream(
 		&kinesis.DescribeStreamInput{
 			StreamName: aws.String(c.streamName),
@@ -82,7 +74,7 @@ func (c *Consumer) handlerLoop(shardID string, handler Handler) {
 	})
 
 	buf := &Buffer{
-		MaxRecordCount: maxRecordCount,
+		MaxBatchCount: c.cfg.MaxBatchCount,
 	}
 
 	checkpoint := &Checkpoint{
diff --git a/consumer_test.go b/consumer_test.go
deleted file mode 100644
index 69d119e..0000000
--- a/consumer_test.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package connector
-
-import (
-	"testing"
-
-	"github.com/bmizerany/assert"
-)
-
-func Test_Set(t *testing.T) {
-	defaultMaxRecordCount := 1000
-	assert.Equal(t, maxRecordCount, defaultMaxRecordCount)
-
-	c := NewConsumer("app", "stream")
-	c.Set("maxRecordCount", 100)
-
-	assert.Equal(t, maxRecordCount, 100)
-}
diff --git a/examples/firehose/main.go b/examples/firehose/main.go
index 7909837..fc40f60 100644
--- a/examples/firehose/main.go
+++ b/examples/firehose/main.go
@@ -31,8 +31,12 @@ func main() {
 	flag.Parse()
 
 	svc := firehose.New(session.New())
-	c := connector.NewConsumer(*app, *stream)
-	c.Set("maxRecordCount", 400)
+	cfg := connector.Config{
+		MaxBatchCount: 400,
+	}
+
+	c := connector.NewConsumer(*app, *stream, cfg)
+
 	c.Start(connector.HandlerFunc(func(b connector.Buffer) {
 		params := &firehose.PutRecordBatchInput{
 			DeliveryStreamName: aws.String(*delivery),
diff --git a/examples/s3/main.go b/examples/s3/main.go
index ccba741..49a6571 100644
--- a/examples/s3/main.go
+++ b/examples/s3/main.go
@@ -24,7 +24,11 @@ func main() {
 		Region: "us-west-1",
 	}
 
-	c := connector.NewConsumer(*app, *stream)
+	cfg := connector.Config{
+		MaxBatchCount: 500,
+	}
+
+	c := connector.NewConsumer(*app, *stream, cfg)
 	c.Start(connector.HandlerFunc(func(b connector.Buffer) {
 		body := new(bytes.Buffer)
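Taken together, the changes above replace the old `Set`/`SetLogHandler` calls with a single `Config` value passed to `NewConsumer`. Below is a minimal sketch of a consumer wired up against that new API; the flag definitions and the `connector` import alias are assumptions, while `Config`, `NewConsumer`, `HandlerFunc`, and `GetRecords` come from the diff and README above.

```go
package main

import (
	"flag"
	"fmt"
	"os"

	"github.com/apex/log/handlers/json"
	connector "github.com/harlow/kinesis-connectors"
)

func main() {
	// hypothetical flags; the README snippet only shows flag.Parse()
	var (
		app    = flag.String("a", "", "App name")
		stream = flag.String("s", "", "Stream name")
	)
	flag.Parse()

	// Override the library defaults introduced by this change:
	// MaxBatchCount controls when the buffer flushes (default 1000),
	// LogHandler swaps the default "text" handler for JSON output.
	cfg := connector.Config{
		MaxBatchCount: 400,
		LogHandler:    json.New(os.Stderr),
	}

	c := connector.NewConsumer(*app, *stream, cfg)

	// process records from the stream
	c.Start(connector.HandlerFunc(func(b connector.Buffer) {
		fmt.Println(b.GetRecords())
	}))

	select {}
}
```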