kinesis-consumer/allgroup.go

145 lines
4.3 KiB
Go
Raw Permalink Normal View History

package consumer
import (
"context"
Maintain parent/child shard ordering across shard splits/merges. (#155) Kinesis allows clients to rely on an invariant that, for a given partition key, the order of records added to the stream will be maintained. IE: given an input `pkey=x,val=1 pkey=x,val=2 pkey=x,val=3`, the values `1,2,3` will be seen in that order when processed by clients, so long as clients are careful. It does so by putting all records for a single partition key into a single shard, then maintaining ordering within that shard. However, shards can be split and merge, to distribute load better and handle per-shard throughput limits. Kinesis does this currently by (one or many times) splitting a single shard into two or by merging two adjacent shards into one. When this occurs, Kinesis still allows for ordering consistency by detailing shard parent/child relationships within its `listShards` outputs. A split shard A will create children B and C, both with `ParentShardId=A`. A merging of shards A and B into C will create a new shard C with `ParentShardId=A,AdjacentParentShardId=B`. So long as clients fully process all records in parents (including adjacent parents) before processing the new shard, ordering will be maintained. `kinesis-consumer` currently doesn't do this. Instead, upon the initial (and subsequent) `listShards` call, all visible shards immediately begin processing. Considering this case, where shards split, then merge, and each shard `X` contains a single record `rX`: ``` time -> B / \ A D \ / C ``` record `rD` should be processed after both `rB` and `rC` are processed, and both `rB` and `rC` should wait for `rA` to be processed. By starting goroutines immediately, any ordering of `{rA,rB,rC,rD}` might occur within the original code. This PR utilizes the `AllGroup` as a book-keeper of fully processed shards, with the `Consumer` calling `CloseShard` once it has finished a shard. `AllGroup` doesn't release a shard for processing until its parents have fully been processed, and the consumer just processes the shards it receives as it used to. This PR created a new `CloseableGroup` interface rather than append to the existing `Group` interface to maintain backwards compatibility in existing code that may already implement the `Group` interface elsewhere. Different `Group` implementations don't get the ordering described above, but the default `Consumer` does.
2024-06-06 15:37:42 +00:00
"fmt"
"sync"
"time"
2021-09-22 05:00:14 +00:00
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
)
// NewAllGroup returns an initialized AllGroup for consuming
2019-07-28 17:54:01 +00:00
// all shards on a stream
2021-09-22 05:00:14 +00:00
func NewAllGroup(ksis kinesisClient, store Store, streamName string, logger Logger) *AllGroup {
return &AllGroup{
Maintain parent/child shard ordering across shard splits/merges. (#155) Kinesis allows clients to rely on an invariant that, for a given partition key, the order of records added to the stream will be maintained. IE: given an input `pkey=x,val=1 pkey=x,val=2 pkey=x,val=3`, the values `1,2,3` will be seen in that order when processed by clients, so long as clients are careful. It does so by putting all records for a single partition key into a single shard, then maintaining ordering within that shard. However, shards can be split and merge, to distribute load better and handle per-shard throughput limits. Kinesis does this currently by (one or many times) splitting a single shard into two or by merging two adjacent shards into one. When this occurs, Kinesis still allows for ordering consistency by detailing shard parent/child relationships within its `listShards` outputs. A split shard A will create children B and C, both with `ParentShardId=A`. A merging of shards A and B into C will create a new shard C with `ParentShardId=A,AdjacentParentShardId=B`. So long as clients fully process all records in parents (including adjacent parents) before processing the new shard, ordering will be maintained. `kinesis-consumer` currently doesn't do this. Instead, upon the initial (and subsequent) `listShards` call, all visible shards immediately begin processing. Considering this case, where shards split, then merge, and each shard `X` contains a single record `rX`: ``` time -> B / \ A D \ / C ``` record `rD` should be processed after both `rB` and `rC` are processed, and both `rB` and `rC` should wait for `rA` to be processed. By starting goroutines immediately, any ordering of `{rA,rB,rC,rD}` might occur within the original code. This PR utilizes the `AllGroup` as a book-keeper of fully processed shards, with the `Consumer` calling `CloseShard` once it has finished a shard. `AllGroup` doesn't release a shard for processing until its parents have fully been processed, and the consumer just processes the shards it receives as it used to. This PR created a new `CloseableGroup` interface rather than append to the existing `Group` interface to maintain backwards compatibility in existing code that may already implement the `Group` interface elsewhere. Different `Group` implementations don't get the ordering described above, but the default `Consumer` does.
2024-06-06 15:37:42 +00:00
ksis: ksis,
shards: make(map[string]types.Shard),
shardsClosed: make(map[string]chan struct{}),
streamName: streamName,
logger: logger,
Store: store,
}
}
2019-09-02 14:26:44 +00:00
// AllGroup is used to consume all shards from a single consumer. It
// caches a local list of the shards we are already processing
// and routinely polls the stream looking for new shards to process.
type AllGroup struct {
2021-09-22 05:00:14 +00:00
ksis kinesisClient
streamName string
logger Logger
Store
Maintain parent/child shard ordering across shard splits/merges. (#155) Kinesis allows clients to rely on an invariant that, for a given partition key, the order of records added to the stream will be maintained. IE: given an input `pkey=x,val=1 pkey=x,val=2 pkey=x,val=3`, the values `1,2,3` will be seen in that order when processed by clients, so long as clients are careful. It does so by putting all records for a single partition key into a single shard, then maintaining ordering within that shard. However, shards can be split and merge, to distribute load better and handle per-shard throughput limits. Kinesis does this currently by (one or many times) splitting a single shard into two or by merging two adjacent shards into one. When this occurs, Kinesis still allows for ordering consistency by detailing shard parent/child relationships within its `listShards` outputs. A split shard A will create children B and C, both with `ParentShardId=A`. A merging of shards A and B into C will create a new shard C with `ParentShardId=A,AdjacentParentShardId=B`. So long as clients fully process all records in parents (including adjacent parents) before processing the new shard, ordering will be maintained. `kinesis-consumer` currently doesn't do this. Instead, upon the initial (and subsequent) `listShards` call, all visible shards immediately begin processing. Considering this case, where shards split, then merge, and each shard `X` contains a single record `rX`: ``` time -> B / \ A D \ / C ``` record `rD` should be processed after both `rB` and `rC` are processed, and both `rB` and `rC` should wait for `rA` to be processed. By starting goroutines immediately, any ordering of `{rA,rB,rC,rD}` might occur within the original code. This PR utilizes the `AllGroup` as a book-keeper of fully processed shards, with the `Consumer` calling `CloseShard` once it has finished a shard. `AllGroup` doesn't release a shard for processing until its parents have fully been processed, and the consumer just processes the shards it receives as it used to. This PR created a new `CloseableGroup` interface rather than append to the existing `Group` interface to maintain backwards compatibility in existing code that may already implement the `Group` interface elsewhere. Different `Group` implementations don't get the ordering described above, but the default `Consumer` does.
2024-06-06 15:37:42 +00:00
shardMu sync.Mutex
shards map[string]types.Shard
shardsClosed map[string]chan struct{}
}
2019-07-28 17:54:01 +00:00
// Start is a blocking operation which will loop and attempt to find new
// shards on a regular cadence.
func (g *AllGroup) Start(ctx context.Context, shardC chan types.Shard) error {
// Note: while ticker is a rather naive approach to this problem,
// it actually simplifies a few things. I.e. If we miss a new shard
// while AWS is resharding, we'll pick it up max 30 seconds later.
// It might be worth refactoring this flow to allow the consumer
// to notify the broker when a shard is closed. However, shards don't
// necessarily close at the same time, so we could potentially get a
// thundering heard of notifications from the consumer.
2021-09-22 05:00:14 +00:00
var ticker = time.NewTicker(30 * time.Second)
for {
if err := g.findNewShards(ctx, shardC); err != nil {
ticker.Stop()
return err
}
2021-09-22 05:00:14 +00:00
select {
case <-ctx.Done():
ticker.Stop()
return nil
case <-ticker.C:
}
}
}
func (g *AllGroup) CloseShard(_ context.Context, shardID string) error {
Maintain parent/child shard ordering across shard splits/merges. (#155) Kinesis allows clients to rely on an invariant that, for a given partition key, the order of records added to the stream will be maintained. IE: given an input `pkey=x,val=1 pkey=x,val=2 pkey=x,val=3`, the values `1,2,3` will be seen in that order when processed by clients, so long as clients are careful. It does so by putting all records for a single partition key into a single shard, then maintaining ordering within that shard. However, shards can be split and merge, to distribute load better and handle per-shard throughput limits. Kinesis does this currently by (one or many times) splitting a single shard into two or by merging two adjacent shards into one. When this occurs, Kinesis still allows for ordering consistency by detailing shard parent/child relationships within its `listShards` outputs. A split shard A will create children B and C, both with `ParentShardId=A`. A merging of shards A and B into C will create a new shard C with `ParentShardId=A,AdjacentParentShardId=B`. So long as clients fully process all records in parents (including adjacent parents) before processing the new shard, ordering will be maintained. `kinesis-consumer` currently doesn't do this. Instead, upon the initial (and subsequent) `listShards` call, all visible shards immediately begin processing. Considering this case, where shards split, then merge, and each shard `X` contains a single record `rX`: ``` time -> B / \ A D \ / C ``` record `rD` should be processed after both `rB` and `rC` are processed, and both `rB` and `rC` should wait for `rA` to be processed. By starting goroutines immediately, any ordering of `{rA,rB,rC,rD}` might occur within the original code. This PR utilizes the `AllGroup` as a book-keeper of fully processed shards, with the `Consumer` calling `CloseShard` once it has finished a shard. `AllGroup` doesn't release a shard for processing until its parents have fully been processed, and the consumer just processes the shards it receives as it used to. This PR created a new `CloseableGroup` interface rather than append to the existing `Group` interface to maintain backwards compatibility in existing code that may already implement the `Group` interface elsewhere. Different `Group` implementations don't get the ordering described above, but the default `Consumer` does.
2024-06-06 15:37:42 +00:00
g.shardMu.Lock()
defer g.shardMu.Unlock()
c, ok := g.shardsClosed[shardID]
if !ok {
return fmt.Errorf("closing unknown shard ID %q", shardID)
}
close(c)
return nil
}
func waitForCloseChannel(ctx context.Context, c <-chan struct{}) bool {
if c == nil {
// no channel means we haven't seen this shard in listShards, so it
// probably fell off the TRIM_HORIZON, and we can assume it's fully processed.
return true
}
select {
case <-ctx.Done():
return false
case <-c:
// the channel has been processed and closed by the consumer (CloseShard has been called)
return true
}
}
// findNewShards pulls the list of shards from the Kinesis API
// and uses a local cache to determine if we are already processing
// a particular shard.
func (g *AllGroup) findNewShards(ctx context.Context, shardC chan types.Shard) error {
g.shardMu.Lock()
defer g.shardMu.Unlock()
g.logger.Log("[GROUP]", "fetching shards")
2021-09-22 05:00:14 +00:00
shards, err := listShards(ctx, g.ksis, g.streamName)
if err != nil {
g.logger.Log("[GROUP] error:", err)
return err
}
Maintain parent/child shard ordering across shard splits/merges. (#155) Kinesis allows clients to rely on an invariant that, for a given partition key, the order of records added to the stream will be maintained. IE: given an input `pkey=x,val=1 pkey=x,val=2 pkey=x,val=3`, the values `1,2,3` will be seen in that order when processed by clients, so long as clients are careful. It does so by putting all records for a single partition key into a single shard, then maintaining ordering within that shard. However, shards can be split and merge, to distribute load better and handle per-shard throughput limits. Kinesis does this currently by (one or many times) splitting a single shard into two or by merging two adjacent shards into one. When this occurs, Kinesis still allows for ordering consistency by detailing shard parent/child relationships within its `listShards` outputs. A split shard A will create children B and C, both with `ParentShardId=A`. A merging of shards A and B into C will create a new shard C with `ParentShardId=A,AdjacentParentShardId=B`. So long as clients fully process all records in parents (including adjacent parents) before processing the new shard, ordering will be maintained. `kinesis-consumer` currently doesn't do this. Instead, upon the initial (and subsequent) `listShards` call, all visible shards immediately begin processing. Considering this case, where shards split, then merge, and each shard `X` contains a single record `rX`: ``` time -> B / \ A D \ / C ``` record `rD` should be processed after both `rB` and `rC` are processed, and both `rB` and `rC` should wait for `rA` to be processed. By starting goroutines immediately, any ordering of `{rA,rB,rC,rD}` might occur within the original code. This PR utilizes the `AllGroup` as a book-keeper of fully processed shards, with the `Consumer` calling `CloseShard` once it has finished a shard. `AllGroup` doesn't release a shard for processing until its parents have fully been processed, and the consumer just processes the shards it receives as it used to. This PR created a new `CloseableGroup` interface rather than append to the existing `Group` interface to maintain backwards compatibility in existing code that may already implement the `Group` interface elsewhere. Different `Group` implementations don't get the ordering described above, but the default `Consumer` does.
2024-06-06 15:37:42 +00:00
// We do two `for` loops, since we have to set up all the `shardClosed`
// channels before we start using any of them. It's highly probable
// that Kinesis provides us the shards in dependency order (parents
// before children), but it doesn't appear to be a guarantee.
newShards := make(map[string]types.Shard)
for _, shard := range shards {
if _, ok := g.shards[*shard.ShardId]; ok {
continue
}
g.shards[*shard.ShardId] = shard
Maintain parent/child shard ordering across shard splits/merges. (#155) Kinesis allows clients to rely on an invariant that, for a given partition key, the order of records added to the stream will be maintained. IE: given an input `pkey=x,val=1 pkey=x,val=2 pkey=x,val=3`, the values `1,2,3` will be seen in that order when processed by clients, so long as clients are careful. It does so by putting all records for a single partition key into a single shard, then maintaining ordering within that shard. However, shards can be split and merge, to distribute load better and handle per-shard throughput limits. Kinesis does this currently by (one or many times) splitting a single shard into two or by merging two adjacent shards into one. When this occurs, Kinesis still allows for ordering consistency by detailing shard parent/child relationships within its `listShards` outputs. A split shard A will create children B and C, both with `ParentShardId=A`. A merging of shards A and B into C will create a new shard C with `ParentShardId=A,AdjacentParentShardId=B`. So long as clients fully process all records in parents (including adjacent parents) before processing the new shard, ordering will be maintained. `kinesis-consumer` currently doesn't do this. Instead, upon the initial (and subsequent) `listShards` call, all visible shards immediately begin processing. Considering this case, where shards split, then merge, and each shard `X` contains a single record `rX`: ``` time -> B / \ A D \ / C ``` record `rD` should be processed after both `rB` and `rC` are processed, and both `rB` and `rC` should wait for `rA` to be processed. By starting goroutines immediately, any ordering of `{rA,rB,rC,rD}` might occur within the original code. This PR utilizes the `AllGroup` as a book-keeper of fully processed shards, with the `Consumer` calling `CloseShard` once it has finished a shard. `AllGroup` doesn't release a shard for processing until its parents have fully been processed, and the consumer just processes the shards it receives as it used to. This PR created a new `CloseableGroup` interface rather than append to the existing `Group` interface to maintain backwards compatibility in existing code that may already implement the `Group` interface elsewhere. Different `Group` implementations don't get the ordering described above, but the default `Consumer` does.
2024-06-06 15:37:42 +00:00
g.shardsClosed[*shard.ShardId] = make(chan struct{})
newShards[*shard.ShardId] = shard
Maintain parent/child shard ordering across shard splits/merges. (#155) Kinesis allows clients to rely on an invariant that, for a given partition key, the order of records added to the stream will be maintained. IE: given an input `pkey=x,val=1 pkey=x,val=2 pkey=x,val=3`, the values `1,2,3` will be seen in that order when processed by clients, so long as clients are careful. It does so by putting all records for a single partition key into a single shard, then maintaining ordering within that shard. However, shards can be split and merge, to distribute load better and handle per-shard throughput limits. Kinesis does this currently by (one or many times) splitting a single shard into two or by merging two adjacent shards into one. When this occurs, Kinesis still allows for ordering consistency by detailing shard parent/child relationships within its `listShards` outputs. A split shard A will create children B and C, both with `ParentShardId=A`. A merging of shards A and B into C will create a new shard C with `ParentShardId=A,AdjacentParentShardId=B`. So long as clients fully process all records in parents (including adjacent parents) before processing the new shard, ordering will be maintained. `kinesis-consumer` currently doesn't do this. Instead, upon the initial (and subsequent) `listShards` call, all visible shards immediately begin processing. Considering this case, where shards split, then merge, and each shard `X` contains a single record `rX`: ``` time -> B / \ A D \ / C ``` record `rD` should be processed after both `rB` and `rC` are processed, and both `rB` and `rC` should wait for `rA` to be processed. By starting goroutines immediately, any ordering of `{rA,rB,rC,rD}` might occur within the original code. This PR utilizes the `AllGroup` as a book-keeper of fully processed shards, with the `Consumer` calling `CloseShard` once it has finished a shard. `AllGroup` doesn't release a shard for processing until its parents have fully been processed, and the consumer just processes the shards it receives as it used to. This PR created a new `CloseableGroup` interface rather than append to the existing `Group` interface to maintain backwards compatibility in existing code that may already implement the `Group` interface elsewhere. Different `Group` implementations don't get the ordering described above, but the default `Consumer` does.
2024-06-06 15:37:42 +00:00
}
// only new shards need to be checked for parent dependencies
for _, shard := range newShards {
Maintain parent/child shard ordering across shard splits/merges. (#155) Kinesis allows clients to rely on an invariant that, for a given partition key, the order of records added to the stream will be maintained. IE: given an input `pkey=x,val=1 pkey=x,val=2 pkey=x,val=3`, the values `1,2,3` will be seen in that order when processed by clients, so long as clients are careful. It does so by putting all records for a single partition key into a single shard, then maintaining ordering within that shard. However, shards can be split and merge, to distribute load better and handle per-shard throughput limits. Kinesis does this currently by (one or many times) splitting a single shard into two or by merging two adjacent shards into one. When this occurs, Kinesis still allows for ordering consistency by detailing shard parent/child relationships within its `listShards` outputs. A split shard A will create children B and C, both with `ParentShardId=A`. A merging of shards A and B into C will create a new shard C with `ParentShardId=A,AdjacentParentShardId=B`. So long as clients fully process all records in parents (including adjacent parents) before processing the new shard, ordering will be maintained. `kinesis-consumer` currently doesn't do this. Instead, upon the initial (and subsequent) `listShards` call, all visible shards immediately begin processing. Considering this case, where shards split, then merge, and each shard `X` contains a single record `rX`: ``` time -> B / \ A D \ / C ``` record `rD` should be processed after both `rB` and `rC` are processed, and both `rB` and `rC` should wait for `rA` to be processed. By starting goroutines immediately, any ordering of `{rA,rB,rC,rD}` might occur within the original code. This PR utilizes the `AllGroup` as a book-keeper of fully processed shards, with the `Consumer` calling `CloseShard` once it has finished a shard. `AllGroup` doesn't release a shard for processing until its parents have fully been processed, and the consumer just processes the shards it receives as it used to. This PR created a new `CloseableGroup` interface rather than append to the existing `Group` interface to maintain backwards compatibility in existing code that may already implement the `Group` interface elsewhere. Different `Group` implementations don't get the ordering described above, but the default `Consumer` does.
2024-06-06 15:37:42 +00:00
shard := shard // Shadow shard, since we use it in goroutine
var parent1, parent2 <-chan struct{}
if shard.ParentShardId != nil {
parent1 = g.shardsClosed[*shard.ParentShardId]
}
if shard.AdjacentParentShardId != nil {
parent2 = g.shardsClosed[*shard.AdjacentParentShardId]
}
go func() {
// Asynchronously wait for all parents of this shard to be processed
// before providing it out to our client. Kinesis guarantees that a
// given partition key's data will be provided to clients in-order,
// but when splits or joins happen, we need to process all parents prior
// to processing children or that ordering guarantee is not maintained.
if waitForCloseChannel(ctx, parent1) && waitForCloseChannel(ctx, parent2) {
shardC <- shard
Maintain parent/child shard ordering across shard splits/merges. (#155) Kinesis allows clients to rely on an invariant that, for a given partition key, the order of records added to the stream will be maintained. IE: given an input `pkey=x,val=1 pkey=x,val=2 pkey=x,val=3`, the values `1,2,3` will be seen in that order when processed by clients, so long as clients are careful. It does so by putting all records for a single partition key into a single shard, then maintaining ordering within that shard. However, shards can be split and merge, to distribute load better and handle per-shard throughput limits. Kinesis does this currently by (one or many times) splitting a single shard into two or by merging two adjacent shards into one. When this occurs, Kinesis still allows for ordering consistency by detailing shard parent/child relationships within its `listShards` outputs. A split shard A will create children B and C, both with `ParentShardId=A`. A merging of shards A and B into C will create a new shard C with `ParentShardId=A,AdjacentParentShardId=B`. So long as clients fully process all records in parents (including adjacent parents) before processing the new shard, ordering will be maintained. `kinesis-consumer` currently doesn't do this. Instead, upon the initial (and subsequent) `listShards` call, all visible shards immediately begin processing. Considering this case, where shards split, then merge, and each shard `X` contains a single record `rX`: ``` time -> B / \ A D \ / C ``` record `rD` should be processed after both `rB` and `rC` are processed, and both `rB` and `rC` should wait for `rA` to be processed. By starting goroutines immediately, any ordering of `{rA,rB,rC,rD}` might occur within the original code. This PR utilizes the `AllGroup` as a book-keeper of fully processed shards, with the `Consumer` calling `CloseShard` once it has finished a shard. `AllGroup` doesn't release a shard for processing until its parents have fully been processed, and the consumer just processes the shards it receives as it used to. This PR created a new `CloseableGroup` interface rather than append to the existing `Group` interface to maintain backwards compatibility in existing code that may already implement the `Group` interface elsewhere. Different `Group` implementations don't get the ordering described above, but the default `Consumer` does.
2024-06-06 15:37:42 +00:00
}
}()
}
return nil
}