diff --git a/README.md b/README.md
index d2e261e..1aefba3 100644
--- a/README.md
+++ b/README.md
@@ -22,7 +22,7 @@ func main() {
 	// override library defaults
 	cfg := connector.Config{
-		MaxBatchCount: 400,
+		BufferSize: 400,
 	}
 
 	// create new consumer
diff --git a/buffer.go b/buffer.go
index 0b95a44..202a022 100644
--- a/buffer.go
+++ b/buffer.go
@@ -9,7 +9,7 @@ type Buffer struct {
 	firstSequenceNumber string
 	lastSequenceNumber  string
 
-	MaxBatchCount int
+	MaxRecordCount int
 }
 
 // AddRecord adds a record to the buffer.
@@ -24,7 +24,7 @@ func (b *Buffer) AddRecord(r *kinesis.Record) {
 
 // ShouldFlush determines if the buffer has reached its target size.
 func (b *Buffer) ShouldFlush() bool {
-	return b.RecordCount() >= b.MaxBatchCount
+	return b.RecordCount() >= b.MaxRecordCount
 }
 
 // Flush empties the buffer and resets the sequence counter.
diff --git a/buffer_test.go b/buffer_test.go
index 10c3c6e..6870d21 100644
--- a/buffer_test.go
+++ b/buffer_test.go
@@ -8,7 +8,7 @@ import (
 )
 
 func BenchmarkBufferLifecycle(b *testing.B) {
-	buf := Buffer{MaxBatchCount: 1000}
+	buf := Buffer{MaxRecordCount: 1000}
 	seq := "1"
 	rec := &kinesis.Record{SequenceNumber: &seq}
 
@@ -48,7 +48,7 @@ func Test_LastSeq(t *testing.T) {
 }
 
 func Test_ShouldFlush(t *testing.T) {
-	b := Buffer{MaxBatchCount: 2}
+	b := Buffer{MaxRecordCount: 2}
 	s1, s2 := "1", "2"
 	r1 := &kinesis.Record{SequenceNumber: &s1}
 	r2 := &kinesis.Record{SequenceNumber: &s2}
diff --git a/config.go b/config.go
index 12dac97..3d95cc5 100644
--- a/config.go
+++ b/config.go
@@ -1,10 +1,63 @@
 package connector
 
 import (
+	"os"
+	"time"
+
 	"github.com/apex/log"
 )
 
+const (
+	defaultBufferSize = 500
+)
+
 type Config struct {
-	MaxBatchCount int
-	LogHandler    log.Handler
+	// AppName is the application name.
+	AppName string
+
+	// StreamName is the name of the Kinesis stream.
+	StreamName string
+
+	// FlushInterval is a regular interval for flushing the buffer. Defaults to 1s.
+	FlushInterval time.Duration
+
+	// BufferSize determines how many records are buffered before a flush. Must not exceed 500. Defaults to 500.
+	BufferSize int
+
+	// Logger is the logger used. Defaults to log.Log.
+	Logger log.Interface
+}
+
+// setDefaults validates required fields and applies defaults for unset values.
+func (c *Config) setDefaults() {
+	if c.Logger == nil {
+		c.Logger = log.Log
+	}
+
+	c.Logger = c.Logger.WithFields(log.Fields{
+		"package": "kinesis-connectors",
+	})
+
+	if c.AppName == "" {
+		c.Logger.WithField("type", "config").Error("AppName required")
+		os.Exit(1)
+	}
+
+	if c.StreamName == "" {
+		c.Logger.WithField("type", "config").Error("StreamName required")
+		os.Exit(1)
+	}
+
+	c.Logger = c.Logger.WithFields(log.Fields{
+		"app":    c.AppName,
+		"stream": c.StreamName,
+	})
+
+	if c.BufferSize == 0 {
+		c.BufferSize = defaultBufferSize
+	}
+
+	if c.FlushInterval == 0 {
+		c.FlushInterval = time.Second
+	}
+}
diff --git a/consumer.go b/consumer.go
index 338035c..ce70360 100644
--- a/consumer.go
+++ b/consumer.go
@@ -1,28 +1,17 @@
 package connector
 
 import (
-	"log"
+	"os"
 
-	apexlog "github.com/apex/log"
-	"github.com/apex/log/handlers/discard"
+	"github.com/apex/log"
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/service/kinesis"
 )
 
-const (
-	defaultMaxBatchCount = 1000
-)
-
-// NewConsumer creates a new consumer with initialied kinesis connection
-func NewConsumer(appName, streamName string, cfg Config) *Consumer {
-	if cfg.LogHandler == nil {
-		cfg.LogHandler = discard.New()
-	}
-
-	if cfg.MaxBatchCount == 0 {
-		cfg.MaxBatchCount = defaultMaxBatchCount
-	}
+// NewConsumer creates a new consumer with an initialized Kinesis connection.
+func NewConsumer(config Config) *Consumer {
+	config.setDefaults()
 
 	svc := kinesis.New(
 		session.New(
@@ -31,33 +20,28 @@ func NewConsumer(appName, streamName string, cfg Config) *Consumer {
 	)
 
 	return &Consumer{
-		appName:    appName,
-		streamName: streamName,
-		svc:        svc,
-		cfg:        cfg,
+		svc:    svc,
+		Config: config,
 	}
 }
 
 type Consumer struct {
-	appName    string
-	streamName string
-	svc        *kinesis.Kinesis
-	cfg        Config
+	svc *kinesis.Kinesis
+	Config
 }
 
 // Start takes a handler and then loops over each of the shards
 // processing each one with the handler.
 func (c *Consumer) Start(handler Handler) {
-	apexlog.SetHandler(c.cfg.LogHandler)
-
 	resp, err := c.svc.DescribeStream(
 		&kinesis.DescribeStreamInput{
-			StreamName: aws.String(c.streamName),
+			StreamName: aws.String(c.StreamName),
 		},
 	)
 
 	if err != nil {
-		log.Fatalf("Error DescribeStream %v", err)
+		c.Logger.WithError(err).Error("DescribeStream")
+		os.Exit(1)
 	}
 
 	for _, shard := range resp.StreamDescription.Shards {
@@ -66,24 +50,18 @@ func (c *Consumer) Start(handler Handler) {
 }
 
 func (c *Consumer) handlerLoop(shardID string, handler Handler) {
-	ctx := apexlog.WithFields(apexlog.Fields{
-		"app":    c.appName,
-		"stream": c.streamName,
-		"shard":  shardID,
-	})
-
 	buf := &Buffer{
-		MaxBatchCount: c.cfg.MaxBatchCount,
+		MaxRecordCount: c.BufferSize,
 	}
 
 	checkpoint := &Checkpoint{
-		AppName:    c.appName,
-		StreamName: c.streamName,
+		AppName:    c.AppName,
+		StreamName: c.StreamName,
 	}
 
 	params := &kinesis.GetShardIteratorInput{
 		ShardId:    aws.String(shardID),
-		StreamName: aws.String(c.streamName),
+		StreamName: aws.String(c.StreamName),
 	}
 
 	if checkpoint.CheckpointExists(shardID) {
@@ -95,10 +73,16 @@ func (c *Consumer) handlerLoop(shardID string, handler Handler) {
 
 	resp, err := c.svc.GetShardIterator(params)
 
 	if err != nil {
-		log.Fatalf("Error GetShardIterator %v", err)
+		c.Logger.WithError(err).Error("GetShardIterator")
+		os.Exit(1)
 	}
 
 	shardIterator := resp.ShardIterator
+
+	ctx := c.Logger.WithFields(log.Fields{
+		"shard": shardID,
+	})
+
 	ctx.Info("processing")
 
 	for {
@@ -118,13 +102,14 @@ func (c *Consumer) handlerLoop(shardID string, handler Handler) {
 
 			if buf.ShouldFlush() {
 				handler.HandleRecords(*buf)
-				ctx.WithField("count", buf.RecordCount()).Info("emitted")
+				ctx.WithField("count", buf.RecordCount()).Info("flushed")
 				checkpoint.SetCheckpoint(shardID, buf.LastSeq())
 				buf.Flush()
 			}
 		}
 	} else if resp.NextShardIterator == aws.String("") || shardIterator == resp.NextShardIterator {
-		log.Fatalf("Error NextShardIterator")
+		c.Logger.Error("NextShardIterator")
+		os.Exit(1)
 	}
 
 	shardIterator = resp.NextShardIterator
diff --git a/examples/firehose/main.go b/examples/firehose/main.go
index fc40f60..27d4566 100644
--- a/examples/firehose/main.go
+++ b/examples/firehose/main.go
@@ -32,7 +32,9 @@ func main() {
 	svc := firehose.New(session.New())
 
 	cfg := connector.Config{
-		MaxBatchCount: 400,
+		AppName:    *app,
+		StreamName: *stream,
+		BufferSize: 400,
 	}
 
-	c := connector.NewConsumer(*app, *stream, cfg)
+	c := connector.NewConsumer(cfg)
diff --git a/examples/producer/main.go b/examples/producer/main.go
index fd6989b..cdc6687 100644
--- a/examples/producer/main.go
+++ b/examples/producer/main.go
@@ -9,7 +9,7 @@ import (
 	"github.com/apex/log/handlers/text"
 	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/service/kinesis"
-	prdcr "github.com/tj/go-kinesis"
+	producer "github.com/tj/go-kinesis"
 )
 
 // Note: download file with test data
@@ -19,15 +19,16 @@ var stream = flag.String("s", "", "Stream name")
 func main() {
 	flag.Parse()
 	log.SetHandler(text.New(os.Stderr))
+	log.SetLevel(log.DebugLevel)
 
 	// set up producer
 	svc := kinesis.New(session.New())
-	producer := prdcr.New(prdcr.Config{
+	p := producer.New(producer.Config{
 		StreamName:  *stream,
 		BacklogSize: 500,
 		Client:      svc,
 	})
-	producer.Start()
+	p.Start()
 
 	// open data file
 	f, err := os.Open("/tmp/users.txt")
@@ -39,12 +40,12 @@ func main() {
 	// loop over file data
 	b := bufio.NewScanner(f)
 	for b.Scan() {
-		err := producer.Put(b.Bytes(), "site")
+		err := p.Put(b.Bytes(), "site")
 		if err != nil {
 			log.WithError(err).Fatal("error producing")
 		}
 	}
 
-	producer.Stop()
+	p.Stop()
 }
diff --git a/examples/s3/main.go b/examples/s3/main.go
index e8ee82d..b5558e3 100644
--- a/examples/s3/main.go
+++ b/examples/s3/main.go
@@ -6,31 +6,32 @@ import (
 	"fmt"
 	"os"
 
+	"github.com/apex/log"
 	"github.com/apex/log/handlers/text"
 	"github.com/harlow/kinesis-connectors"
 	"github.com/harlow/kinesis-connectors/emitter/s3"
 )
 
-var (
-	app    = flag.String("a", "", "App name")
-	bucket = flag.String("b", "", "Bucket name")
-	stream = flag.String("s", "", "Stream name")
-)
-
 func main() {
+	log.SetHandler(text.New(os.Stderr))
+	log.SetLevel(log.DebugLevel)
+
+	var (
+		app    = flag.String("a", "", "App name")
+		bucket = flag.String("b", "", "Bucket name")
+		stream = flag.String("s", "", "Stream name")
+	)
 	flag.Parse()
 
-	emitter := &s3.Emitter{
+	e := &s3.Emitter{
 		Bucket: *bucket,
 		Region: "us-west-1",
 	}
 
-	cfg := connector.Config{
-		MaxBatchCount: 500,
-		LogHandler:    text.New(os.Stderr),
-	}
-
-	c := connector.NewConsumer(*app, *stream, cfg)
+	c := connector.NewConsumer(connector.Config{
+		AppName:    *app,
+		StreamName: *stream,
+	})
 
 	c.Start(connector.HandlerFunc(func(b connector.Buffer) {
 		body := new(bytes.Buffer)
@@ -39,7 +40,7 @@
 			body.Write(r.Data)
 		}
 
-		err := emitter.Emit(
+		err := e.Emit(
 			s3.Key("", b.FirstSeq(), b.LastSeq()),
 			bytes.NewReader(body.Bytes()),
 		)
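
Below is a minimal end-to-end sketch of the post-refactor consumer API, assembled only from pieces this diff introduces (`connector.Config`, `NewConsumer(Config)`, `connector.HandlerFunc`, and the `Buffer` accessors exercised in `buffer_test.go`). The flag names and handler body are illustrative, not part of the change:

```go
package main

import (
	"flag"
	"os"

	"github.com/apex/log"
	"github.com/apex/log/handlers/text"
	"github.com/harlow/kinesis-connectors"
)

func main() {
	log.SetHandler(text.New(os.Stderr))
	log.SetLevel(log.DebugLevel)

	var (
		app    = flag.String("a", "", "App name")
		stream = flag.String("s", "", "Stream name")
	)
	flag.Parse()

	// AppName and StreamName are required; setDefaults exits if either is
	// missing. BufferSize and FlushInterval fall back to 500 and 1s.
	c := connector.NewConsumer(connector.Config{
		AppName:    *app,
		StreamName: *stream,
		BufferSize: 400, // flush every 400 records instead of the 500 default
	})

	// The handler receives each flushed Buffer; the log fields here are
	// illustrative, built from the Buffer accessors shown in this diff.
	c.Start(connector.HandlerFunc(func(b connector.Buffer) {
		log.WithFields(log.Fields{
			"count": b.RecordCount(),
			"first": b.FirstSeq(),
			"last":  b.LastSeq(),
		}).Info("batch handled")
	}))
}
```

Since `Config` is embedded in `Consumer`, the `AppName` and `StreamName` fields also feed the `Checkpoint` struct, so the same two values namespace both logging and checkpointing.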