Use config object for optional params
After reading notes from Peter's talk I like the idea of using a config object where consumers of the library can override the defaults. https://peter.bourgon.org/go-best-practices-2016/#configuration
This commit is contained in:
parent
3aa0f72efe
commit
afae1bea36
8 changed files with 67 additions and 72 deletions
50
README.md
50
README.md
|
|
@ -20,22 +20,42 @@ func main() {
|
||||||
)
|
)
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
|
// override library defaults
|
||||||
|
cfg := connector.Config{
|
||||||
|
MaxBatchCount: 400,
|
||||||
|
}
|
||||||
|
|
||||||
// create new consumer
|
// create new consumer
|
||||||
c := connector.NewConsumer(*app, *stream)
|
c := connector.NewConsumer(*app, *stream, cfg)
|
||||||
|
|
||||||
// override default values
|
// process records from the stream
|
||||||
c.Set("maxRecordCount", 200)
|
|
||||||
|
|
||||||
// start consuming records from the queues
|
|
||||||
c.Start(connector.HandlerFunc(func(b connector.Buffer) {
|
c.Start(connector.HandlerFunc(func(b connector.Buffer) {
|
||||||
fmt.Println(b.GetRecords())
|
fmt.Println(b.GetRecords())
|
||||||
// process the records
|
|
||||||
}))
|
}))
|
||||||
|
|
||||||
select {}
|
select {}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Logging
|
||||||
|
|
||||||
|
[Apex Log](https://medium.com/@tjholowaychuk/apex-log-e8d9627f4a9a#.5x1uo1767) is used to log Info and Errors from within the libarary. The default handler is "text" and can be overrideen with other [LogHandlers](https://github.com/apex/log/tree/master/_examples) from the the Config struct:
|
||||||
|
|
||||||
|
```go
|
||||||
|
import(
|
||||||
|
"github.com/apex/log"
|
||||||
|
"github.com/apex/log/handlers/json"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// ...
|
||||||
|
|
||||||
|
cfg := connector.Config{
|
||||||
|
LogHandler: json.New(os.Stderr),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
### Installation
|
### Installation
|
||||||
|
|
||||||
Get the package source:
|
Get the package source:
|
||||||
|
|
@ -60,24 +80,6 @@ Use the [seed stream](https://github.com/harlow/kinesis-connectors/tree/master/e
|
||||||
* [Firehose](https://github.com/harlow/kinesis-connectors/tree/master/examples/firehose)
|
* [Firehose](https://github.com/harlow/kinesis-connectors/tree/master/examples/firehose)
|
||||||
* [S3](https://github.com/harlow/kinesis-connectors/tree/master/examples/s3)
|
* [S3](https://github.com/harlow/kinesis-connectors/tree/master/examples/s3)
|
||||||
|
|
||||||
### Logging
|
|
||||||
|
|
||||||
Default logging is handled by [go-kit package log](https://github.com/go-kit/kit/tree/master/log). Applications can override the default loging behaviour by implementing the [Logger interface][log_interface].
|
|
||||||
|
|
||||||
```go
|
|
||||||
import(
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"github.com/apex/log"
|
|
||||||
"github.com/apex/log/handlers/json"
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
c := connector.NewConsumer("signupAgg", "signups")
|
|
||||||
c.SetLogHandler(json.New(os.Stderr))
|
|
||||||
// ...
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
## Contributing
|
## Contributing
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ type Buffer struct {
|
||||||
firstSequenceNumber string
|
firstSequenceNumber string
|
||||||
lastSequenceNumber string
|
lastSequenceNumber string
|
||||||
|
|
||||||
MaxRecordCount int
|
MaxBatchCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddRecord adds a record to the buffer.
|
// AddRecord adds a record to the buffer.
|
||||||
|
|
@ -24,7 +24,7 @@ func (b *Buffer) AddRecord(r *kinesis.Record) {
|
||||||
|
|
||||||
// ShouldFlush determines if the buffer has reached its target size.
|
// ShouldFlush determines if the buffer has reached its target size.
|
||||||
func (b *Buffer) ShouldFlush() bool {
|
func (b *Buffer) ShouldFlush() bool {
|
||||||
return b.RecordCount() >= b.MaxRecordCount
|
return b.RecordCount() >= b.MaxBatchCount
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flush empties the buffer and resets the sequence counter.
|
// Flush empties the buffer and resets the sequence counter.
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func BenchmarkBufferLifecycle(b *testing.B) {
|
func BenchmarkBufferLifecycle(b *testing.B) {
|
||||||
buf := Buffer{MaxRecordCount: 1000}
|
buf := Buffer{MaxBatchCount: 1000}
|
||||||
seq := "1"
|
seq := "1"
|
||||||
rec := &kinesis.Record{SequenceNumber: &seq}
|
rec := &kinesis.Record{SequenceNumber: &seq}
|
||||||
|
|
||||||
|
|
@ -48,7 +48,7 @@ func Test_LastSeq(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_ShouldFlush(t *testing.T) {
|
func Test_ShouldFlush(t *testing.T) {
|
||||||
b := Buffer{MaxRecordCount: 2}
|
b := Buffer{MaxBatchCount: 2}
|
||||||
s1, s2 := "1", "2"
|
s1, s2 := "1", "2"
|
||||||
r1 := &kinesis.Record{SequenceNumber: &s1}
|
r1 := &kinesis.Record{SequenceNumber: &s1}
|
||||||
r2 := &kinesis.Record{SequenceNumber: &s2}
|
r2 := &kinesis.Record{SequenceNumber: &s2}
|
||||||
|
|
|
||||||
10
config.go
Normal file
10
config.go
Normal file
|
|
@ -0,0 +1,10 @@
|
||||||
|
package connector
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/apex/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
MaxBatchCount int
|
||||||
|
LogHandler log.Handler
|
||||||
|
}
|
||||||
40
consumer.go
40
consumer.go
|
|
@ -10,15 +10,19 @@ import (
|
||||||
"github.com/aws/aws-sdk-go/service/kinesis"
|
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
const (
|
||||||
maxRecordCount = 1000
|
defaultMaxBatchCount = 1000
|
||||||
maxBufferTime = "30s"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewConsumer creates a new kinesis connection and returns a
|
// NewConsumer creates a new consumer with initialied kinesis connection
|
||||||
// new consumer initialized with app and stream name
|
func NewConsumer(appName, streamName string, cfg Config) *Consumer {
|
||||||
func NewConsumer(appName, streamName string) *Consumer {
|
if cfg.LogHandler == nil {
|
||||||
log.SetHandler(text.New(os.Stderr))
|
cfg.LogHandler = text.New(os.Stderr)
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.MaxBatchCount == 0 {
|
||||||
|
cfg.MaxBatchCount = defaultMaxBatchCount
|
||||||
|
}
|
||||||
|
|
||||||
svc := kinesis.New(
|
svc := kinesis.New(
|
||||||
session.New(
|
session.New(
|
||||||
|
|
@ -30,6 +34,7 @@ func NewConsumer(appName, streamName string) *Consumer {
|
||||||
appName: appName,
|
appName: appName,
|
||||||
streamName: streamName,
|
streamName: streamName,
|
||||||
svc: svc,
|
svc: svc,
|
||||||
|
cfg: cfg,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -37,27 +42,14 @@ type Consumer struct {
|
||||||
appName string
|
appName string
|
||||||
streamName string
|
streamName string
|
||||||
svc *kinesis.Kinesis
|
svc *kinesis.Kinesis
|
||||||
}
|
cfg Config
|
||||||
|
|
||||||
// Set `option` to `value`
|
|
||||||
func (c *Consumer) Set(option string, value interface{}) {
|
|
||||||
switch option {
|
|
||||||
case "maxRecordCount":
|
|
||||||
maxRecordCount = value.(int)
|
|
||||||
default:
|
|
||||||
log.Error("invalid option")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetLogHandler allows users override logger
|
|
||||||
func (c *Consumer) SetLogHandler(handler log.Handler) {
|
|
||||||
log.SetHandler(handler)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start takes a handler and then loops over each of the shards
|
// Start takes a handler and then loops over each of the shards
|
||||||
// processing each one with the handler.
|
// processing each one with the handler.
|
||||||
func (c *Consumer) Start(handler Handler) {
|
func (c *Consumer) Start(handler Handler) {
|
||||||
|
log.SetHandler(c.cfg.LogHandler)
|
||||||
|
|
||||||
resp, err := c.svc.DescribeStream(
|
resp, err := c.svc.DescribeStream(
|
||||||
&kinesis.DescribeStreamInput{
|
&kinesis.DescribeStreamInput{
|
||||||
StreamName: aws.String(c.streamName),
|
StreamName: aws.String(c.streamName),
|
||||||
|
|
@ -82,7 +74,7 @@ func (c *Consumer) handlerLoop(shardID string, handler Handler) {
|
||||||
})
|
})
|
||||||
|
|
||||||
buf := &Buffer{
|
buf := &Buffer{
|
||||||
MaxRecordCount: maxRecordCount,
|
MaxBatchCount: c.cfg.MaxBatchCount,
|
||||||
}
|
}
|
||||||
|
|
||||||
checkpoint := &Checkpoint{
|
checkpoint := &Checkpoint{
|
||||||
|
|
|
||||||
|
|
@ -1,17 +0,0 @@
|
||||||
package connector
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/bmizerany/assert"
|
|
||||||
)
|
|
||||||
|
|
||||||
func Test_Set(t *testing.T) {
|
|
||||||
defaultMaxRecordCount := 1000
|
|
||||||
assert.Equal(t, maxRecordCount, defaultMaxRecordCount)
|
|
||||||
|
|
||||||
c := NewConsumer("app", "stream")
|
|
||||||
c.Set("maxRecordCount", 100)
|
|
||||||
|
|
||||||
assert.Equal(t, maxRecordCount, 100)
|
|
||||||
}
|
|
||||||
|
|
@ -31,8 +31,12 @@ func main() {
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
svc := firehose.New(session.New())
|
svc := firehose.New(session.New())
|
||||||
|
|
||||||
c := connector.NewConsumer(*app, *stream)
|
cfg := connector.Config{
|
||||||
c.Set("maxRecordCount", 400)
|
MaxBatchCount: 400,
|
||||||
|
}
|
||||||
|
|
||||||
|
c := connector.NewConsumer(*app, *stream, cfg)
|
||||||
|
|
||||||
c.Start(connector.HandlerFunc(func(b connector.Buffer) {
|
c.Start(connector.HandlerFunc(func(b connector.Buffer) {
|
||||||
params := &firehose.PutRecordBatchInput{
|
params := &firehose.PutRecordBatchInput{
|
||||||
DeliveryStreamName: aws.String(*delivery),
|
DeliveryStreamName: aws.String(*delivery),
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,11 @@ func main() {
|
||||||
Region: "us-west-1",
|
Region: "us-west-1",
|
||||||
}
|
}
|
||||||
|
|
||||||
c := connector.NewConsumer(*app, *stream)
|
cfg := connector.Config{
|
||||||
|
MaxBatchCount: 500,
|
||||||
|
}
|
||||||
|
|
||||||
|
c := connector.NewConsumer(*app, *stream, cfg)
|
||||||
|
|
||||||
c.Start(connector.HandlerFunc(func(b connector.Buffer) {
|
c.Start(connector.HandlerFunc(func(b connector.Buffer) {
|
||||||
body := new(bytes.Buffer)
|
body := new(bytes.Buffer)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue