From e843ae5928125a64a32bd16d331063cc947e3fef Mon Sep 17 00:00:00 2001
From: Harlow Ward
Date: Sat, 30 Apr 2016 18:05:04 -0700
Subject: [PATCH] Remove poll interval in favor of aws retry backoff

---
 README.md   |  1 -
 consumer.go | 42 +++++++++++++++++++-----------------------
 2 files changed, 19 insertions(+), 24 deletions(-)

diff --git a/README.md b/README.md
index 97a49e8..2787075 100644
--- a/README.md
+++ b/README.md
@@ -25,7 +25,6 @@ func main() {
 
 	// override default values
 	c.Set("maxBatchCount", 200)
-	c.Set("pollInterval", "3s")
 
 	// start consuming records from the queues
 	c.Start(connector.HandlerFunc(func(b connector.Buffer) {
diff --git a/consumer.go b/consumer.go
index 5847c55..5491daa 100644
--- a/consumer.go
+++ b/consumer.go
@@ -2,7 +2,6 @@ package connector
 
 import (
 	"os"
-	"time"
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/awserr"
@@ -11,7 +10,6 @@ import (
 )
 
 var (
-	pollInterval  = 1 * time.Second
 	maxBatchCount = 1000
 )
 
@@ -38,17 +36,9 @@ type Consumer struct {
 
 // Set `option` to `value`
 func (c *Consumer) Set(option string, value interface{}) {
-	var err error
-
 	switch option {
 	case "maxBatchCount":
 		maxBatchCount = value.(int)
-	case "pollInterval":
-		pollInterval, err = time.ParseDuration(value.(string))
-		if err != nil {
-			logger.Log("fatal", "ParseDuration", "msg", "unable to parse pollInterval value")
-			os.Exit(1)
-		}
 	default:
 		logger.Log("fatal", "Set", "msg", "unknown option")
 		os.Exit(1)
@@ -76,12 +66,20 @@ func (c *Consumer) Start(handler Handler) {
 }
 
 func (c *Consumer) handlerLoop(shardID string, handler Handler) {
+	buf := &Buffer{
+		MaxBatchCount: maxBatchCount,
+	}
+
+	checkpoint := &Checkpoint{
+		AppName:    c.appName,
+		StreamName: c.streamName,
+	}
+
 	params := &kinesis.GetShardIteratorInput{
 		ShardId:    aws.String(shardID),
 		StreamName: aws.String(c.streamName),
 	}
 
-	checkpoint := &Checkpoint{AppName: c.appName, StreamName: c.streamName}
 	if checkpoint.CheckpointExists(shardID) {
 		params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER")
 		params.StartingSequenceNumber = aws.String(checkpoint.SequenceNumber())
@@ -97,13 +95,14 @@ func (c *Consumer) handlerLoop(shardID string, handler Handler) {
 		}
 	}
 
-	b := &Buffer{MaxBatchCount: maxBatchCount}
 	shardIterator := resp.ShardIterator
 
 	for {
-		resp, err := c.svc.GetRecords(&kinesis.GetRecordsInput{
-			ShardIterator: shardIterator,
-		})
+		resp, err := c.svc.GetRecords(
+			&kinesis.GetRecordsInput{
+				ShardIterator: shardIterator,
+			},
+		)
 
 		if err != nil {
 			awsErr, _ := err.(awserr.Error)
@@ -113,20 +112,17 @@ func (c *Consumer) handlerLoop(shardID string, handler Handler) {
 
 		if len(resp.Records) > 0 {
 			for _, r := range resp.Records {
-				b.AddRecord(r)
+				buf.AddRecord(r)
 
-				if b.ShouldFlush() {
-					handler.HandleRecords(*b)
-					checkpoint.SetCheckpoint(shardID, b.LastSeq())
-					b.Flush()
+				if buf.ShouldFlush() {
+					handler.HandleRecords(*buf)
+					checkpoint.SetCheckpoint(shardID, buf.LastSeq())
+					buf.Flush()
 				}
 			}
 		} else if resp.NextShardIterator == aws.String("") || shardIterator == resp.NextShardIterator {
 			logger.Log("fatal", "nextShardIterator", "msg", err.Error())
 			os.Exit(1)
-		} else {
-			logger.Log("info", "sleeping", "msg", "no records to process")
-			time.Sleep(pollInterval)
 		}
 
 		shardIterator = resp.NextShardIterator
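
A note on the "aws retry backoff" the subject refers to: with the explicit sleep removed, pacing between GetRecords calls is left to the aws-sdk-go client, whose default retryer re-attempts throttled and 5xx Kinesis responses with exponential backoff. A minimal sketch of tuning that behaviour follows; the client construction and the retry count are illustrative assumptions, not part of this patch.

    package main

    import (
        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/kinesis"
    )

    func main() {
        // MaxRetries caps how many times the SDK's default retryer will
        // re-attempt a throttled or failed call; the backoff between
        // attempts is handled inside the SDK.
        cfg := aws.NewConfig().WithMaxRetries(10)
        svc := kinesis.New(session.New(cfg))
        _ = svc // pass svc to the consumer in place of the default client
    }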