# Golang Kinesis Consumer

A library for writing Kinesis consumer applications in Go. It is intended to be a lightweight wrapper around the Kinesis API that scans records, sets checkpoints, and recovers gracefully from network errors.

_NOTE: With the release of [Kinesis to Firehose](http://docs.aws.amazon.com/firehose/latest/dev/writing-with-kinesis-streams.html), it's possible to archive data directly to S3, Redshift, or Elasticsearch without running a consumer application._

## Installation

Get the package source:

    $ go get github.com/harlow/kinesis-consumer

## Overview

The consumer takes a handler func that accepts a Kinesis record. The `Scan` method consumes all shards concurrently and calls the handler func as it receives records from the stream.

```go
package main

import (
	"context"
	"flag"
	"fmt"
	"os"

	"github.com/apex/log"
	"github.com/apex/log/handlers/text"
	"github.com/aws/aws-sdk-go/service/kinesis"
	consumer "github.com/harlow/kinesis-consumer"
)

func main() {
	// log to stderr at debug level
	log.SetHandler(text.New(os.Stderr))
	log.SetLevel(log.DebugLevel)

	var (
		app    = flag.String("app", "", "App name") // name of consumer group
		stream = flag.String("stream", "", "Stream name")
	)
	flag.Parse()

	c, err := consumer.New(*app, *stream)
	if err != nil {
		log.Fatalf("new consumer error: %v", err)
	}

	// the handler func is called for each record received from the stream
	c.Scan(context.TODO(), func(r *kinesis.Record) bool {
		fmt.Println(string(r.Data))

		return true // continue scanning
	})
}
```

Note: If you need to aggregate based on a specific shard, use the `ScanShard` method instead.

### Configuration

The consumer requires the following config:

* App Name (used for checkpoints)
* Stream Name (Kinesis stream name)

It also accepts the following optional overrides:

* Kinesis Client
* Logger
* Checkpoint

```go
// new kinesis client (session.Must panics if the session cannot be created)
svc := kinesis.New(session.Must(session.NewSession(aws.NewConfig())))

// new consumer with custom client
c, err := consumer.New(
	appName,
	streamName,
	consumer.WithClient(svc),
)
if err != nil {
	log.Fatalf("new consumer error: %v", err)
}
```

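Overrides such as `consumer.WithClient` follow the general "functional options" pattern. The sketch below illustrates that pattern only; it is not this library's implementation. All names in it (`client`, `consumer`, `option`, `withClient`, `newConsumer`) are hypothetical, and rejecting empty names is an assumption about how "required" config might be enforced.

```go
package main

import (
	"errors"
	"fmt"
)

// client is a hypothetical stand-in for a Kinesis client.
type client struct{ name string }

// consumer holds the required names plus optional overrides.
type consumer struct {
	app, stream string
	client      *client
}

// option mutates a consumer during construction.
type option func(*consumer)

// withClient overrides the default client.
func withClient(c *client) option {
	return func(cons *consumer) { cons.client = c }
}

// newConsumer applies defaults first, then each option in order.
func newConsumer(app, stream string, opts ...option) (*consumer, error) {
	if app == "" || stream == "" {
		return nil, errors.New("app and stream names are required")
	}
	c := &consumer{app: app, stream: stream, client: &client{name: "default"}}
	for _, opt := range opts {
		opt(c)
	}
	return c, nil
}

func main() {
	c, err := newConsumer("my-app", "my-stream", withClient(&client{name: "custom"}))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(c.client.name) // prints custom
}
```

With this pattern, callers that are happy with the defaults pass only the two required names, and new overrides can be added later without changing the constructor's signature.
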
### Checkpoint

The default checkpoint uses Redis on localhost. To use a different Redis instance, set the `REDIS_URL` environment variable:

```
REDIS_URL=redis.yoursite.com:6379
```

To use DynamoDB as the checkpoint backend, first create a new table:

<img width="659" alt="screen shot 2017-11-20 at 9 16 14 am" src="https://user-images.githubusercontent.com/739782/33033316-db85f848-cdd8-11e7-941a-0a87d8ace479.png">

Then override the checkpoint config option:

```go
// ddb checkpoint
ck, err := checkpoint.New(tableName, appName, streamName)
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}

// consumer with checkpoint
c, err := consumer.New(
	appName,
	streamName,
	consumer.WithCheckpoint(ck),
)
if err != nil {
	log.Fatalf("new consumer error: %v", err)
}
```

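Conceptually, a checkpoint backend remembers the last processed sequence number for each shard so that a restarted consumer can resume from where it left off. The in-memory sketch below illustrates that idea under stated assumptions: `key`, `memCheckpoint`, `Get`, and `Set` are hypothetical names and do not reflect this library's checkpoint interface or the DynamoDB table layout.

```go
package main

import (
	"fmt"
	"sync"
)

// key identifies one shard of one stream for one consumer app.
type key struct{ app, stream, shard string }

// memCheckpoint is a hypothetical in-memory checkpoint store.
type memCheckpoint struct {
	mu   sync.Mutex
	seqs map[key]string
}

func newMemCheckpoint() *memCheckpoint {
	return &memCheckpoint{seqs: make(map[key]string)}
}

// Set records the last processed sequence number for a shard.
func (m *memCheckpoint) Set(k key, seq string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.seqs[k] = seq
}

// Get returns the saved sequence number and whether one exists.
func (m *memCheckpoint) Get(k key) (string, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	seq, ok := m.seqs[k]
	return seq, ok
}

func main() {
	ck := newMemCheckpoint()
	k := key{app: "test", stream: "test", shard: "shardId-000000000000"}

	if _, ok := ck.Get(k); !ok {
		fmt.Println("no checkpoint yet")
	}

	ck.Set(k, "100")
	seq, _ := ck.Get(k)
	fmt.Println("resume after", seq)
}
```

A mutex guards the map because shards are consumed concurrently, so several goroutines could save checkpoints at the same time.
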
### Logging

[Apex Log](https://medium.com/@tjholowaychuk/apex-log-e8d9627f4a9a#.5x1uo1767) is used for logging. To change the log format, use one of the other [Log Handlers](https://github.com/apex/log/tree/master/_examples). For example, to use the "json" log handler:

```go
import (
	"os"

	"github.com/apex/log"
	"github.com/apex/log/handlers/json"
)

func main() {
	// ...

	log.SetHandler(json.New(os.Stderr))
	log.SetLevel(log.DebugLevel)
}
```

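The consumer then logs entries like the following (this sample is shown in the `text` handler's format; the `json` handler writes each entry as a JSON object):
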
```
INFO[0000] processing app=test shard=shardId-000000000000 stream=test
INFO[0008] checkpoint app=test shard=shardId-000000000000 stream=test
INFO[0012] checkpoint app=test shard=shardId-000000000000 stream=test
```

## Contributing

Please see [CONTRIBUTING.md] for more information. Thank you, [contributors]!

[LICENSE]: /MIT-LICENSE
[CONTRIBUTING.md]: /CONTRIBUTING.md

## License

Copyright (c) 2015 Harlow Ward. It is free software, and may
be redistributed under the terms specified in the [LICENSE] file.

[contributors]: https://github.com/harlow/kinesis-connectors/graphs/contributors

> [www.hward.com](http://www.hward.com) ·
> GitHub [@harlow](https://github.com/harlow) ·
> Twitter [@harlow_ward](https://twitter.com/harlow_ward)