kinesis-consumer/group.go

22 lines
590 B
Go
Raw Permalink Normal View History

package consumer
import (
"context"
2021-09-22 05:00:14 +00:00
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
)
// Group interface used to manage which shard to process
type Group interface {
Start(ctx context.Context, shardc chan types.Shard) error
GetCheckpoint(streamName, shardID string) (string, error)
SetCheckpoint(streamName, shardID, sequenceNumber string) error
}
Maintain parent/child shard ordering across shard splits/merges. (#155) Kinesis allows clients to rely on an invariant that, for a given partition key, the order of records added to the stream will be maintained. IE: given an input `pkey=x,val=1 pkey=x,val=2 pkey=x,val=3`, the values `1,2,3` will be seen in that order when processed by clients, so long as clients are careful. It does so by putting all records for a single partition key into a single shard, then maintaining ordering within that shard. However, shards can be split and merge, to distribute load better and handle per-shard throughput limits. Kinesis does this currently by (one or many times) splitting a single shard into two or by merging two adjacent shards into one. When this occurs, Kinesis still allows for ordering consistency by detailing shard parent/child relationships within its `listShards` outputs. A split shard A will create children B and C, both with `ParentShardId=A`. A merging of shards A and B into C will create a new shard C with `ParentShardId=A,AdjacentParentShardId=B`. So long as clients fully process all records in parents (including adjacent parents) before processing the new shard, ordering will be maintained. `kinesis-consumer` currently doesn't do this. Instead, upon the initial (and subsequent) `listShards` call, all visible shards immediately begin processing. Considering this case, where shards split, then merge, and each shard `X` contains a single record `rX`: ``` time -> B / \ A D \ / C ``` record `rD` should be processed after both `rB` and `rC` are processed, and both `rB` and `rC` should wait for `rA` to be processed. By starting goroutines immediately, any ordering of `{rA,rB,rC,rD}` might occur within the original code. This PR utilizes the `AllGroup` as a book-keeper of fully processed shards, with the `Consumer` calling `CloseShard` once it has finished a shard. `AllGroup` doesn't release a shard for processing until its parents have fully been processed, and the consumer just processes the shards it receives as it used to. This PR created a new `CloseableGroup` interface rather than append to the existing `Group` interface to maintain backwards compatibility in existing code that may already implement the `Group` interface elsewhere. Different `Group` implementations don't get the ordering described above, but the default `Consumer` does.
2024-06-06 15:37:42 +00:00
type CloseableGroup interface {
Group
// Allows shard processors to tell the group when the shard has been
// fully processed. Should be called only once per shardID.
CloseShard(ctx context.Context, shardID string) error
}