From 85b200f768bbe9258156b4504611bc445fb63517 Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Mon, 8 Apr 2019 20:41:41 -0700 Subject: [PATCH] clean up broker --- broker.go | 50 ++++++++++++++++++++++++-------------------------- consumer.go | 4 ++-- 2 files changed, 26 insertions(+), 28 deletions(-) diff --git a/broker.go b/broker.go index 989242c..b1207f2 100644 --- a/broker.go +++ b/broker.go @@ -22,7 +22,7 @@ func newBroker(client kinesisiface.KinesisAPI, streamName string, shardc chan *k } } -// broker keeps local cache list of the shard we are already processing +// broker caches a local list of the shards we are already processing // and routinely polls the stream looking for new shards to process type broker struct { client kinesisiface.KinesisAPI @@ -33,22 +33,28 @@ type broker struct { shards map[string]*kinesis.Shard } +// pollShards loops forever attempting to find new shards +// to process func (b *broker) pollShards(ctx context.Context) { - b.fetchShards() + b.leaseShards() - go func() { - for { - select { - case <-ctx.Done(): - return - case <-time.After(pollFreq): - b.fetchShards() - } + for { + select { + case <-ctx.Done(): + return + case <-time.After(pollFreq): + b.leaseShards() } - }() + } } -func (b *broker) fetchShards() { +// leaseShards attempts to find new shards that need to be +// processed; when a new shard is found it passing the shard +// ID back to the consumer on the shard channel +func (b *broker) leaseShards() { + b.shardMu.Lock() + defer b.shardMu.Unlock() + shards, err := b.listShards() if err != nil { fmt.Println(err) @@ -56,12 +62,16 @@ func (b *broker) fetchShards() { } for _, shard := range shards { - if b.takeLease(shard) { - b.shardc <- shard + if _, ok := b.shards[*shard.ShardId]; ok { + continue } + + b.shards[*shard.ShardId] = shard + b.shardc <- shard } } +// listShards pulls a list of shard IDs from the kinesis api func (b *broker) listShards() ([]*kinesis.Shard, error) { var ss []*kinesis.Shard var listShardsInput = &kinesis.ListShardsInput{ @@ -85,15 +95,3 @@ func (b *broker) listShards() ([]*kinesis.Shard, error) { } } } - -func (b *broker) takeLease(shard *kinesis.Shard) bool { - b.shardMu.Lock() - defer b.shardMu.Unlock() - - if _, ok := b.shards[*shard.ShardId]; ok { - return false - } - - b.shards[*shard.ShardId] = shard - return true -} diff --git a/consumer.go b/consumer.go index bcd8b51..9247bbe 100644 --- a/consumer.go +++ b/consumer.go @@ -87,9 +87,9 @@ func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - go func() { - broker.pollShards(ctx) + go broker.pollShards(ctx) + go func() { <-ctx.Done() close(shardc) }()