From 2c5b50ddf8667e5f1c96cac5084a42fbf713fa90 Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Mon, 8 Apr 2019 20:33:39 -0700 Subject: [PATCH] add comment to broker --- broker.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/broker.go b/broker.go index 9ddc140..989242c 100644 --- a/broker.go +++ b/broker.go @@ -11,11 +11,9 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" ) -func newBroker( - client kinesisiface.KinesisAPI, - streamName string, - shardc chan *kinesis.Shard, -) *broker { +const pollFreq = 30 * time.Second + +func newBroker(client kinesisiface.KinesisAPI, streamName string, shardc chan *kinesis.Shard) *broker { return &broker{ client: client, shards: make(map[string]*kinesis.Shard), @@ -24,6 +22,8 @@ func newBroker( } } +// broker keeps local cache list of the shard we are already processing +// and routinely polls the stream looking for new shards to process type broker struct { client kinesisiface.KinesisAPI streamName string @@ -36,14 +36,12 @@ type broker struct { func (b *broker) pollShards(ctx context.Context) { b.fetchShards() - // TODO: also add signal to re-poll - go func() { for { select { case <-ctx.Done(): return - case <-time.After(30 * time.Second): + case <-time.After(pollFreq): b.fetchShards() } }