Commit graph

200 commits

Author SHA1 Message Date
Guy A Molinari
baf8258298
Fix issue #167 - Concurrent write/write of in-flight shard map (#168) 2024-12-07 11:54:45 -08:00
Sanket Deshpande
9b6b643efb
fixed concurrent map rw panic for shardsInProgress map (#163)
Co-authored-by: Sanket Deshpande <sanket@clearblade.com>
2024-09-25 12:23:08 -07:00
lrs
43900507c9
fix isRetriableError (#159)
fix issues-158
2024-09-16 12:27:23 -07:00
Mikhail Konovalov
8d10ac8dac
Fix ProvisionedThroughputExceededException error (#161)
Fixes #158. Seems the bug was introduced in #155. See #155 (comment)
2024-09-16 12:25:49 -07:00
Jarrad
553e2392fd
fix nil pointer dereference on AWS errors (#148)
* fix nil pointer dereference on AWS errors

* return Start errors to Scan consumer

Before the previous commit e465b09, client errors panicked the
reader, so consumers would pick up shard iterator errors by virtue of
their server crashing and burning.

Now that client errors are properly returned, the behaviour of
listShards is problematic because it absorbs any client errors it gets.

The result of these two things is that if you hit an AWS error, your server will go into an
endless scan loop you can't detect and can't easily recover from.

To avoid that, listShards will now stop if it hits a client error.

---------

Co-authored-by: Jarrad Whitaker <jwhitaker@swift-nav.com>
2024-06-06 08:38:16 -07:00
gram-signal
6720a01733
Maintain parent/child shard ordering across shard splits/merges. (#155)
Kinesis allows clients to rely on an invariant that, for a given partition key, the order of records added to the stream will be maintained. That is, given an input `pkey=x,val=1  pkey=x,val=2  pkey=x,val=3`, the values `1,2,3` will be seen in that order when processed by clients, so long as clients are careful. It does so by putting all records for a single partition key into a single shard, then maintaining ordering within that shard.

However, shards can be split and merged to distribute load better and handle per-shard throughput limits. Kinesis currently does this by (one or many times) splitting a single shard into two or by merging two adjacent shards into one. When this occurs, Kinesis still allows for ordering consistency by detailing shard parent/child relationships within its `listShards` outputs. A split of shard A will create children B and C, both with `ParentShardId=A`. A merge of shards A and B into C will create a new shard C with `ParentShardId=A,AdjacentParentShardId=B`. So long as clients fully process all records in parents (including adjacent parents) before processing the new shard, ordering will be maintained.

`kinesis-consumer` currently doesn't do this. Instead, upon the initial (and subsequent) `listShards` call, all visible shards immediately begin processing. Consider this case, where shards split, then merge, and each shard `X` contains a single record `rX`:

```
time ->
  B
 / \
A   D
 \ /
  C
```

Record `rD` should be processed after both `rB` and `rC` are processed, and both `rB` and `rC` should wait for `rA` to be processed. Because the original code starts goroutines immediately, any ordering of `{rA,rB,rC,rD}` might occur.

This PR utilizes the `AllGroup` as a book-keeper of fully processed shards, with the `Consumer` calling `CloseShard` once it has finished a shard.  `AllGroup` doesn't release a shard for processing until its parents have fully been processed, and the consumer just processes the shards it receives as it used to.

This PR created a new `CloseableGroup` interface rather than appending to the existing `Group` interface, to maintain backwards compatibility with existing code that may already implement the `Group` interface elsewhere. Other `Group` implementations don't get the ordering described above, but the default `Consumer` does.
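
The release rule described above can be illustrated with a minimal sketch. It is not the library's `AllGroup`: the `shard` and `tracker` types and their methods are hypothetical names invented here, and the sketch assumes that a parent missing from the listing has already expired and should not block its children.

```go
package main

import (
	"fmt"
	"strings"
)

// shard holds only the fields this sketch needs; it is a hypothetical
// stand-in for an entry returned by ListShards.
type shard struct {
	ID               string
	ParentID         string // empty when the shard has no parent
	AdjacentParentID string // set only for shards created by a merge
}

// tracker is a hypothetical book-keeper, not the library's AllGroup.
type tracker struct {
	known   map[string]bool // every shard ID seen in the listing
	closed  map[string]bool // shards reported as fully processed
	pending []shard         // shards not yet released
}

func newTracker(shards []shard) *tracker {
	t := &tracker{known: map[string]bool{}, closed: map[string]bool{}}
	for _, s := range shards {
		t.known[s.ID] = true
	}
	t.pending = append(t.pending, shards...)
	return t
}

// parentDone reports whether a parent no longer blocks its child.
// Assumption: a parent missing from the listing has expired and counts as done.
func (t *tracker) parentDone(id string) bool {
	return id == "" || !t.known[id] || t.closed[id]
}

// ready removes and returns the pending shards whose parents are all done.
func (t *tracker) ready() []shard {
	var out, rest []shard
	for _, s := range t.pending {
		if t.parentDone(s.ParentID) && t.parentDone(s.AdjacentParentID) {
			out = append(out, s)
		} else {
			rest = append(rest, s)
		}
	}
	t.pending = rest
	return out
}

// closeShard records that a shard has been fully processed.
func (t *tracker) closeShard(id string) { t.closed[id] = true }

// ids joins shard IDs for printing.
func ids(shards []shard) string {
	names := make([]string, 0, len(shards))
	for _, s := range shards {
		names = append(names, s.ID)
	}
	return strings.Join(names, ",")
}

func main() {
	// The split-then-merge case from the diagram: A -> B, C -> D.
	t := newTracker([]shard{
		{ID: "A"},
		{ID: "B", ParentID: "A"},
		{ID: "C", ParentID: "A"},
		{ID: "D", ParentID: "B", AdjacentParentID: "C"},
	})
	fmt.Println(ids(t.ready())) // A
	t.closeShard("A")
	fmt.Println(ids(t.ready())) // B,C
	t.closeShard("B")
	t.closeShard("C")
	fmt.Println(ids(t.ready())) // D
}
```

With this rule, `D` is held back until both `B` and `C` are closed, so `rD` cannot be processed ahead of `rB` or `rC`.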
2024-06-06 08:37:42 -07:00
guangwu
188bdff278
fix: typo (#156) 2023-12-14 16:36:20 -08:00
Dario Emerson
af2db0d43f
update go-redis to v9 (#150) 2023-06-16 11:20:07 -07:00
Harlow Ward
c2b9f79d7a Put aws-sdk-v2 note in installation section 2021-12-04 13:43:18 -08:00
Harlow Ward
6cbda0f706
Update ddb checkpoint item to use dynamodbav (#144)
Update the DynamoDB checkpoint item to use the `dynamodbav` value for marshaling. 

Fixes: https://github.com/harlow/kinesis-consumer/issues/142

Minor changes:
* Update the DDB example consumer to support a local version of DDB for streamlined testing
2021-12-04 13:40:26 -08:00
Harlow Ward
3b3b252fa5 Add all examples to example directory 2021-12-04 10:42:17 -08:00
Harlow Ward
61fa84eca6
Update to use aws-sdk-go-v2 (#141) 2021-09-21 22:00:14 -07:00
Harlow Ward
8257129066
Remove note about consumer groups (#139) 2021-09-02 11:20:14 -07:00
jonandrewj
a334486111
Implement a WithShardClosedHandler option for the consumer (#135) 2021-07-30 14:16:15 -07:00
jonandrewj
c75a9237b6
Add InternalFailureException to the list of retriable errors (#132)
* Add InternalFailureException to the list of retriable errors
2021-02-25 14:54:48 -08:00
Frank Meyer
27055f2ace
Wrap underlying errors via %w verb (#130)
As introduced in Go 1.13. This enables users of this library
to check for an underlying wrapped error type via the errors.Is and
errors.As functions.
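
A short sketch of what `%w` wrapping makes possible for callers. The names `errThrottled`, `scanError`, `getRecords`, `isThrottled`, and `failedShard` are hypothetical and exist only for illustration; they are not identifiers from this library.

```go
package main

import (
	"errors"
	"fmt"
)

// errThrottled is a hypothetical sentinel error.
var errThrottled = errors.New("throughput exceeded")

// scanError is a hypothetical typed error carrying extra context.
type scanError struct{ ShardID string }

func (e *scanError) Error() string { return "scan failed on shard " + e.ShardID }

// getRecords simulates a failing call and wraps the cause with %w,
// so the original error stays reachable through the wrapper.
func getRecords(shardID string, throttled bool) error {
	if throttled {
		return fmt.Errorf("get records %s: %w", shardID, errThrottled)
	}
	return fmt.Errorf("get records %s: %w", shardID, &scanError{ShardID: shardID})
}

// isThrottled checks for the sentinel anywhere in the wrap chain.
func isThrottled(err error) bool { return errors.Is(err, errThrottled) }

// failedShard extracts the typed error from the wrap chain, if present.
func failedShard(err error) (string, bool) {
	var se *scanError
	if errors.As(err, &se) {
		return se.ShardID, true
	}
	return "", false
}

func main() {
	fmt.Println(isThrottled(getRecords("shard-0", true))) // true
	id, ok := failedShard(getRecords("shard-1", false))
	fmt.Println(id, ok) // shard-1 true
}
```

Had the errors been formatted with `%v` instead of `%w`, both checks would fail because the wrapper would hold only the text of the cause.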
2021-02-05 09:10:34 -08:00
James Regovich
799ccf2d40
Add support for aggregated records (#127)
Add config option for aggregated records and deaggregation on records in ScanShard

This PR adds an option to consume aggregated records.
2020-10-13 20:41:18 -07:00
Jason Tackaberry
e60d217333
Include MillisBehindLatest in Record for ScanFunc (#124) 2020-08-01 22:05:17 -07:00
Jason Tackaberry
3f2519e51c
Run initial scan immediately (#123)
Rather than starting the shard scan loop and waiting for next scan tick before
fetching any data, do the first poll immediately, and then wait.
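
A minimal sketch of the poll-first loop shape, assuming a ticker-driven scan loop. `scanLoop` and `pollsWithin` are hypothetical names, not functions from this library.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// scanLoop calls poll once right away, then once per tick until ctx is done.
// A tick-first loop would instead leave the consumer idle for one full
// interval before fetching anything.
func scanLoop(ctx context.Context, interval time.Duration, poll func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		poll()
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}

// pollsWithin runs scanLoop for runMs milliseconds and counts the polls.
func pollsWithin(intervalMs, runMs int) int {
	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(runMs)*time.Millisecond)
	defer cancel()
	n := 0
	scanLoop(ctx, time.Duration(intervalMs)*time.Millisecond, func() { n++ })
	return n
}

func main() {
	// The interval is far longer than the run time, so the only poll
	// that can happen is the immediate one.
	fmt.Println(pollsWithin(1000, 20))
}
```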
2020-08-01 15:45:37 -07:00
Jason Tackaberry
97ffabeaa5
Include ShardID in Record passed to ScanFunc (#121)
* Include ShardID in Record passed to ScanFunc
* Update mock to explicitly use kinesis.Record

Supports change wherein consumer.Record is changed from an alias of
kinesis.Record to a composition containing it.
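
The alias-to-composition change can be sketched as follows. `kinesisRecord` is a local stand-in for the SDK record type so the example compiles without the AWS SDK, and the exact field layout of the library's `Record` is an assumption here.

```go
package main

import "fmt"

// kinesisRecord is a hypothetical stand-in for the SDK's record type.
type kinesisRecord struct {
	SequenceNumber string
	Data           []byte
}

// Before the change the consumer type was only an alias, which leaves
// no room for extra fields:
//
//	type Record = kinesisRecord
//
// After the change it embeds the SDK record and adds the shard ID.
type Record struct {
	kinesisRecord
	ShardID string
}

func main() {
	r := Record{
		kinesisRecord: kinesisRecord{SequenceNumber: "42", Data: []byte("hello")},
		ShardID:       "shardId-000000000000",
	}
	// Embedded fields are promoted, so existing field access keeps working.
	fmt.Println(r.SequenceNumber, string(r.Data), r.ShardID)
}
```

Field access through the embedded type is unchanged, but code that constructs the record, such as a mock, has to name the embedded type explicitly, which is consistent with the mock update listed above.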
2020-07-30 14:18:38 -07:00
Harlow Ward
bae065cf53 Make sure gofmt run on all files 2020-07-21 20:31:38 -07:00
Harlow Ward
3b95644d77 Move license to file without prefix 2020-07-21 20:31:04 -07:00
Nicolas Augusto Sassi
ef5ce02f91
Update README.md (#119)
fix typo
2020-07-21 20:27:21 -07:00
Harlow Ward
89db667ce5
Bump AWS SDK to v1.33.3 (#118)
* Bump AWS SDK to v1.33.3
* Bump redis version
2020-07-21 20:27:03 -07:00
chumbert2
400ef07463
Allow to override Limit parameter in GetRecords (#113)
Add option maxRecords to set the maximum number of records that can
be returned by GetRecords. Default value: 10,000. Use WithMaxRecords
to change the default value.

See

https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#GetRecordsInput
https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
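
A sketch of how such an option can be wired with the functional-options pattern. Only the `WithMaxRecords` name and the 10,000 default come from the description above; the `consumer` struct, the `Option` type, and `newConsumer` are hypothetical and do not reproduce the library's implementation.

```go
package main

import "fmt"

// consumer is a hypothetical type holding only the setting under discussion.
type consumer struct {
	maxRecords int64 // upper bound on records returned per GetRecords call
}

// Option mutates a consumer during construction.
type Option func(*consumer)

// WithMaxRecords overrides the default record limit.
func WithMaxRecords(n int64) Option {
	return func(c *consumer) { c.maxRecords = n }
}

// newConsumer applies the default first, then any caller-supplied options.
func newConsumer(opts ...Option) *consumer {
	c := &consumer{maxRecords: 10000}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	fmt.Println(newConsumer().maxRecords)                    // 10000
	fmt.Println(newConsumer(WithMaxRecords(500)).maxRecords) // 500
}
```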
2020-04-27 21:12:20 -07:00
Jason Pepper
dcd9d048fb
Don't send StreamName when calling ListShards with NextToken. (#110)
Co-authored-by: Jason Pepper <jason.pepper@crypsisgroup.com>
2020-03-19 21:45:53 -07:00
0livd
bc5c012fd3 Add scanInterval option (#105)
By default a consumer will scan for records every 250ms.
This interval can be configured with WithScanInterval.
2020-01-17 10:22:10 -08:00
Patrik Karlström
217999854b Only initialize ddb client if none is provided (#106) 2020-01-17 10:02:37 -08:00
Andrew Shannon Brown
f85f25c15e Add an in-memory checkpoint to the API (#103)
* Add an in-memory checkpoint to the API

* Rename memory to store

* Rename test package to store
2019-09-08 13:13:04 -07:00
Harlow Ward
b87510458e
Fix example command indentation 2019-09-02 08:13:08 -07:00
Harlow Ward
dfb0faaa87 Add missing code-block highlighting 2019-09-02 08:11:40 -07:00
Harlow Ward
b451fc4cc2 Use stdin for example data reader instead of file path 2019-09-02 08:08:21 -07:00
Harlow Ward
e3ee95b282 Fix typo in context cancellation docs 2019-09-02 07:47:10 -07:00
Harlow Ward
4c2aaf78a2 Add consumer example without checkpointing 2019-09-02 07:44:26 -07:00
Harlow Ward
3ae979bf82 Move example consumers under cmd directory 2019-09-02 07:36:31 -07:00
Harlow Ward
1a141cfbaa
Move notice above the build status 2019-09-02 07:30:33 -07:00
Harlow Ward
3a98baa012 Update godoc for allgroup description 2019-09-02 07:26:44 -07:00
Harlow Ward
ed40b5d9b4 Default to kinesalite when running example consumers 2019-09-02 07:25:07 -07:00
Dimas Yudha P
a252eb38c6 Readme and linter fix (#102)
* update readme.md adding goreport

* update readme and fix issue found by linter

* update readme.md add Go badges

* update readme.md, fix go badges
2019-08-31 18:43:26 -07:00
Matias Morán Losada
71bbc397e2 An attempt to work around a gopkg.in/module error in a dependency (#100)
* An attempt to work around a gopkg.in/module error in a dependency

* replace go-sqlmock module to new source in tests
2019-08-31 13:19:23 -07:00
Andrew Shannon Brown
14db23eaf3 Support creating an iterator with an initial timestamp (#99)
* Allow setting initial timestamp

* Fix writing to closed channel

* Allow cancelling of request
2019-08-14 09:33:35 -07:00
Patrick Moore
81a8ac4221 Allow use of existing Redis client for checkpoint store (#96) 2019-08-05 15:04:27 -07:00
Harlow Ward
35c48ef1c9
Only retry expired shard iterator errors (#95)
Fixes https://github.com/harlow/kinesis-consumer/issues/92
2019-07-30 19:48:20 -07:00
Harlow Ward
5da0865ac1 Add WithGroup option 2019-07-28 21:34:54 -07:00
Harlow Ward
a9c97d3b93 Update examples to use Store interface 2019-07-28 21:33:19 -07:00
Harlow Ward
d2cf65fa7a Update Redis library version
The Redis library was pinned to an older version using gopkg.in.
This updates to latest version and pins w/ go mod.

https://github.com/harlow/kinesis-consumer/issues/93
2019-07-28 21:27:28 -07:00
Harlow Ward
00b5f64fa7 Clean up storage w/ store for consistency 2019-07-28 21:20:29 -07:00
Harlow Ward
c72f561abd
Replace Checkpoint with Store interface (#90)
As we work towards introducing consumer groups to the repository we need a more generic name for the persistence layer for storing checkpoints and leases for given shards. 

* Rename `checkpoint` to `store`
2019-07-28 21:18:40 -07:00
Harlow Ward
d05d6c2d5e Update comments for exported functions 2019-07-28 10:54:01 -07:00
Harlow Ward
7018c0c47e
Introduce Group interface and AllGroup (#91)
* Introduce Group interface and AllGroup

As we move towards consumer groups we'll need to support the current
"consume all shards" strategy, and set up the codebase for the
anticipated "consume balanced shards" strategy.
2019-06-09 13:42:25 -07:00