clean up broker

This commit is contained in:
Harlow Ward 2019-04-08 20:41:41 -07:00
parent 2c5b50ddf8
commit 85b200f768
2 changed files with 26 additions and 28 deletions

View file

@ -22,7 +22,7 @@ func newBroker(client kinesisiface.KinesisAPI, streamName string, shardc chan *k
} }
} }
// broker keeps local cache list of the shard we are already processing // broker caches a local list of the shards we are already processing
// and routinely polls the stream looking for new shards to process // and routinely polls the stream looking for new shards to process
type broker struct { type broker struct {
client kinesisiface.KinesisAPI client kinesisiface.KinesisAPI
@ -33,22 +33,28 @@ type broker struct {
shards map[string]*kinesis.Shard shards map[string]*kinesis.Shard
} }
// pollShards loops forever attempting to find new shards
// to process
func (b *broker) pollShards(ctx context.Context) { func (b *broker) pollShards(ctx context.Context) {
b.fetchShards() b.leaseShards()
go func() { for {
for { select {
select { case <-ctx.Done():
case <-ctx.Done(): return
return case <-time.After(pollFreq):
case <-time.After(pollFreq): b.leaseShards()
b.fetchShards()
}
} }
}() }
} }
func (b *broker) fetchShards() { // leaseShards attempts to find new shards that need to be
// processed; when a new shard is found it passing the shard
// ID back to the consumer on the shard channel
func (b *broker) leaseShards() {
b.shardMu.Lock()
defer b.shardMu.Unlock()
shards, err := b.listShards() shards, err := b.listShards()
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
@ -56,12 +62,16 @@ func (b *broker) fetchShards() {
} }
for _, shard := range shards { for _, shard := range shards {
if b.takeLease(shard) { if _, ok := b.shards[*shard.ShardId]; ok {
b.shardc <- shard continue
} }
b.shards[*shard.ShardId] = shard
b.shardc <- shard
} }
} }
// listShards pulls a list of shard IDs from the kinesis api
func (b *broker) listShards() ([]*kinesis.Shard, error) { func (b *broker) listShards() ([]*kinesis.Shard, error) {
var ss []*kinesis.Shard var ss []*kinesis.Shard
var listShardsInput = &kinesis.ListShardsInput{ var listShardsInput = &kinesis.ListShardsInput{
@ -85,15 +95,3 @@ func (b *broker) listShards() ([]*kinesis.Shard, error) {
} }
} }
} }
func (b *broker) takeLease(shard *kinesis.Shard) bool {
b.shardMu.Lock()
defer b.shardMu.Unlock()
if _, ok := b.shards[*shard.ShardId]; ok {
return false
}
b.shards[*shard.ShardId] = shard
return true
}

View file

@ -87,9 +87,9 @@ func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
go func() { go broker.pollShards(ctx)
broker.pollShards(ctx)
go func() {
<-ctx.Done() <-ctx.Done()
close(shardc) close(shardc)
}() }()