# Golang Kinesis Consumer
__Note:__ This repo is under active development adding [Consumer Groups #42](https://github.com/harlow/kinesis-consumer/issues/42). Master should always be deployable, but there may be interface changes in master over the next few months.

Latest stable release: https://github.com/harlow/kinesis-consumer/releases/tag/v0.3.2

[Build Status](https://travis-ci.com/harlow/kinesis-consumer) · [GoDoc](https://godoc.org/github.com/harlow/kinesis-consumer) · [Go Report Card](https://goreportcard.com/report/harlow/kinesis-consumer)

A Kinesis consumer library written in Go. It is intended to be a lightweight wrapper around the Kinesis API to read records, save checkpoints (with swappable backends), and gracefully recover from service timeouts and errors.

__Alternate serverless options:__

* [Kinesis to Firehose](http://docs.aws.amazon.com/firehose/latest/dev/writing-with-kinesis-streams.html) can be used to archive data directly to S3, Redshift, or Elasticsearch without running a consumer application.
* [Process Kinesis Streams with Golang and AWS Lambda](https://medium.com/@harlow/processing-kinesis-streams-w-aws-lambda-and-golang-264efc8f979a) for serverless processing and checkpoint management.
## Installation
Get the package source:

```
$ go get github.com/harlow/kinesis-consumer
```
## Overview
The consumer leverages a handler func that accepts a Kinesis record. The `Scan` method consumes all shards concurrently and calls the handler func as it receives records from the stream.

_Important 1: The `Scan` func also polls the stream for new shards and automatically starts consuming any shards added to the stream._

_Important 2: The default logger, counter, and store are no-ops, which means no logs, counts, or checkpoints will be emitted when scanning the stream. See the options below to override these defaults._
```go
package main

import (
	"context"
	"flag"
	"fmt"
	"log"

	consumer "github.com/harlow/kinesis-consumer"
)

func main() {
	var stream = flag.String("stream", "", "Stream name")
	flag.Parse()

	// consumer
	c, err := consumer.New(*stream)
	if err != nil {
		log.Fatalf("consumer error: %v", err)
	}

	// start scan
	err = c.Scan(context.TODO(), func(r *consumer.Record) error {
		fmt.Println(string(r.Data))
		return nil // continue scanning
	})
	if err != nil {
		log.Fatalf("scan error: %v", err)
	}

	// Note: If you need to aggregate based on a specific shard
	// the `ScanShard` function should be used instead.
}
```
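Because `Scan` consumes all shards concurrently, the handler func can be called from several goroutines at once, so any state it shares needs synchronization. The sketch below is a standalone simulation, not the library's code: `fakeScan` (hypothetical) starts one goroutine per shard the way `Scan` is described above, and `record` is a stand-in for `consumer.Record` with an assumed `ShardID` field. The handler guards a shared per-shard tally with a mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a hypothetical stand-in for consumer.Record.
type record struct {
	ShardID string
	Data    []byte
}

// fakeScan simulates Scan: one goroutine per shard, each calling fn for
// every record in that shard. It is NOT the library implementation.
func fakeScan(shards map[string][]string, fn func(r *record) error) {
	var wg sync.WaitGroup
	for shardID, items := range shards {
		wg.Add(1)
		go func(shardID string, items []string) {
			defer wg.Done()
			for _, item := range items {
				if err := fn(&record{ShardID: shardID, Data: []byte(item)}); err != nil {
					return // stop this shard on error
				}
			}
		}(shardID, items)
	}
	wg.Wait()
}

// countPerShard tallies records per shard. The mutex makes the handler
// safe to call from the concurrent shard goroutines.
func countPerShard(shards map[string][]string) map[string]int {
	var mu sync.Mutex
	counts := make(map[string]int)
	fakeScan(shards, func(r *record) error {
		mu.Lock()
		counts[r.ShardID]++
		mu.Unlock()
		return nil
	})
	return counts
}

func main() {
	counts := countPerShard(map[string][]string{
		"shardId-000000000000": {"a", "b", "c"},
		"shardId-000000000001": {"d", "e"},
	})
	fmt.Println(counts["shardId-000000000000"], counts["shardId-000000000001"])
}
```

Without the mutex, the concurrent map writes would be a data race. When the aggregation is naturally per shard, the note in the example above points to `ScanShard` instead.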
## ScanFunc
ScanFunc is the type of the function called for each message read from the stream. The record argument contains the original record returned from the AWS Kinesis library.

```go
type ScanFunc func(r *Record) error
```

If an error is returned, scanning stops. The sole exception is when the function returns the special value `SkipCheckpoint`: scanning continues, but no checkpoint is saved for that record.
```go
// continue scanning
return nil

// continue scanning, skip checkpoint
return consumer.SkipCheckpoint

// stop scanning, return error
return errors.New("my error, exit all scans")
```
Use context cancellation to signal the scan to exit without error. For example, to gracefully exit the scan on interrupt:

```go
// trap SIGINT, wait to trigger shutdown
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

// context with cancel
ctx, cancel := context.WithCancel(context.Background())

go func() {
	<-signals
	cancel() // call cancellation
}()

err := c.Scan(ctx, func(r *consumer.Record) error {
	fmt.Println(string(r.Data))
	return nil // continue scanning
})
if err != nil {
	log.Fatalf("scan error: %v", err)
}
```
## Options
The consumer allows the following optional overrides.
### Store
To record the progress of the consumer in the stream (checkpoint), we use a storage layer to persist the last sequence number the consumer has read from a particular shard. Returning `consumer.SkipCheckpoint` from the `ScanFunc` tells the consumer not to save a checkpoint for that record.

This allows consumers to re-launch and pick up at the position in the stream where they left off.

The unique identifier for a consumer is `[appName, streamName, shardID]`.

<img width="722" alt="kinesis-checkpoints" src="https://user-images.githubusercontent.com/739782/33085867-d8336122-ce9a-11e7-8c8a-a8afeb09dff1.png">

Note: The default storage is a no-op, which means the scan will not persist any state: each time the consumer is restarted, it begins at its configured starting point (see "Consumer starting point" below) rather than where it left off.
The consumer accepts a `WithStore` option to set the storage layer:

```go
c, err := consumer.New(*stream, consumer.WithStore(db))
if err != nil {
	log.Fatalf("consumer error: %v", err)
}
```
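Any value passed to `WithStore` has to satisfy the consumer's store interface. The sketch below assumes that interface is `GetCheckpoint(streamName, shardID string) (string, error)` plus `SetCheckpoint(streamName, shardID, sequenceNumber string) error`; check the package godoc for the exact definition in your version. `memStore` is hypothetical: it keeps checkpoints in a mutex-guarded map keyed by `[appName, streamName, shardID]`, so it does not survive a restart either and is mainly useful in tests.

```go
package main

import (
	"fmt"
	"sync"
)

// Store is the ASSUMED shape of the consumer's storage interface.
type Store interface {
	GetCheckpoint(streamName, shardID string) (string, error)
	SetCheckpoint(streamName, shardID, sequenceNumber string) error
}

// key mirrors the [appName, streamName, shardID] identifier.
type key struct{ app, stream, shard string }

// memStore is a hypothetical in-memory Store. It is safe for concurrent
// use because shards may be checkpointed from several goroutines.
type memStore struct {
	app string
	mu  sync.Mutex
	seq map[key]string
}

func newMemStore(app string) *memStore {
	return &memStore{app: app, seq: make(map[key]string)}
}

// GetCheckpoint returns "" when the shard has no checkpoint yet
// (an assumed convention for "start from the configured position").
func (s *memStore) GetCheckpoint(streamName, shardID string) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.seq[key{s.app, streamName, shardID}], nil
}

// SetCheckpoint records the last sequence number read from the shard.
func (s *memStore) SetCheckpoint(streamName, shardID, sequenceNumber string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.seq[key{s.app, streamName, shardID}] = sequenceNumber
	return nil
}

// Compile-time check against the assumed interface.
var _ Store = (*memStore)(nil)

func main() {
	db := newMemStore("myApp")
	_ = db.SetCheckpoint("myStream", "shardId-000000000000", "12345")
	seq, _ := db.GetCheckpoint("myStream", "shardId-000000000000")
	fmt.Println(seq)
}
```

If the assumed interface matches, it would be wired in the same way as the stores below: `consumer.New(*stream, consumer.WithStore(newMemStore("myApp")))`.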
To persist scan progress, choose one of the following storage layers:
#### Redis
The Redis checkpoint requires an App Name:

```go
import store "github.com/harlow/kinesis-consumer/store/redis"

// redis checkpoint
db, err := store.New(appName)
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}
```
#### DynamoDB
The DynamoDB checkpoint requires an App Name and a Table Name:

```go
import store "github.com/harlow/kinesis-consumer/store/ddb"

// ddb checkpoint
db, err := store.New(appName, tableName)
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}

// Override the DynamoDB client if the session needs special config (e.g. assume role)
myDynamoDbClient := dynamodb.New(session.Must(session.NewSession(aws.NewConfig())))
// With AWS SDK versions that pick up the config properly,
// setting the region like this should work:
// myDynamoDbClient := dynamodb.New(session.Must(session.NewSession()), &aws.Config{
// 	Region: aws.String("us-west-2"),
// })

db, err = store.New(appName, tableName, store.WithDynamoClient(myDynamoDbClient))
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}

// Or provide your own Retryer to customize what triggers a retry inside the checkpoint.
// See the code in the examples.
// db, err = store.New(appName, tableName, store.WithDynamoClient(myDynamoDbClient), store.WithRetryer(&MyRetryer{}))
```
To use the DDB checkpoint, we'll also need to create a table with the following keys:

```
Partition key: namespace
Sort key: shard_id
```

<img width="727" alt="screen shot 2017-11-22 at 7 59 36 pm" src="https://user-images.githubusercontent.com/739782/33158557-b90e4228-cfbf-11e7-9a99-73b56a446f5f.png">
#### Postgres
The Postgres checkpoint requires an App Name, a Table Name, and a connection string:

```go
import store "github.com/harlow/kinesis-consumer/store/postgres"

// postgres checkpoint
db, err := store.New(app, table, connStr)
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}
```

To use the Postgres checkpoint, we'll also need to create a table:

```sql
CREATE TABLE kinesis_consumer (
	namespace text NOT NULL,
	shard_id text NOT NULL,
	sequence_number numeric NOT NULL,
	CONSTRAINT kinesis_consumer_pk PRIMARY KEY (namespace, shard_id)
);
```

The table name must match the one you specify when creating the checkpoint. The primary key composed of `namespace` and `shard_id` is mandatory for the checkpoint to run without issues and to ensure data integrity.
#### MySQL
The MySQL checkpoint requires an App Name, a Table Name, and a connection string (just like the Postgres checkpoint):

```go
import store "github.com/harlow/kinesis-consumer/store/mysql"

// mysql checkpoint
db, err := store.New(app, table, connStr)
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}
```

To use the MySQL checkpoint, we'll also need to create a table:

```sql
CREATE TABLE kinesis_consumer (
	namespace varchar(255) NOT NULL,
	shard_id varchar(255) NOT NULL,
	sequence_number numeric(65,0) NOT NULL,
	CONSTRAINT kinesis_consumer_pk PRIMARY KEY (namespace, shard_id)
);
```

The table name must match the one you specify when creating the checkpoint. The primary key composed of `namespace` and `shard_id` is mandatory for the checkpoint to run without issues and to ensure data integrity.
### Kinesis Client
Override the Kinesis client if any special config is needed:

```go
// client
sess, err := session.NewSession(aws.NewConfig())
if err != nil {
	log.Fatalf("session error: %v", err)
}
client := kinesis.New(sess)

// consumer
c, err := consumer.New(streamName, consumer.WithClient(client))
if err != nil {
	log.Fatalf("consumer error: %v", err)
}
```
### Metrics
Add an optional counter to expose counts of checkpoints and records processed:

```go
// counter
counter := expvar.NewMap("counters")

// consumer
c, err := consumer.New(streamName, consumer.WithCounter(counter))
if err != nil {
	log.Fatalf("consumer error: %v", err)
}
```

The [expvar package](https://golang.org/pkg/expvar/) will display consumer counts (excerpt from the `/debug/vars` endpoint):

```json
"counters": {
  "checkpoints": 3,
  "records": 13005
},
```
### Consumer starting point
Kinesis allows consumers to specify where on the stream they'd like to start consuming from. The default in this library is `LATEST` (start reading just after the most recent record in the shard).

This can be adjusted by using the `WithShardIteratorType` option in the library:

```go
// override starting place on stream to use TRIM_HORIZON
c, err := consumer.New(
	*stream,
	consumer.WithShardIteratorType(kinesis.ShardIteratorTypeTrimHorizon),
)
```

[See the AWS docs for more options.](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html)
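The iterator type matters most when there is nothing to resume from. The sketch below illustrates a common rule, under the assumption that the consumer behaves this way when a store is configured: if a checkpoint exists for the shard, resume at `AFTER_SEQUENCE_NUMBER`; otherwise fall back to the configured type (`LATEST` by default). `startingPosition` is hypothetical; the string constants are iterator types accepted by the `GetShardIterator` API.

```go
package main

import "fmt"

// Shard iterator types defined by the Kinesis GetShardIterator API.
const (
	iterLatest              = "LATEST"
	iterTrimHorizon         = "TRIM_HORIZON"
	iterAfterSequenceNumber = "AFTER_SEQUENCE_NUMBER"
)

// startingPosition picks where a shard scan begins: just after the stored
// checkpoint if there is one, else the configured default.
func startingPosition(checkpoint, configured string) (iterType, startSeq string) {
	if checkpoint != "" {
		return iterAfterSequenceNumber, checkpoint
	}
	if configured == "" {
		configured = iterLatest // library default described above
	}
	return configured, ""
}

func main() {
	fmt.Println(startingPosition("", ""))                   // no checkpoint, default type
	fmt.Println(startingPosition("", iterTrimHorizon))      // no checkpoint, TRIM_HORIZON
	fmt.Println(startingPosition("12345", iterTrimHorizon)) // resume after checkpoint
}
```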
### Logging
Logging supports the built-in logging package or a third-party library, as long as it implements the `Logger` interface.

For example, to use the built-in logging package, we wrap it with a `myLogger` structure:

```go
// A myLogger provides a minimalistic logger satisfying the Logger interface.
type myLogger struct {
	logger *log.Logger
}

// Log logs the parameters to the stdlib logger. See log.Println.
func (l *myLogger) Log(args ...interface{}) {
	l.logger.Println(args...)
}
```
The package defaults to `ioutil.Discard` to swallow all logs. This can be customized with the preferred logging strategy:

```go
// logger
logger := &myLogger{
	logger: log.New(os.Stdout, "consumer-example: ", log.LstdFlags),
}

// consumer
c, err := consumer.New(streamName, consumer.WithLogger(logger))
```
To use a more full-featured logging library, e.g. [apex/log](https://github.com/apex/log), wrap it the same way:

```go
import (
	"fmt"
	"os"

	alog "github.com/apex/log"
	"github.com/apex/log/handlers/text"
)

// myLogger adapts an apex/log Logger to the Logger interface.
type myLogger struct {
	logger *alog.Logger
}

// Log writes the parameters as a single info-level entry.
func (l *myLogger) Log(args ...interface{}) {
	l.logger.Info(fmt.Sprint(args...))
}

func main() {
	logger := &myLogger{
		logger: &alog.Logger{
			Handler: text.New(os.Stderr),
			Level:   alog.DebugLevel,
		},
	}
	// pass it to the consumer: consumer.New(streamName, consumer.WithLogger(logger))
	logger.Log("consumer starting")
}
```
## Examples

There are example Producer and Consumer programs in the `/cmd` directory. These give end-to-end examples of setting up consumers with different checkpoint strategies.

The examples run locally against [Kinesis Lite](https://github.com/mhart/kinesalite).

```
$ kinesalite &
```

Produce data to the stream:

```
$ cat cmd/producer/users.txt | go run cmd/producer/main.go --stream myStream
```

Consume data from the stream:

```
$ go run cmd/consumer/main.go --stream myStream
```
## Contributing
Please see [CONTRIBUTING.md] for more information. Thank you, [contributors]!
[LICENSE]: /MIT-LICENSE
[CONTRIBUTING.md]: /CONTRIBUTING.md
## License
Copyright (c) 2015 Harlow Ward. It is free software, and may
be redistributed under the terms specified in the [LICENSE] file.
[contributors]: https://github.com/harlow/kinesis-connectors/graphs/contributors
> [www.hward.com](http://www.hward.com) ·
> GitHub [@harlow](https://github.com/harlow) ·
> Twitter [@harlow_ward](https://twitter.com/harlow_ward)