144 lines
4.3 KiB
Go
144 lines
4.3 KiB
Go
package consumer
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
|
)
|
|
|
|
// NewAllGroup returns an initialized AllGroup for consuming
|
|
// all shards on a stream
|
|
func NewAllGroup(ksis kinesisClient, store Store, streamName string, logger Logger) *AllGroup {
|
|
return &AllGroup{
|
|
ksis: ksis,
|
|
shards: make(map[string]types.Shard),
|
|
shardsClosed: make(map[string]chan struct{}),
|
|
streamName: streamName,
|
|
logger: logger,
|
|
Store: store,
|
|
}
|
|
}
|
|
|
|
// AllGroup is used to consume all shards from a single consumer. It
|
|
// caches a local list of the shards we are already processing
|
|
// and routinely polls the stream looking for new shards to process.
|
|
type AllGroup struct {
|
|
ksis kinesisClient
|
|
streamName string
|
|
logger Logger
|
|
Store
|
|
|
|
shardMu sync.Mutex
|
|
shards map[string]types.Shard
|
|
shardsClosed map[string]chan struct{}
|
|
}
|
|
|
|
// Start is a blocking operation which will loop and attempt to find new
|
|
// shards on a regular cadence.
|
|
func (g *AllGroup) Start(ctx context.Context, shardC chan types.Shard) error {
|
|
// Note: while ticker is a rather naive approach to this problem,
|
|
// it actually simplifies a few things. I.e. If we miss a new shard
|
|
// while AWS is resharding, we'll pick it up max 30 seconds later.
|
|
|
|
// It might be worth refactoring this flow to allow the consumer
|
|
// to notify the broker when a shard is closed. However, shards don't
|
|
// necessarily close at the same time, so we could potentially get a
|
|
// thundering heard of notifications from the consumer.
|
|
|
|
var ticker = time.NewTicker(30 * time.Second)
|
|
|
|
for {
|
|
if err := g.findNewShards(ctx, shardC); err != nil {
|
|
ticker.Stop()
|
|
return err
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
ticker.Stop()
|
|
return nil
|
|
case <-ticker.C:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (g *AllGroup) CloseShard(_ context.Context, shardID string) error {
|
|
g.shardMu.Lock()
|
|
defer g.shardMu.Unlock()
|
|
c, ok := g.shardsClosed[shardID]
|
|
if !ok {
|
|
return fmt.Errorf("closing unknown shard ID %q", shardID)
|
|
}
|
|
close(c)
|
|
return nil
|
|
}
|
|
|
|
// waitForCloseChannel blocks until c is closed (or yields a value) or ctx is
// done, whichever happens first. It reports true when the wait ended because
// of c and false when it ended because of ctx.
//
// A nil c returns true immediately without consulting ctx.
func waitForCloseChannel(ctx context.Context, c <-chan struct{}) bool {
	if c != nil {
		select {
		case <-c:
			// The consumer has processed and closed the shard
			// (CloseShard has been called).
			return true
		case <-ctx.Done():
			return false
		}
	}

	// No channel means this shard was never seen in listShards, so it
	// probably fell off the TRIM_HORIZON and can be assumed fully processed.
	return true
}
|
|
|
|
// findNewShards pulls the list of shards from the Kinesis API
|
|
// and uses a local cache to determine if we are already processing
|
|
// a particular shard.
|
|
func (g *AllGroup) findNewShards(ctx context.Context, shardC chan types.Shard) error {
|
|
g.shardMu.Lock()
|
|
defer g.shardMu.Unlock()
|
|
|
|
g.logger.Log("[GROUP]", "fetching shards")
|
|
|
|
shards, err := listShards(ctx, g.ksis, g.streamName)
|
|
if err != nil {
|
|
g.logger.Log("[GROUP] error:", err)
|
|
return err
|
|
}
|
|
|
|
// We do two `for` loops, since we have to set up all the `shardClosed`
|
|
// channels before we start using any of them. It's highly probable
|
|
// that Kinesis provides us the shards in dependency order (parents
|
|
// before children), but it doesn't appear to be a guarantee.
|
|
newShards := make(map[string]types.Shard)
|
|
for _, shard := range shards {
|
|
if _, ok := g.shards[*shard.ShardId]; ok {
|
|
continue
|
|
}
|
|
g.shards[*shard.ShardId] = shard
|
|
g.shardsClosed[*shard.ShardId] = make(chan struct{})
|
|
newShards[*shard.ShardId] = shard
|
|
}
|
|
// only new shards need to be checked for parent dependencies
|
|
for _, shard := range newShards {
|
|
shard := shard // Shadow shard, since we use it in goroutine
|
|
var parent1, parent2 <-chan struct{}
|
|
if shard.ParentShardId != nil {
|
|
parent1 = g.shardsClosed[*shard.ParentShardId]
|
|
}
|
|
if shard.AdjacentParentShardId != nil {
|
|
parent2 = g.shardsClosed[*shard.AdjacentParentShardId]
|
|
}
|
|
go func() {
|
|
// Asynchronously wait for all parents of this shard to be processed
|
|
// before providing it out to our client. Kinesis guarantees that a
|
|
// given partition key's data will be provided to clients in-order,
|
|
// but when splits or joins happen, we need to process all parents prior
|
|
// to processing children or that ordering guarantee is not maintained.
|
|
if waitForCloseChannel(ctx, parent1) && waitForCloseChannel(ctx, parent2) {
|
|
shardC <- shard
|
|
}
|
|
}()
|
|
}
|
|
return nil
|
|
}
|