add logger to the broker
This commit is contained in:
parent
3e9760ef2f
commit
121cbc26f0
2 changed files with 34 additions and 18 deletions
47
broker.go
47
broker.go
|
|
@ -11,14 +11,18 @@ import (
|
||||||
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
|
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
|
||||||
)
|
)
|
||||||
|
|
||||||
const pollFreq = 30 * time.Second
|
func newBroker(
|
||||||
|
client kinesisiface.KinesisAPI,
|
||||||
func newBroker(client kinesisiface.KinesisAPI, streamName string, shardc chan *kinesis.Shard) *broker {
|
streamName string,
|
||||||
|
shardc chan *kinesis.Shard,
|
||||||
|
logger Logger,
|
||||||
|
) *broker {
|
||||||
return &broker{
|
return &broker{
|
||||||
client: client,
|
client: client,
|
||||||
shards: make(map[string]*kinesis.Shard),
|
shards: make(map[string]*kinesis.Shard),
|
||||||
streamName: streamName,
|
streamName: streamName,
|
||||||
shardc: shardc,
|
shardc: shardc,
|
||||||
|
logger: logger,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -28,36 +32,50 @@ type broker struct {
|
||||||
client kinesisiface.KinesisAPI
|
client kinesisiface.KinesisAPI
|
||||||
streamName string
|
streamName string
|
||||||
shardc chan *kinesis.Shard
|
shardc chan *kinesis.Shard
|
||||||
|
logger Logger
|
||||||
|
|
||||||
shardMu sync.Mutex
|
shardMu sync.Mutex
|
||||||
shards map[string]*kinesis.Shard
|
shards map[string]*kinesis.Shard
|
||||||
}
|
}
|
||||||
|
|
||||||
// pollShards loops forever attempting to find new shards
|
// start is a blocking operation which will loop and attempt to find new
|
||||||
// to process
|
// shards on a regular cadence.
|
||||||
func (b *broker) pollShards(ctx context.Context) {
|
func (b *broker) start(ctx context.Context) {
|
||||||
b.leaseShards()
|
b.findNewShards()
|
||||||
|
ticker := time.NewTicker(30 * time.Second)
|
||||||
|
|
||||||
|
// Note: while ticker is a rather naive approach to this problem,
|
||||||
|
// it actually simplies a few things. i.e. If we miss a new shard while
|
||||||
|
// AWS is resharding we'll pick it up max 30 seconds later.
|
||||||
|
|
||||||
|
// It might be worth refactoring this flow to allow the consumer to
|
||||||
|
// to notify the broker when a shard is closed. However, shards don't
|
||||||
|
// necessarily close at the same time, so we could potentially get a
|
||||||
|
// thundering heard of notifications from the consumer.
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
ticker.Stop()
|
||||||
return
|
return
|
||||||
case <-time.After(pollFreq):
|
case <-ticker.C:
|
||||||
b.leaseShards()
|
b.findNewShards()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// leaseShards attempts to find new shards that need to be
|
// findNewShards pulls the list of shards from the Kinesis API
|
||||||
// processed; when a new shard is found it passes the shard
|
// and uses a local cache to determine if we are already processing
|
||||||
// ID back to the consumer on the shardc channel
|
// a particular shard.
|
||||||
func (b *broker) leaseShards() {
|
func (b *broker) findNewShards() {
|
||||||
b.shardMu.Lock()
|
b.shardMu.Lock()
|
||||||
defer b.shardMu.Unlock()
|
defer b.shardMu.Unlock()
|
||||||
|
|
||||||
|
b.logger.Log("[BROKER]", "fetching shards")
|
||||||
|
|
||||||
shards, err := b.listShards()
|
shards, err := b.listShards()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
b.logger.Log("[BROKER]", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -65,7 +83,6 @@ func (b *broker) leaseShards() {
|
||||||
if _, ok := b.shards[*shard.ShardId]; ok {
|
if _, ok := b.shards[*shard.ShardId]; ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
b.shards[*shard.ShardId] = shard
|
b.shards[*shard.ShardId] = shard
|
||||||
b.shardc <- shard
|
b.shardc <- shard
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -81,13 +81,13 @@ func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error {
|
||||||
var (
|
var (
|
||||||
errc = make(chan error, 1)
|
errc = make(chan error, 1)
|
||||||
shardc = make(chan *kinesis.Shard, 1)
|
shardc = make(chan *kinesis.Shard, 1)
|
||||||
broker = newBroker(c.client, c.streamName, shardc)
|
broker = newBroker(c.client, c.streamName, shardc, c.logger)
|
||||||
)
|
)
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
go broker.pollShards(ctx)
|
go broker.start(ctx)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
|
|
@ -105,7 +105,6 @@ func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error {
|
||||||
default:
|
default:
|
||||||
// error has already occured
|
// error has already occured
|
||||||
}
|
}
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}(aws.StringValue(shard.ShardId))
|
}(aws.StringValue(shard.ShardId))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue