* fix nil pointer dereference on AWS errors
* return Start errors to Scan consumer
Before the previous commit e465b09, client errors panicked the reader,
so consumers would pick up shard iterator errors by virtue of their
server crashing and burning.
Now that client errors are properly returned, the behaviour of
listShards becomes problematic, because it absorbs any client errors it
encounters. The combination of these two changes means that if you hit
an AWS error, your server goes into an endless scan loop that you can't
detect and can't easily recover from.
To avoid that, listShards now stops and returns the error when it hits
a client error.
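Roughly, the change has the shape of the sketch below. This is an illustrative re-implementation against the aws-sdk-go v1 `ListShards` call, not the library's exact code; the point is that the pagination loop now returns the client error instead of absorbing it.

```go
package consumer

import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/kinesis"
	"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
)

// listShards pages through ListShards and returns all shards for the stream.
// Client errors are returned to the caller instead of being swallowed, so the
// consumer can stop rather than spin in an endless scan loop.
func listShards(ctx context.Context, kc kinesisiface.KinesisAPI, streamName string) ([]*kinesis.Shard, error) {
	var shards []*kinesis.Shard
	var nextToken *string

	for {
		input := &kinesis.ListShardsInput{}
		if nextToken != nil {
			// When paginating, the API accepts only the token.
			input.NextToken = nextToken
		} else {
			input.StreamName = aws.String(streamName)
		}

		resp, err := kc.ListShardsWithContext(ctx, input)
		if err != nil {
			return nil, fmt.Errorf("list shards: %w", err)
		}

		shards = append(shards, resp.Shards...)
		if resp.NextToken == nil {
			return shards, nil
		}
		nextToken = resp.NextToken
	}
}
```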
---------
Co-authored-by: Jarrad Whitaker <jwhitaker 📧 swift-nav.com>
Kinesis allows clients to rely on an invariant that, for a given partition key, the order of records added to the stream will be maintained. I.e. given an input `pkey=x,val=1 pkey=x,val=2 pkey=x,val=3`, the values `1,2,3` will be seen in that order when processed by clients, so long as clients are careful. Kinesis achieves this by putting all records for a single partition key into a single shard, then maintaining ordering within that shard.
However, shards can be split and merged to distribute load better and to handle per-shard throughput limits. Kinesis currently does this by (one or many times) splitting a single shard into two, or by merging two adjacent shards into one. When this occurs, Kinesis still allows for ordering consistency by detailing shard parent/child relationships in its `listShards` output. Splitting a shard A creates children B and C, both with `ParentShardId=A`. Merging shards A and B into C creates a new shard C with `ParentShardId=A,AdjacentParentShardId=B`. So long as clients fully process all records in parents (including adjacent parents) before processing the new shard, ordering is maintained.
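For reference, that lineage is visible directly on the `kinesis.Shard` values returned by `listShards` (aws-sdk-go v1 types). The helper below is purely illustrative and not part of the library's API; it only shows which fields carry the parent relationships.

```go
import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

// parentIDs returns the shard IDs that must be fully processed before the
// given shard can be consumed without breaking per-key ordering. Note that a
// parent that has aged out of retention may no longer appear in the
// ListShards output, so real code must treat a missing parent as finished.
// (Illustrative helper, not the library's API.)
func parentIDs(s *kinesis.Shard) []string {
	var parents []string
	if p := aws.StringValue(s.ParentShardId); p != "" {
		parents = append(parents, p) // set on split and merge children
	}
	if p := aws.StringValue(s.AdjacentParentShardId); p != "" {
		parents = append(parents, p) // set only on merge children
	}
	return parents
}
```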
`kinesis-consumer` currently doesn't do this. Instead, upon the initial (and every subsequent) `listShards` call, all visible shards immediately begin processing. Consider this case, where shards split, then merge, and each shard `X` contains a single record `rX`:
```
time ->
B
/ \
A D
\ /
C
```
record `rD` should be processed after both `rB` and `rC` have been processed, and both `rB` and `rC` should wait for `rA` to be processed. Because the original code starts a goroutine for every visible shard immediately, any ordering of `{rA,rB,rC,rD}` might occur.
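Mapped onto the `ParentShardId`/`AdjacentParentShardId` fields, the diamond above looks like this (shard IDs are illustrative):

```go
// Parent relationships for the diamond, as ListShards would report them.
// rD must wait for B and C; rB and rC must wait for A.
var diamondParents = map[string][]string{
	"A": nil,         // original shard, no parents
	"B": {"A"},       // split child:  ParentShardId=A
	"C": {"A"},       // split child:  ParentShardId=A
	"D": {"B", "C"},  // merge child:  ParentShardId=B, AdjacentParentShardId=C
}
```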
This PR uses the `AllGroup` as a bookkeeper of fully processed shards, with the `Consumer` calling `CloseShard` once it has finished a shard. `AllGroup` doesn't release a shard for processing until its parents have been fully processed, and the consumer processes the shards it receives just as it did before.
This PR adds a new `CloseableGroup` interface rather than appending to the existing `Group` interface, to maintain backwards compatibility with existing code that may already implement `Group` elsewhere. Other `Group` implementations don't get the ordering described above, but the default `Consumer` does.
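A minimal sketch of the new interface's shape, assuming the method name `CloseShard` described above; the actual signatures in the PR may differ.

```go
import "context"

// CloseableGroup extends the existing Group interface with a way for the
// Consumer to report that a shard has been fully drained.
// (Sketch only; the real method set may differ.)
type CloseableGroup interface {
	Group

	// CloseShard marks shardID as fully processed, so AllGroup can tell
	// when a child shard's parents are done and release the child.
	CloseShard(ctx context.Context, shardID string) error
}
```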
Update the DynamoDB checkpoint item to use `dynamodbav` struct tags for marshaling.
Fixes: https://github.com/harlow/kinesis-consumer/issues/142
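Sketched with illustrative field and attribute names, the checkpoint item now carries `dynamodbav` tags so that `dynamodbattribute.MarshalMap`/`UnmarshalMap` use explicit attribute names:

```go
// Checkpoint item persisted to DynamoDB. The field and attribute names here
// are illustrative; the change is the switch to `dynamodbav` struct tags.
type item struct {
	Namespace      string `dynamodbav:"namespace"`
	ShardID        string `dynamodbav:"shard_id"`
	SequenceNumber string `dynamodbav:"sequence_number"`
}
```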
Minor changes:
* Update the DDB example consumer to support a local version of DDB for streamlined testing
* Include ShardID in Record passed to ScanFunc
* Update mock to explicitly use kinesis.Record
Supports a change in which consumer.Record goes from being an alias of
kinesis.Record to a composition containing it.
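A sketch of the resulting shape, with an illustrative field set; the embedding and the added `ShardID` are the relevant parts.

```go
import "github.com/aws/aws-sdk-go/service/kinesis"

// Record wraps the SDK record rather than aliasing it, so consumer-level
// metadata such as the shard ID can ride along to the ScanFunc.
type Record struct {
	*kinesis.Record        // Data, SequenceNumber, PartitionKey, ...
	ShardID         string // shard the record was read from
}
```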
As we work towards introducing consumer groups to the repository, we need a more generic name for the persistence layer that stores checkpoints and leases for individual shards.
* Rename `checkpoint` to `store`
* Introduce Group interface and AllGroup
As we move towards consumer groups we'll need to support the current
"consume all shards" strategy, and set up the codebase for the
anticipated "consume balanced shards" strategy.
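A rough sketch of what such a `Group` might look like, with illustrative signatures (the repo's actual interface may differ): the group decides which shards to hand to the consumer and fronts the checkpoint store, while `AllGroup` simply delivers every shard it discovers.

```go
import (
	"context"

	"github.com/aws/aws-sdk-go/service/kinesis"
)

// Group abstracts shard assignment plus checkpoint storage, so an alternative
// "consume balanced shards" strategy can be slotted in later without touching
// the Consumer.
type Group interface {
	// Start discovers shards for the stream and delivers them on shardc.
	Start(ctx context.Context, shardc chan *kinesis.Shard)

	// Checkpoint reads and writes are delegated to the underlying store.
	GetCheckpoint(streamName, shardID string) (string, error)
	SetCheckpoint(streamName, shardID, sequenceNumber string) error
}
```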