# Golang Kinesis Consumer
[Build Status](https://travis-ci.com/harlow/kinesis-consumer) [GoDoc](https://godoc.org/github.com/harlow/kinesis-consumer)
__Note:__ This repo is under active development adding [Consumer Groups #42](https://github.com/harlow/kinesis-consumer/issues/42). Master should always be deployable, but expect breaking changes in master over the next few months.

Latest stable release: https://github.com/harlow/kinesis-consumer/releases/tag/v0.3.2
Kinesis consumer applications written in Go. This library is intended to be a lightweight wrapper around the Kinesis API to read records, save checkpoints (with swappable backends), and gracefully recover from service timeouts/errors.
__Alternate serverless options:__
* [Kinesis to Firehose](http://docs.aws.amazon.com/firehose/latest/dev/writing-with-kinesis-streams.html) can be used to archive data directly to S3, Redshift, or Elasticsearch without running a consumer application.
* [Process Kinesis Streams with Golang and AWS Lambda](https://medium.com/@harlow/processing-kinesis-streams-w-aws-lambda-and-golang-264efc8f979a) for serverless processing and checkpoint management.
## Installation
Get the package source:

```
$ go get github.com/harlow/kinesis-consumer
```
## Overview
The consumer leverages a handler func that accepts a Kinesis record. The `Scan` method will consume all shards concurrently and call the callback func as it receives records from the stream.
_Important 1: The `Scan` func also polls the stream to check for new shards and will automatically start consuming any shards added to the stream._

_Important 2: The default Log, Counter, and Checkpoint are no-op, which means no logs, counts, or checkpoints will be emitted when scanning the stream. See the options below to override these defaults._

```go
import (
	// ...

	consumer "github.com/harlow/kinesis-consumer"
)

func main() {
	var stream = flag.String("stream", "", "Stream name")
	flag.Parse()

	// consumer
	c, err := consumer.New(*stream)
	if err != nil {
		log.Fatalf("consumer error: %v", err)
	}

	// start scan
	err = c.Scan(context.TODO(), func(r *consumer.Record) error {
		fmt.Println(string(r.Data))
		return nil // continue scanning
	})
	if err != nil {
		log.Fatalf("scan error: %v", err)
	}

	// Note: If you need to aggregate based on a specific shard
	// the `ScanShard` function should be used instead.
}
```
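For shard-level control, a single shard can be scanned on its own. A minimal sketch, assuming a `ScanShard(ctx, shardID, fn)` method that mirrors `Scan` (verify the signature against the package docs):

```go
// scan a single shard only (the shard ID below is a placeholder)
err = c.ScanShard(context.TODO(), "shardId-000000000000", func(r *consumer.Record) error {
	fmt.Println(string(r.Data))
	return nil // continue scanning
})
if err != nil {
	log.Fatalf("scan shard error: %v", err)
}
```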
## ScanFunc
ScanFunc is the type of the function called for each message read
from the stream. The record argument contains the original record
returned from the AWS Kinesis library.
```go
type ScanFunc func(r *Record) error
```
If an error is returned, scanning stops. The sole exception is when the function returns the special value `SkipCheckpoint`.
```go
// continue scanning
return nil

// continue scanning, skip checkpoint
return consumer.SkipCheckpoint

// stop scanning, return error
return errors.New("my error, exit all scans")
```
Use context cancel to signal the scan to exit without error. For example, if we wanted to gracefully exit the scan on interrupt:
```go
// trap SIGINT, wait to trigger shutdown
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

// context with cancel
ctx, cancel := context.WithCancel(context.Background())

go func() {
	<-signals
	cancel() // call cancellation
}()

err := c.Scan(ctx, func(r *consumer.Record) error {
	fmt.Println(string(r.Data))
	return nil // continue scanning
})
```
## Checkpoint
To record the progress of the consumer in the stream we use a checkpoint to store the last sequence number the consumer has read from a particular shard. Returning `consumer.SkipCheckpoint` from the scan callback skips the checkpoint for that record; any other non-nil error stops the scan (see ScanFunc above).
This will allow consumers to re-launch and pick up at the position in the stream where they left off.
2017-11-20 16:21:40 +00:00
2017-11-21 16:58:16 +00:00
The unique identifier for a consumer is `[appName, streamName, shardID]`.
<img width="722" alt="kinesis-checkpoints" src="https://user-images.githubusercontent.com/739782/33085867-d8336122-ce9a-11e7-8c8a-a8afeb09dff1.png">
Note: The default checkpoint is a no-op, which means the scan will not persist any state and the consumer will start from the beginning of the stream each time it is re-started.

To persist scan progress, choose one of the following checkpoints:
### Redis Checkpoint
The Redis checkpoint requires an App Name and Stream Name:
```go
import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis"

// redis checkpoint
ck, err := checkpoint.New(appName)
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}
```
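The resulting checkpoint is then passed to the consumer when it is created. A minimal sketch, assuming the package exposes a `WithCheckpoint` option (check the exported options before relying on the name):

```go
// wire the checkpoint into the consumer
// (the WithCheckpoint option name is an assumption; see the package docs)
c, err := consumer.New(streamName, consumer.WithCheckpoint(ck))
if err != nil {
	log.Fatalf("consumer error: %v", err)
}
```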
### DynamoDB Checkpoint
The DynamoDB checkpoint requires Table Name, App Name, and Stream Name:
```go
import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb"

// ddb checkpoint
ck, err := checkpoint.New(appName, tableName)
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}

// Override the DynamoDB client if the session needs special config (e.g. assume role):
myDynamoDbClient := dynamodb.New(session.New(aws.NewConfig()))

// For versions of the AWS SDK that pick up the config properly, setting the
// region works like this:
// myDynamoDbClient := dynamodb.New(session.New(aws.NewConfig()), &aws.Config{
// 	Region: aws.String("us-west-2"),
// })

ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient))
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}

// Or provide your own Retryer to customize what triggers a retry inside the checkpoint
// (see the code in examples):
// ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient), checkpoint.WithRetryer(&MyRetryer{}))
```
To leverage the DDB checkpoint we'll also need to create a table:
```
Partition key: namespace
Sort key: shard_id
```
<img width="727" alt="screen shot 2017-11-22 at 7 59 36 pm" src="https://user-images.githubusercontent.com/739782/33158557-b90e4228-cfbf-11e7-9a99-73b56a446f5f.png">
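The table can also be created programmatically. A sketch using the AWS SDK for Go; the table name and throughput values are placeholders, so use whatever matches the name passed to `checkpoint.New`:

```go
svc := dynamodb.New(session.Must(session.NewSession(aws.NewConfig())))

// create the checkpoint table with namespace/shard_id as the composite key
_, err := svc.CreateTable(&dynamodb.CreateTableInput{
	TableName: aws.String("kinesis-consumer-checkpoint"),
	AttributeDefinitions: []*dynamodb.AttributeDefinition{
		{AttributeName: aws.String("namespace"), AttributeType: aws.String("S")},
		{AttributeName: aws.String("shard_id"), AttributeType: aws.String("S")},
	},
	KeySchema: []*dynamodb.KeySchemaElement{
		{AttributeName: aws.String("namespace"), KeyType: aws.String("HASH")},
		{AttributeName: aws.String("shard_id"), KeyType: aws.String("RANGE")},
	},
	ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
		ReadCapacityUnits:  aws.Int64(1),
		WriteCapacityUnits: aws.Int64(1),
	},
})
if err != nil {
	log.Fatalf("create table error: %v", err)
}
```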
### Postgres Checkpoint
The Postgres checkpoint requires Table Name, App Name, Stream Name and ConnectionString:
```go
import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/postgres"

// postgres checkpoint
ck, err := checkpoint.New(app, table, connStr)
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}
```
To leverage the Postgres checkpoint we'll also need to create a table:
```sql
CREATE TABLE kinesis_consumer (
namespace text NOT NULL,
shard_id text NOT NULL,
sequence_number numeric NOT NULL,
CONSTRAINT kinesis_consumer_pk PRIMARY KEY (namespace, shard_id)
);
```
The table name must match the one you specify when creating the checkpoint. The composite primary key on namespace and shard_id is required for the checkpoint to work correctly and to ensure data integrity.
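For reference, `connStr` is a standard Postgres connection string. The format below assumes the common lib/pq driver and uses placeholder values:

```go
// placeholder credentials; adjust for your driver and environment
connStr := "postgres://user:password@localhost:5432/mydb?sslmode=disable"
```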
### MySQL Checkpoint
The MySQL checkpoint requires Table Name, App Name, Stream Name and ConnectionString (just like the Postgres checkpoint!):
```go
import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/mysql"
// mysql checkpoint
ck, err := checkpoint.New(app, table, connStr)
if err != nil {
log.Fatalf("new checkpoint error: %v", err)
}
```
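As with Postgres, `connStr` is a driver-specific DSN. The format below assumes the common go-sql-driver/mysql driver and uses placeholder values:

```go
// placeholder credentials; adjust for your driver and environment
connStr := "user:password@tcp(localhost:3306)/mydb"
```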
To leverage the MySQL checkpoint we'll also need to create a table:
```sql
CREATE TABLE kinesis_consumer (
namespace varchar(255) NOT NULL,
shard_id varchar(255) NOT NULL,
sequence_number numeric(65,0) NOT NULL,
CONSTRAINT kinesis_consumer_pk PRIMARY KEY (namespace, shard_id)
);
```
The table name must match the one you specify when creating the checkpoint. The composite primary key on namespace and shard_id is required for the checkpoint to work correctly and to ensure data integrity.
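If none of the bundled backends fit, a custom checkpoint can be supplied instead. The sketch below is an illustration only: it assumes the checkpoint contract is a Get/Set pair keyed by stream name and shard ID (matching the `[appName, streamName, shardID]` identifier above); check the package's exported interface before relying on it.

```go
// memoryCheckpoint is a hypothetical in-memory checkpoint, useful for local testing.
// It assumes a Get/Set-style contract keyed by stream name and shard ID.
type memoryCheckpoint struct {
	mu   sync.Mutex
	seqs map[string]string
}

func newMemoryCheckpoint() *memoryCheckpoint {
	return &memoryCheckpoint{seqs: map[string]string{}}
}

// Get returns the last checkpointed sequence number for the shard ("" if none).
func (c *memoryCheckpoint) Get(streamName, shardID string) (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.seqs[streamName+":"+shardID], nil
}

// Set records the latest sequence number read from the shard.
func (c *memoryCheckpoint) Set(streamName, shardID, sequenceNumber string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.seqs[streamName+":"+shardID] = sequenceNumber
	return nil
}
```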
## Options
The consumer allows the following optional overrides.
### Kinesis Client
Override the Kinesis client if there is any special config needed:
```go
// client
client := kinesis.New(session.Must(session.NewSession(aws.NewConfig())))

// consumer
c, err := consumer.New(streamName, consumer.WithClient(client))
```
### Metrics
Add optional counter for exposing counts for checkpoints and records processed:
```go
// counter
counter := expvar.NewMap("counters")

// consumer
c, err := consumer.New(streamName, consumer.WithCounter(counter))
```
The [expvar package](https://golang.org/pkg/expvar/) will display consumer counts:
```
"counters": {
"checkpoints": 3,
"records": 13005
},
```
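Because the `expvar` package registers a `/debug/vars` handler on the default HTTP mux, the counts can also be inspected over HTTP once a server is running. A minimal sketch (the address is an arbitrary choice):

```go
// serve http.DefaultServeMux so the expvar counters are visible at
// http://localhost:8080/debug/vars
go func() {
	log.Println(http.ListenAndServe(":8080", nil))
}()
```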
### Consumer starting point
Kinesis allows consumers to specify where on the stream they'd like to start consuming from. The default in this library is `LATEST` (Start reading just after the most recent record in the shard).
This can be adjusted by using the `WithShardIteratorType` option in the library:
```go
// override starting place on stream to use TRIM_HORIZON
c, err := consumer.New(
	*stream,
	consumer.WithShardIteratorType(kinesis.ShardIteratorTypeTrimHorizon),
)
```
[See AWS Docs for more options.](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html)
### Logging
Logging supports the basic built-in logging library, or a third-party external one, so long as it implements the Logger interface.

For example, to use the built-in logging package, wrap it with a myLogger struct:
```go
// A myLogger provides a minimalistic logger satisfying the Logger interface.
type myLogger struct {
	logger *log.Logger
}

// Log logs the parameters to the stdlib logger. See log.Println.
func (l *myLogger) Log(args ...interface{}) {
	l.logger.Println(args...)
}
```
The package defaults to `ioutil.Discard`, so all logs are swallowed. This can be customized with the preferred logging strategy:
```go
// logger
logger := &myLogger{
	logger: log.New(os.Stdout, "consumer-example: ", log.LstdFlags),
}

// consumer
c, err := consumer.New(streamName, consumer.WithLogger(logger))
```
To use a more full-featured logging library, e.g. apex/log:
```go
type myLogger struct {
	logger *alog.Logger
}

func (l *myLogger) Log(args ...interface{}) {
	l.logger.Infof("producer: %v", args)
}

func main() {
	logger := &myLogger{
		logger: &alog.Logger{
			Handler: text.New(os.Stderr),
			Level:   alog.DebugLevel,
		},
	}
	// ...
}
```
## Contributing
Please see [CONTRIBUTING.md] for more information. Thank you, [contributors]!
[LICENSE]: /MIT-LICENSE
[CONTRIBUTING.md]: /CONTRIBUTING.md
## License
Copyright (c) 2015 Harlow Ward. It is free software, and may
be redistributed under the terms specified in the [LICENSE] file.
[contributors]: https://github.com/harlow/kinesis-connectors/graphs/contributors
> [www.hward.com](http://www.hward.com) ·
> GitHub [@harlow](https://github.com/harlow) ·
> Twitter [@harlow_ward](https://twitter.com/harlow_ward)