diff --git a/broker.go b/broker.go
index 2c9959b..ecf25a1 100644
--- a/broker.go
+++ b/broker.go
@@ -11,14 +11,18 @@ import (
 	"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
 )

-const pollFreq = 30 * time.Second
-
-func newBroker(client kinesisiface.KinesisAPI, streamName string, shardc chan *kinesis.Shard) *broker {
+func newBroker(
+	client kinesisiface.KinesisAPI,
+	streamName string,
+	shardc chan *kinesis.Shard,
+	logger Logger,
+) *broker {
 	return &broker{
 		client:     client,
 		shards:     make(map[string]*kinesis.Shard),
 		streamName: streamName,
 		shardc:     shardc,
+		logger:     logger,
 	}
 }

@@ -28,36 +32,50 @@
 type broker struct {
 	client     kinesisiface.KinesisAPI
 	streamName string
 	shardc     chan *kinesis.Shard
+	logger     Logger

 	shardMu sync.Mutex
 	shards  map[string]*kinesis.Shard
 }

-// pollShards loops forever attempting to find new shards
-// to process
-func (b *broker) pollShards(ctx context.Context) {
-	b.leaseShards()
+// start is a blocking operation that loops, attempting to find new
+// shards on a regular cadence.
+func (b *broker) start(ctx context.Context) {
+	b.findNewShards()
+	ticker := time.NewTicker(30 * time.Second)
+
+	// Note: while a ticker is a rather naive approach to this problem,
+	// it actually simplifies a few things, e.g. if we miss a new shard while
+	// AWS is resharding we'll pick it up at most 30 seconds later.
+
+	// It might be worth refactoring this flow to allow the consumer to
+	// notify the broker when a shard is closed. However, shards don't
+	// necessarily close at the same time, so we could potentially get a
+	// thundering herd of notifications from the consumer.

 	for {
 		select {
 		case <-ctx.Done():
+			ticker.Stop()
 			return
-		case <-time.After(pollFreq):
-			b.leaseShards()
+		case <-ticker.C:
+			b.findNewShards()
 		}
 	}
 }

-// leaseShards attempts to find new shards that need to be
-// processed; when a new shard is found it passes the shard
-// ID back to the consumer on the shardc channel
-func (b *broker) leaseShards() {
+// findNewShards pulls the list of shards from the Kinesis API
+// and uses a local cache to determine if we are already processing
+// a particular shard.
+func (b *broker) findNewShards() {
 	b.shardMu.Lock()
 	defer b.shardMu.Unlock()

+	b.logger.Log("[BROKER]", "fetching shards")
+
 	shards, err := b.listShards()
 	if err != nil {
-		fmt.Println(err)
+		b.logger.Log("[BROKER]", err)
 		return
 	}

@@ -65,7 +83,6 @@
 		if _, ok := b.shards[*shard.ShardId]; ok {
 			continue
 		}
-
 		b.shards[*shard.ShardId] = shard
 		b.shardc <- shard
 	}
diff --git a/consumer.go b/consumer.go
index 9247bbe..adbb000 100644
--- a/consumer.go
+++ b/consumer.go
@@ -81,13 +81,13 @@ func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error {
 	var (
 		errc   = make(chan error, 1)
 		shardc = make(chan *kinesis.Shard, 1)
-		broker = newBroker(c.client, c.streamName, shardc)
+		broker = newBroker(c.client, c.streamName, shardc, c.logger)
 	)

 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()

-	go broker.pollShards(ctx)
+	go broker.start(ctx)

 	go func() {
 		<-ctx.Done()
@@ -105,7 +105,6 @@ func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error {
 				default: // error has already occured
 				}
-
 				return
 			}
 		}(aws.StringValue(shard.ShardId))
 	}
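
The core behavioral change in broker.go is the "run once, then on every tick" polling loop, with the ticker stopped on context cancellation. A minimal, self-contained sketch of that pattern (the `pollEvery` name and the short interval are illustrative, not part of this diff):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pollEvery runs fn immediately, then again on a fixed cadence until ctx
// is cancelled. This mirrors broker.start above; the 30-second production
// interval is shortened so the example finishes quickly.
func pollEvery(ctx context.Context, interval time.Duration, fn func()) {
	fn() // don't wait a full interval before the first poll

	ticker := time.NewTicker(interval)
	defer ticker.Stop() // same effect as the explicit Stop before return

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			fn()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	// Prints "poll" once up front, then on each tick until the context expires.
	pollEvery(ctx, 100*time.Millisecond, func() { fmt.Println("poll") })
}
```

Note the deliberate up-front call before entering the loop: without it, the first shard discovery would be delayed by a full interval after Scan starts.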
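
The new `b.logger.Log("[BROKER]", ...)` calls imply a small pluggable logging interface on the consumer side. Below is a sketch of a shape that would satisfy those call sites, plus an adapter for the standard library logger; the interface body and the `stdLogger` adapter are assumptions inferred from usage in the diff, not confirmed against the rest of the package:

```go
package consumer

import "log"

// Logger is the single-method interface the broker logs through; any
// type with a variadic Log method satisfies the b.logger.Log calls above.
// (Shape inferred from usage; check the package for the actual definition.)
type Logger interface {
	Log(args ...interface{})
}

// stdLogger is a hypothetical adapter wrapping the standard library's
// *log.Logger so it can be handed to newBroker.
type stdLogger struct {
	l *log.Logger
}

func (s stdLogger) Log(args ...interface{}) {
	s.l.Println(args...)
}
```

With something along these lines in place, the consumer.go hunks fall out naturally: Scan passes its own `c.logger` through to `newBroker`, so broker and consumer share one logging destination.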