Default to kinesalite when running example consumers
This commit is contained in:
parent
a252eb38c6
commit
ed40b5d9b4
13 changed files with 132 additions and 84 deletions
7
examples/README.md
Normal file
7
examples/README.md
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
# Examples
|
||||
|
||||
Here are examples of Produder and Consumer code to provide guidance on how to use each of the different checkpoints.
|
||||
|
||||
The examples will run against [Kinesis Lite](https://github.com/mhart/kinesalite).
|
||||
|
||||
$ kinesalite
|
||||
|
|
@ -2,15 +2,6 @@
|
|||
|
||||
Read records from the Kinesis stream
|
||||
|
||||
### Environment Variables
|
||||
|
||||
Export the required environment vars for connecting to the Kinesis stream:
|
||||
|
||||
```
|
||||
export AWS_PROFILE=
|
||||
export AWS_REGION=
|
||||
```
|
||||
|
||||
### Run the consumer
|
||||
|
||||
$ go run main.go --app appName --stream streamName --table tableName
|
||||
|
|
|
|||
|
|
@ -57,18 +57,25 @@ func main() {
|
|||
app = flag.String("app", "", "Consumer app name")
|
||||
stream = flag.String("stream", "", "Stream name")
|
||||
table = flag.String("table", "", "Checkpoint table name")
|
||||
kinesisEndpoint = flag.String("endpoint", "http://localhost:4567", "Kinesis endpoint")
|
||||
awsRegion = flag.String("region", "us-west-2", "AWS Region")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
// New Kinesis and DynamoDB clients (if you need custom config)
|
||||
sess, err := session.NewSession(aws.NewConfig())
|
||||
if err != nil {
|
||||
log.Log("new session error: %v", err)
|
||||
}
|
||||
|
||||
// New Kinesis and DynamoDB clients (if you need custom config)
|
||||
myKsis := kinesis.New(sess)
|
||||
myDdbClient := dynamodb.New(sess)
|
||||
|
||||
var myKsis = kinesis.New(session.Must(session.NewSession(
|
||||
aws.NewConfig().
|
||||
WithEndpoint(*kinesisEndpoint).
|
||||
WithRegion(*awsRegion).
|
||||
WithLogLevel(3),
|
||||
)))
|
||||
|
||||
// ddb persitance
|
||||
ddb, err := storage.New(*app, *table, storage.WithDynamoClient(myDdbClient), storage.WithRetryer(&MyRetryer{}))
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -2,15 +2,6 @@
|
|||
|
||||
Read records from the Kinesis stream using mysql as checkpoint
|
||||
|
||||
## Environment Variables
|
||||
|
||||
Export the required environment vars for connecting to the Kinesis stream:
|
||||
|
||||
```shell
|
||||
export AWS_PROFILE=
|
||||
export AWS_REGION=
|
||||
```
|
||||
|
||||
## Run the consumer
|
||||
|
||||
go run main.go --app <appName> --stream <streamName> --table <tableName> --connection <connectionString>
|
||||
|
|
|
|||
|
|
@ -9,8 +9,11 @@ import (
|
|||
"os"
|
||||
"os/signal"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||
consumer "github.com/harlow/kinesis-consumer"
|
||||
checkpoint "github.com/harlow/kinesis-consumer/store/mysql"
|
||||
store "github.com/harlow/kinesis-consumer/store/mysql"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
|
@ -19,21 +22,32 @@ func main() {
|
|||
stream = flag.String("stream", "", "Stream name")
|
||||
table = flag.String("table", "", "Table name")
|
||||
connStr = flag.String("connection", "", "Connection Str")
|
||||
kinesisEndpoint = flag.String("endpoint", "http://localhost:4567", "Kinesis endpoint")
|
||||
awsRegion = flag.String("region", "us-west-2", "AWS Region")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
// mysql checkpoint
|
||||
ck, err := checkpoint.New(*app, *table, *connStr)
|
||||
store, err := store.New(*app, *table, *connStr)
|
||||
if err != nil {
|
||||
log.Fatalf("checkpoint error: %v", err)
|
||||
}
|
||||
|
||||
var counter = expvar.NewMap("counters")
|
||||
|
||||
// client
|
||||
cfg := aws.NewConfig().
|
||||
WithEndpoint(*kinesisEndpoint).
|
||||
WithRegion(*awsRegion).
|
||||
WithLogLevel(3)
|
||||
|
||||
var client = kinesis.New(session.Must(session.NewSession(cfg)))
|
||||
|
||||
// consumer
|
||||
c, err := consumer.New(
|
||||
*stream,
|
||||
consumer.WithStore(ck),
|
||||
consumer.WithClient(client),
|
||||
consumer.WithStore(store),
|
||||
consumer.WithCounter(counter),
|
||||
)
|
||||
if err != nil {
|
||||
|
|
@ -62,7 +76,7 @@ func main() {
|
|||
log.Fatalf("scan error: %v", err)
|
||||
}
|
||||
|
||||
if err := ck.Shutdown(); err != nil {
|
||||
log.Fatalf("checkpoint shutdown error: %v", err)
|
||||
if err := store.Shutdown(); err != nil {
|
||||
log.Fatalf("store shutdown error: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,15 +2,6 @@
|
|||
|
||||
Read records from the Kinesis stream using postgres as checkpoint
|
||||
|
||||
## Environment Variables
|
||||
|
||||
Export the required environment vars for connecting to the Kinesis stream:
|
||||
|
||||
```shell
|
||||
export AWS_PROFILE=
|
||||
export AWS_REGION=
|
||||
```
|
||||
|
||||
## Run the consumer
|
||||
|
||||
go run main.go --app appName --stream streamName --table tableName --connection connectionString
|
||||
|
|
|
|||
|
|
@ -9,8 +9,11 @@ import (
|
|||
"os"
|
||||
"os/signal"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||
consumer "github.com/harlow/kinesis-consumer"
|
||||
checkpoint "github.com/harlow/kinesis-consumer/store/postgres"
|
||||
store "github.com/harlow/kinesis-consumer/store/postgres"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
|
@ -19,21 +22,32 @@ func main() {
|
|||
stream = flag.String("stream", "", "Stream name")
|
||||
table = flag.String("table", "", "Table name")
|
||||
connStr = flag.String("connection", "", "Connection Str")
|
||||
kinesisEndpoint = flag.String("endpoint", "http://localhost:4567", "Kinesis endpoint")
|
||||
awsRegion = flag.String("region", "us-west-2", "AWS Region")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
// postgres checkpoint
|
||||
ck, err := checkpoint.New(*app, *table, *connStr)
|
||||
store, err := store.New(*app, *table, *connStr)
|
||||
if err != nil {
|
||||
log.Fatalf("checkpoint error: %v", err)
|
||||
}
|
||||
|
||||
var counter = expvar.NewMap("counters")
|
||||
|
||||
// client
|
||||
cfg := aws.NewConfig().
|
||||
WithEndpoint(*kinesisEndpoint).
|
||||
WithRegion(*awsRegion).
|
||||
WithLogLevel(3)
|
||||
|
||||
var client = kinesis.New(session.Must(session.NewSession(cfg)))
|
||||
|
||||
// consumer
|
||||
c, err := consumer.New(
|
||||
*stream,
|
||||
consumer.WithStore(ck),
|
||||
consumer.WithClient(client),
|
||||
consumer.WithStore(store),
|
||||
consumer.WithCounter(counter),
|
||||
)
|
||||
if err != nil {
|
||||
|
|
@ -62,7 +76,7 @@ func main() {
|
|||
log.Fatalf("scan error: %v", err)
|
||||
}
|
||||
|
||||
if err := ck.Shutdown(); err != nil {
|
||||
log.Fatalf("checkpoint shutdown error: %v", err)
|
||||
if err := store.Shutdown(); err != nil {
|
||||
log.Fatalf("store shutdown error: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,16 +2,6 @@
|
|||
|
||||
Read records from the Kinesis stream
|
||||
|
||||
### Environment Variables
|
||||
|
||||
Export the required environment vars for connecting to the Kinesis stream and Redis for checkpoint:
|
||||
|
||||
```
|
||||
export AWS_PROFILE=
|
||||
export AWS_REGION=
|
||||
export REDIS_URL=
|
||||
```
|
||||
|
||||
### Run the consumer
|
||||
|
||||
$ go run main.go --app appName --stream streamName
|
||||
|
|
|
|||
|
|
@ -8,8 +8,11 @@ import (
|
|||
"os"
|
||||
"os/signal"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||
consumer "github.com/harlow/kinesis-consumer"
|
||||
checkpoint "github.com/harlow/kinesis-consumer/store/redis"
|
||||
store "github.com/harlow/kinesis-consumer/store/redis"
|
||||
)
|
||||
|
||||
// A myLogger provides a minimalistic logger satisfying the Logger interface.
|
||||
|
|
@ -26,13 +29,15 @@ func main() {
|
|||
var (
|
||||
app = flag.String("app", "", "Consumer app name")
|
||||
stream = flag.String("stream", "", "Stream name")
|
||||
kinesisEndpoint = flag.String("endpoint", "http://localhost:4567", "Kinesis endpoint")
|
||||
awsRegion = flag.String("region", "us-west-2", "AWS Region")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
// redis checkpoint
|
||||
ck, err := checkpoint.New(*app)
|
||||
// redis checkpoint store
|
||||
store, err := store.New(*app)
|
||||
if err != nil {
|
||||
log.Fatalf("checkpoint error: %v", err)
|
||||
log.Fatalf("store error: %v", err)
|
||||
}
|
||||
|
||||
// logger
|
||||
|
|
@ -40,10 +45,19 @@ func main() {
|
|||
logger: log.New(os.Stdout, "consumer-example: ", log.LstdFlags),
|
||||
}
|
||||
|
||||
// client
|
||||
cfg := aws.NewConfig().
|
||||
WithEndpoint(*kinesisEndpoint).
|
||||
WithRegion(*awsRegion).
|
||||
WithLogLevel(3)
|
||||
|
||||
var client = kinesis.New(session.Must(session.NewSession(cfg)))
|
||||
|
||||
// consumer
|
||||
c, err := consumer.New(
|
||||
*stream,
|
||||
consumer.WithStore(ck),
|
||||
consumer.WithClient(client),
|
||||
consumer.WithStore(store),
|
||||
consumer.WithLogger(logger),
|
||||
)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -2,15 +2,6 @@
|
|||
|
||||
A prepopulated file with JSON users is available on S3 for seeing the stream.
|
||||
|
||||
### Environment Variables
|
||||
|
||||
Export the required environment vars for connecting to the Kinesis stream:
|
||||
|
||||
```
|
||||
export AWS_PROFILE=
|
||||
export AWS_REGION=
|
||||
```
|
||||
|
||||
### Running the code
|
||||
## Running the code
|
||||
|
||||
$ go run main.go --stream streamName
|
||||
|
|
|
|||
|
|
@ -13,8 +13,17 @@ import (
|
|||
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||
)
|
||||
|
||||
const (
|
||||
kinesisEndpoint = "http://localhost:4567"
|
||||
awsRegion = "us-west-2"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var streamName = flag.String("stream", "", "Stream name")
|
||||
var (
|
||||
streamName = flag.String("stream", "", "Stream name")
|
||||
kinesisEndpoint = flag.String("endpoint", "http://localhost:4567", "Kinesis endpoint")
|
||||
awsRegion = flag.String("region", "us-west-2", "AWS Region")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
// open dummy user data
|
||||
|
|
@ -26,11 +35,17 @@ func main() {
|
|||
|
||||
var records []*kinesis.PutRecordsRequestEntry
|
||||
|
||||
sess, err := session.NewSession(aws.NewConfig())
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
cfg := aws.NewConfig().
|
||||
WithEndpoint(*kinesisEndpoint).
|
||||
WithRegion(*awsRegion).
|
||||
WithLogLevel(3)
|
||||
|
||||
var client = kinesis.New(session.Must(session.NewSession(cfg)))
|
||||
|
||||
// create stream if doesn't exist
|
||||
if err := createStream(client, streamName); err != nil {
|
||||
log.Fatalf("create stream error: %v", err)
|
||||
}
|
||||
var client = kinesis.New(sess)
|
||||
|
||||
// loop over file data
|
||||
b := bufio.NewScanner(f)
|
||||
|
|
@ -51,6 +66,28 @@ func main() {
|
|||
}
|
||||
}
|
||||
|
||||
func createStream(client *kinesis.Kinesis, streamName *string) error {
|
||||
resp, err := client.ListStreams(&kinesis.ListStreamsInput{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("list streams error: %v", err)
|
||||
}
|
||||
|
||||
for _, val := range resp.StreamNames {
|
||||
if *streamName == *val {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
_, err = client.CreateStream(
|
||||
&kinesis.CreateStreamInput{
|
||||
StreamName: streamName,
|
||||
ShardCount: aws.Int64(2),
|
||||
},
|
||||
)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func putRecords(client *kinesis.Kinesis, streamName *string, records []*kinesis.PutRecordsRequestEntry) {
|
||||
_, err := client.PutRecords(&kinesis.PutRecordsInput{
|
||||
StreamName: streamName,
|
||||
|
|
|
|||
2
go.mod
2
go.mod
|
|
@ -1,7 +1,7 @@
|
|||
module github.com/harlow/kinesis-consumer
|
||||
|
||||
require (
|
||||
github.com/DATA-DOG/go-sqlmock v1.3.3 // indirect
|
||||
github.com/DATA-DOG/go-sqlmock v1.3.3
|
||||
github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 // indirect
|
||||
github.com/alicebob/miniredis v2.5.0+incompatible
|
||||
github.com/apex/log v1.0.0
|
||||
|
|
|
|||
1
go.sum
1
go.sum
|
|
@ -1,3 +1,4 @@
|
|||
github.com/DATA-DOG/go-sqlmock v1.3.3 h1:CWUqKXe0s8A2z6qCgkP4Kru7wC11YoAnoupUKFDnH08=
|
||||
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
|
||||
github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 h1:45bxf7AZMwWcqkLzDAQugVEwedisr5nRJ1r+7LYnv0U=
|
||||
github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
|
||||
|
|
|
|||
Loading…
Reference in a new issue