Compare commits
14 commits
master
...
opentracin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
271f32670b | ||
|
|
1a14621969 | ||
|
|
63829e7e07 | ||
|
|
91787fac0e | ||
|
|
5ff0624a2d | ||
|
|
a6eaeb2bfa | ||
|
|
0528067bbe | ||
|
|
831efcece3 | ||
|
|
43cf094b27 | ||
|
|
37fc2cd212 | ||
|
|
b04d4a4670 | ||
|
|
4c2864ef56 | ||
|
|
cba40aeeb8 | ||
|
|
a800065101 |
19 changed files with 853 additions and 111 deletions
118
Gopkg.lock
generated
118
Gopkg.lock
generated
|
|
@ -2,15 +2,18 @@
|
||||||
|
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
|
digest = "1:5bbabe0c3c7e7f524b4c38193b80bf24624e67c0f3a036c4244c85c9a80579fd"
|
||||||
name = "github.com/apex/log"
|
name = "github.com/apex/log"
|
||||||
packages = [
|
packages = [
|
||||||
".",
|
".",
|
||||||
"handlers/text"
|
"handlers/text",
|
||||||
]
|
]
|
||||||
|
pruneopts = "UT"
|
||||||
revision = "0296d6eb16bb28f8a0c55668affcf4876dc269be"
|
revision = "0296d6eb16bb28f8a0c55668affcf4876dc269be"
|
||||||
version = "v1.0.0"
|
version = "v1.0.0"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
|
digest = "1:430a0049ba9e5652a778f1bb2a755b456ef8de588d94093f0b02a63cb885fbca"
|
||||||
name = "github.com/aws/aws-sdk-go"
|
name = "github.com/aws/aws-sdk-go"
|
||||||
packages = [
|
packages = [
|
||||||
"aws",
|
"aws",
|
||||||
|
|
@ -46,55 +49,118 @@
|
||||||
"service/dynamodb/dynamodbiface",
|
"service/dynamodb/dynamodbiface",
|
||||||
"service/kinesis",
|
"service/kinesis",
|
||||||
"service/kinesis/kinesisiface",
|
"service/kinesis/kinesisiface",
|
||||||
"service/sts"
|
"service/sts",
|
||||||
]
|
]
|
||||||
|
pruneopts = "UT"
|
||||||
revision = "8475c414b1bd58b8cc214873a8854e3a621e67d7"
|
revision = "8475c414b1bd58b8cc214873a8854e3a621e67d7"
|
||||||
version = "v1.15.0"
|
version = "v1.15.0"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
|
branch = "master"
|
||||||
|
digest = "1:4c4c33075b704791d6a7f09dfb55c66769e8a1dc6adf87026292d274fe8ad113"
|
||||||
|
name = "github.com/codahale/hdrhistogram"
|
||||||
|
packages = ["."]
|
||||||
|
pruneopts = "UT"
|
||||||
|
revision = "3a0bb77429bd3a61596f5e8a3172445844342120"
|
||||||
|
|
||||||
|
[[projects]]
|
||||||
|
digest = "1:fe8a03a8222d5b913f256972933d26d24ad7c8286692a42943bc01633cc8fce3"
|
||||||
name = "github.com/go-ini/ini"
|
name = "github.com/go-ini/ini"
|
||||||
packages = ["."]
|
packages = ["."]
|
||||||
|
pruneopts = "UT"
|
||||||
revision = "358ee7663966325963d4e8b2e1fbd570c5195153"
|
revision = "358ee7663966325963d4e8b2e1fbd570c5195153"
|
||||||
version = "v1.38.1"
|
version = "v1.38.1"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
name = "github.com/harlow/kinesis-consumer"
|
digest = "1:e22af8c7518e1eab6f2eab2b7d7558927f816262586cd6ed9f349c97a6c285c4"
|
||||||
packages = [
|
|
||||||
".",
|
|
||||||
"checkpoint/ddb",
|
|
||||||
"checkpoint/postgres",
|
|
||||||
"checkpoint/redis"
|
|
||||||
]
|
|
||||||
revision = "049445e259a2ab9146364bf60d6f5f71270a125b"
|
|
||||||
version = "v0.2.0"
|
|
||||||
|
|
||||||
[[projects]]
|
|
||||||
name = "github.com/jmespath/go-jmespath"
|
name = "github.com/jmespath/go-jmespath"
|
||||||
packages = ["."]
|
packages = ["."]
|
||||||
|
pruneopts = "UT"
|
||||||
revision = "0b12d6b5"
|
revision = "0b12d6b5"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
branch = "master"
|
branch = "master"
|
||||||
|
digest = "1:37ce7d7d80531b227023331002c0d42b4b4b291a96798c82a049d03a54ba79e4"
|
||||||
name = "github.com/lib/pq"
|
name = "github.com/lib/pq"
|
||||||
packages = [
|
packages = [
|
||||||
".",
|
".",
|
||||||
"oid"
|
"oid",
|
||||||
]
|
]
|
||||||
|
pruneopts = "UT"
|
||||||
revision = "90697d60dd844d5ef6ff15135d0203f65d2f53b8"
|
revision = "90697d60dd844d5ef6ff15135d0203f65d2f53b8"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
|
digest = "1:450b7623b185031f3a456801155c8320209f75d0d4c4e633c6b1e59d44d6e392"
|
||||||
|
name = "github.com/opentracing/opentracing-go"
|
||||||
|
packages = [
|
||||||
|
".",
|
||||||
|
"ext",
|
||||||
|
"log",
|
||||||
|
]
|
||||||
|
pruneopts = "UT"
|
||||||
|
revision = "1949ddbfd147afd4d964a9f00b24eb291e0e7c38"
|
||||||
|
version = "v1.0.2"
|
||||||
|
|
||||||
|
[[projects]]
|
||||||
|
digest = "1:40e195917a951a8bf867cd05de2a46aaf1806c50cf92eebf4c16f78cd196f747"
|
||||||
name = "github.com/pkg/errors"
|
name = "github.com/pkg/errors"
|
||||||
packages = ["."]
|
packages = ["."]
|
||||||
|
pruneopts = "UT"
|
||||||
revision = "645ef00459ed84a119197bfb8d8205042c6df63d"
|
revision = "645ef00459ed84a119197bfb8d8205042c6df63d"
|
||||||
version = "v0.8.0"
|
version = "v0.8.0"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
|
digest = "1:ac6f26e917fd2fb3194a7ebe2baf6fb32de2f2fbfed130c18aac0e758a6e1d22"
|
||||||
|
name = "github.com/uber/jaeger-client-go"
|
||||||
|
packages = [
|
||||||
|
".",
|
||||||
|
"config",
|
||||||
|
"internal/baggage",
|
||||||
|
"internal/baggage/remote",
|
||||||
|
"internal/spanlog",
|
||||||
|
"internal/throttler",
|
||||||
|
"internal/throttler/remote",
|
||||||
|
"log",
|
||||||
|
"rpcmetrics",
|
||||||
|
"thrift",
|
||||||
|
"thrift-gen/agent",
|
||||||
|
"thrift-gen/baggage",
|
||||||
|
"thrift-gen/jaeger",
|
||||||
|
"thrift-gen/sampling",
|
||||||
|
"thrift-gen/zipkincore",
|
||||||
|
"transport",
|
||||||
|
"utils",
|
||||||
|
]
|
||||||
|
pruneopts = "UT"
|
||||||
|
revision = "1a782e2da844727691fef1757c72eb190c2909f0"
|
||||||
|
version = "v2.15.0"
|
||||||
|
|
||||||
|
[[projects]]
|
||||||
|
digest = "1:0f09db8429e19d57c8346ad76fbbc679341fa86073d3b8fb5ac919f0357d8f4c"
|
||||||
|
name = "github.com/uber/jaeger-lib"
|
||||||
|
packages = ["metrics"]
|
||||||
|
pruneopts = "UT"
|
||||||
|
revision = "ed3a127ec5fef7ae9ea95b01b542c47fbd999ce5"
|
||||||
|
version = "v1.5.0"
|
||||||
|
|
||||||
|
[[projects]]
|
||||||
|
branch = "master"
|
||||||
|
digest = "1:76ee51c3f468493aff39dbacc401e8831fbb765104cbf613b89bef01cf4bad70"
|
||||||
|
name = "golang.org/x/net"
|
||||||
|
packages = ["context"]
|
||||||
|
pruneopts = "UT"
|
||||||
|
revision = "a544f70c90f196e50d198126db0c4cb2b562fec0"
|
||||||
|
|
||||||
|
[[projects]]
|
||||||
|
digest = "1:04aea75705cb453e24bf8c1506a24a5a9036537dbc61ddf71d20900d6c7c3ab9"
|
||||||
name = "gopkg.in/DATA-DOG/go-sqlmock.v1"
|
name = "gopkg.in/DATA-DOG/go-sqlmock.v1"
|
||||||
packages = ["."]
|
packages = ["."]
|
||||||
|
pruneopts = "UT"
|
||||||
revision = "d76b18b42f285b792bf985118980ce9eacea9d10"
|
revision = "d76b18b42f285b792bf985118980ce9eacea9d10"
|
||||||
version = "v1.3.0"
|
version = "v1.3.0"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
|
digest = "1:e5a1379b4f0cad2aabd75580598c3b8e19a027e8eed806e7b76b0ec949df4599"
|
||||||
name = "gopkg.in/redis.v5"
|
name = "gopkg.in/redis.v5"
|
||||||
packages = [
|
packages = [
|
||||||
".",
|
".",
|
||||||
|
|
@ -102,14 +168,34 @@
|
||||||
"internal/consistenthash",
|
"internal/consistenthash",
|
||||||
"internal/hashtag",
|
"internal/hashtag",
|
||||||
"internal/pool",
|
"internal/pool",
|
||||||
"internal/proto"
|
"internal/proto",
|
||||||
]
|
]
|
||||||
|
pruneopts = "UT"
|
||||||
revision = "a16aeec10ff407b1e7be6dd35797ccf5426ef0f0"
|
revision = "a16aeec10ff407b1e7be6dd35797ccf5426ef0f0"
|
||||||
version = "v5.2.9"
|
version = "v5.2.9"
|
||||||
|
|
||||||
[solve-meta]
|
[solve-meta]
|
||||||
analyzer-name = "dep"
|
analyzer-name = "dep"
|
||||||
analyzer-version = 1
|
analyzer-version = 1
|
||||||
inputs-digest = "2588ee54549a76e93e2e65a289fccd8b636f85b124c5ccb0ab3d5f3529a3cbaa"
|
input-imports = [
|
||||||
|
"github.com/apex/log",
|
||||||
|
"github.com/apex/log/handlers/text",
|
||||||
|
"github.com/aws/aws-sdk-go/aws",
|
||||||
|
"github.com/aws/aws-sdk-go/aws/awserr",
|
||||||
|
"github.com/aws/aws-sdk-go/aws/request",
|
||||||
|
"github.com/aws/aws-sdk-go/aws/session",
|
||||||
|
"github.com/aws/aws-sdk-go/service/dynamodb",
|
||||||
|
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute",
|
||||||
|
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface",
|
||||||
|
"github.com/aws/aws-sdk-go/service/kinesis",
|
||||||
|
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface",
|
||||||
|
"github.com/lib/pq",
|
||||||
|
"github.com/opentracing/opentracing-go",
|
||||||
|
"github.com/opentracing/opentracing-go/ext",
|
||||||
|
"github.com/pkg/errors",
|
||||||
|
"github.com/uber/jaeger-client-go/config",
|
||||||
|
"gopkg.in/DATA-DOG/go-sqlmock.v1",
|
||||||
|
"gopkg.in/redis.v5",
|
||||||
|
]
|
||||||
solver-name = "gps-cdcl"
|
solver-name = "gps-cdcl"
|
||||||
solver-version = 1
|
solver-version = 1
|
||||||
|
|
|
||||||
|
|
@ -258,6 +258,9 @@ func main() {
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Opentracing
|
||||||
|
To enable integraton with Opentracing. Checkpoint, Consumer are now required to pass in context as first parameter. Context object wraps tracing context within and is required to pass down to other layer. Another change, that should be invisible from user is that, all AWS SDK GO call are now using the version WithContext, e.g. if codebase is using GetID(...), now they are replaced with GetIDWithContext(ctx,...). This is done so we can link the span created for AWS call to spans created upstream within application code.
|
||||||
|
|
||||||
## Contributing
|
## Contributing
|
||||||
|
|
||||||
Please see [CONTRIBUTING.md] for more information. Thank you, [contributors]!
|
Please see [CONTRIBUTING.md] for more information. Thank you, [contributors]!
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,17 @@
|
||||||
package consumer
|
package consumer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
)
|
||||||
|
|
||||||
// Checkpoint interface used track consumer progress in the stream
|
// Checkpoint interface used track consumer progress in the stream
|
||||||
type Checkpoint interface {
|
type Checkpoint interface {
|
||||||
Get(streamName, shardID string) (string, error)
|
Get(ctx context.Context, streamName, shardID string) (string, error)
|
||||||
Set(streamName, shardID, sequenceNumber string) error
|
Set(ctx context.Context, streamName, shardID, sequenceNumber string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// noopCheckpoint implements the checkpoint interface with discard
|
// noopCheckpoint implements the checkpoint interface with discard
|
||||||
type noopCheckpoint struct{}
|
type noopCheckpoint struct{}
|
||||||
|
|
||||||
func (n noopCheckpoint) Set(string, string, string) error { return nil }
|
func (n noopCheckpoint) Set(context.Context, string, string, string) error { return nil }
|
||||||
func (n noopCheckpoint) Get(string, string) (string, error) { return "", nil }
|
func (n noopCheckpoint) Get(context.Context, string, string) (string, error) { return "", nil }
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package ddb
|
package ddb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
@ -11,6 +12,8 @@ import (
|
||||||
"github.com/aws/aws-sdk-go/service/dynamodb"
|
"github.com/aws/aws-sdk-go/service/dynamodb"
|
||||||
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
|
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
|
||||||
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
|
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
|
||||||
|
"github.com/opentracing/opentracing-go"
|
||||||
|
"github.com/opentracing/opentracing-go/ext"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Option is used to override defaults when creating a new Checkpoint
|
// Option is used to override defaults when creating a new Checkpoint
|
||||||
|
|
@ -38,7 +41,7 @@ func WithRetryer(r Retryer) Option {
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a checkpoint that uses DynamoDB for underlying storage
|
// New returns a checkpoint that uses DynamoDB for underlying storage
|
||||||
func New(appName, tableName string, opts ...Option) (*Checkpoint, error) {
|
func New(ctx context.Context, appName, tableName string, opts ...Option) (*Checkpoint, error) {
|
||||||
client := dynamodb.New(session.New(aws.NewConfig()))
|
client := dynamodb.New(session.New(aws.NewConfig()))
|
||||||
|
|
||||||
ck := &Checkpoint{
|
ck := &Checkpoint{
|
||||||
|
|
@ -56,7 +59,7 @@ func New(appName, tableName string, opts ...Option) (*Checkpoint, error) {
|
||||||
opt(ck)
|
opt(ck)
|
||||||
}
|
}
|
||||||
|
|
||||||
go ck.loop()
|
go ck.loop(ctx)
|
||||||
|
|
||||||
return ck, nil
|
return ck, nil
|
||||||
}
|
}
|
||||||
|
|
@ -87,9 +90,15 @@ type item struct {
|
||||||
// Get determines if a checkpoint for a particular Shard exists.
|
// Get determines if a checkpoint for a particular Shard exists.
|
||||||
// Typically used to determine whether we should start processing the shard with
|
// Typically used to determine whether we should start processing the shard with
|
||||||
// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
|
// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
|
||||||
func (c *Checkpoint) Get(streamName, shardID string) (string, error) {
|
func (c *Checkpoint) Get(ctx context.Context, streamName, shardID string) (string, error) {
|
||||||
namespace := fmt.Sprintf("%s-%s", c.appName, streamName)
|
namespace := fmt.Sprintf("%s-%s", c.appName, streamName)
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "checkpoint.ddb.Get",
|
||||||
|
opentracing.Tag{Key: "appName", Value: c.appName},
|
||||||
|
opentracing.Tag{Key: "tableName", Value: c.tableName},
|
||||||
|
opentracing.Tag{Key: "namespace", Value: namespace},
|
||||||
|
opentracing.Tag{Key: "shardID", Value: shardID},
|
||||||
|
)
|
||||||
|
defer span.Finish()
|
||||||
params := &dynamodb.GetItemInput{
|
params := &dynamodb.GetItemInput{
|
||||||
TableName: aws.String(c.tableName),
|
TableName: aws.String(c.tableName),
|
||||||
ConsistentRead: aws.Bool(true),
|
ConsistentRead: aws.Bool(true),
|
||||||
|
|
@ -103,11 +112,13 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.client.GetItem(params)
|
resp, err := c.client.GetItemWithContext(ctx, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if c.retryer.ShouldRetry(err) {
|
if c.retryer.ShouldRetry(err) {
|
||||||
return c.Get(streamName, shardID)
|
return c.Get(ctx, streamName, shardID)
|
||||||
}
|
}
|
||||||
|
span.LogKV("checkpoint get item error", err.Error())
|
||||||
|
ext.Error.Set(span, true)
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -118,10 +129,16 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) {
|
||||||
|
|
||||||
// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
|
// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
|
||||||
// Upon failover, record processing is resumed from this point.
|
// Upon failover, record processing is resumed from this point.
|
||||||
func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error {
|
func (c *Checkpoint) Set(ctx context.Context, streamName, shardID, sequenceNumber string) error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "checkpoint.ddb.Set",
|
||||||
|
opentracing.Tag{Key: "appName", Value: c.appName},
|
||||||
|
opentracing.Tag{Key: "tableName", Value: c.tableName},
|
||||||
|
opentracing.Tag{Key: "stream.name", Value: streamName},
|
||||||
|
opentracing.Tag{Key: "shardID", Value: shardID},
|
||||||
|
)
|
||||||
|
defer span.Finish()
|
||||||
if sequenceNumber == "" {
|
if sequenceNumber == "" {
|
||||||
return fmt.Errorf("sequence number should not be empty")
|
return fmt.Errorf("sequence number should not be empty")
|
||||||
}
|
}
|
||||||
|
|
@ -136,12 +153,12 @@ func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown the checkpoint. Save any in-flight data.
|
// Shutdown the checkpoint. Save any in-flight data.
|
||||||
func (c *Checkpoint) Shutdown() error {
|
func (c *Checkpoint) Shutdown(ctx context.Context) error {
|
||||||
c.done <- struct{}{}
|
c.done <- struct{}{}
|
||||||
return c.save()
|
return c.save(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Checkpoint) loop() {
|
func (c *Checkpoint) loop(ctx context.Context) {
|
||||||
tick := time.NewTicker(c.maxInterval)
|
tick := time.NewTicker(c.maxInterval)
|
||||||
defer tick.Stop()
|
defer tick.Stop()
|
||||||
defer close(c.done)
|
defer close(c.done)
|
||||||
|
|
@ -149,16 +166,20 @@ func (c *Checkpoint) loop() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-tick.C:
|
case <-tick.C:
|
||||||
c.save()
|
c.save(ctx)
|
||||||
case <-c.done:
|
case <-c.done:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Checkpoint) save() error {
|
func (c *Checkpoint) save(ctx context.Context) error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "checkpoint.ddb.save")
|
||||||
|
defer span.Finish()
|
||||||
|
span = span.SetTag("appName", c.appName)
|
||||||
|
span = span.SetTag("tableName", c.tableName)
|
||||||
|
|
||||||
for key, sequenceNumber := range c.checkpoints {
|
for key, sequenceNumber := range c.checkpoints {
|
||||||
item, err := dynamodbattribute.MarshalMap(item{
|
item, err := dynamodbattribute.MarshalMap(item{
|
||||||
|
|
@ -168,10 +189,12 @@ func (c *Checkpoint) save() error {
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("marshal map error: %v", err)
|
log.Printf("marshal map error: %v", err)
|
||||||
|
span.LogKV("marshal map error", err.Error())
|
||||||
|
ext.Error.Set(span, true)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = c.client.PutItem(&dynamodb.PutItemInput{
|
_, err = c.client.PutItemWithContext(ctx, &dynamodb.PutItemInput{
|
||||||
TableName: aws.String(c.tableName),
|
TableName: aws.String(c.tableName),
|
||||||
Item: item,
|
Item: item,
|
||||||
})
|
})
|
||||||
|
|
@ -179,7 +202,9 @@ func (c *Checkpoint) save() error {
|
||||||
if !c.retryer.ShouldRetry(err) {
|
if !c.retryer.ShouldRetry(err) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return c.save()
|
span.LogKV("checkpoint put item error", err.Error())
|
||||||
|
ext.Error.Set(span, true)
|
||||||
|
return c.save(ctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package postgres
|
package postgres
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
@ -80,7 +81,7 @@ func (c *Checkpoint) GetMaxInterval() time.Duration {
|
||||||
// Get determines if a checkpoint for a particular Shard exists.
|
// Get determines if a checkpoint for a particular Shard exists.
|
||||||
// Typically used to determine whether we should start processing the shard with
|
// Typically used to determine whether we should start processing the shard with
|
||||||
// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
|
// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
|
||||||
func (c *Checkpoint) Get(streamName, shardID string) (string, error) {
|
func (c *Checkpoint) Get(ctx context.Context, streamName, shardID string) (string, error) {
|
||||||
namespace := fmt.Sprintf("%s-%s", c.appName, streamName)
|
namespace := fmt.Sprintf("%s-%s", c.appName, streamName)
|
||||||
|
|
||||||
var sequenceNumber string
|
var sequenceNumber string
|
||||||
|
|
@ -99,7 +100,7 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) {
|
||||||
|
|
||||||
// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
|
// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
|
||||||
// Upon failover, record processing is resumed from this point.
|
// Upon failover, record processing is resumed from this point.
|
||||||
func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error {
|
func (c *Checkpoint) Set(ctx context.Context, streamName, shardID, sequenceNumber string) error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,17 +1,16 @@
|
||||||
package postgres_test
|
package postgres_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"fmt"
|
"gopkg.in/DATA-DOG/go-sqlmock.v1"
|
||||||
|
|
||||||
"database/sql"
|
|
||||||
|
|
||||||
"github.com/harlow/kinesis-consumer/checkpoint/postgres"
|
"github.com/harlow/kinesis-consumer/checkpoint/postgres"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"gopkg.in/DATA-DOG/go-sqlmock.v1"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestNew(t *testing.T) {
|
func TestNew(t *testing.T) {
|
||||||
|
|
@ -77,6 +76,7 @@ func TestNew_WithMaxIntervalOption(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCheckpoint_Get(t *testing.T) {
|
func TestCheckpoint_Get(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
appName := "streamConsumer"
|
appName := "streamConsumer"
|
||||||
tableName := "checkpoint"
|
tableName := "checkpoint"
|
||||||
connString := "UserID=root;Password=myPassword;Host=localhost;Port=5432;Database=myDataBase;"
|
connString := "UserID=root;Password=myPassword;Host=localhost;Port=5432;Database=myDataBase;"
|
||||||
|
|
@ -102,7 +102,7 @@ func TestCheckpoint_Get(t *testing.T) {
|
||||||
tableName)
|
tableName)
|
||||||
mock.ExpectQuery(expectedSQLRegexString).WithArgs(namespace, shardID).WillReturnRows(expectedRows)
|
mock.ExpectQuery(expectedSQLRegexString).WithArgs(namespace, shardID).WillReturnRows(expectedRows)
|
||||||
|
|
||||||
gotSequenceNumber, err := ck.Get(streamName, shardID)
|
gotSequenceNumber, err := ck.Get(ctx, streamName, shardID)
|
||||||
|
|
||||||
if gotSequenceNumber != expectedSequenceNumber {
|
if gotSequenceNumber != expectedSequenceNumber {
|
||||||
t.Errorf("expected sequence number equals %v, but got %v", expectedSequenceNumber, gotSequenceNumber)
|
t.Errorf("expected sequence number equals %v, but got %v", expectedSequenceNumber, gotSequenceNumber)
|
||||||
|
|
@ -117,6 +117,7 @@ func TestCheckpoint_Get(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCheckpoint_Get_NoRows(t *testing.T) {
|
func TestCheckpoint_Get_NoRows(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
appName := "streamConsumer"
|
appName := "streamConsumer"
|
||||||
tableName := "checkpoint"
|
tableName := "checkpoint"
|
||||||
connString := "UserID=root;Password=myPassword;Host=localhost;Port=5432;Database=myDataBase;"
|
connString := "UserID=root;Password=myPassword;Host=localhost;Port=5432;Database=myDataBase;"
|
||||||
|
|
@ -138,7 +139,7 @@ func TestCheckpoint_Get_NoRows(t *testing.T) {
|
||||||
tableName)
|
tableName)
|
||||||
mock.ExpectQuery(expectedSQLRegexString).WithArgs(namespace, shardID).WillReturnError(sql.ErrNoRows)
|
mock.ExpectQuery(expectedSQLRegexString).WithArgs(namespace, shardID).WillReturnError(sql.ErrNoRows)
|
||||||
|
|
||||||
gotSequenceNumber, err := ck.Get(streamName, shardID)
|
gotSequenceNumber, err := ck.Get(ctx, streamName, shardID)
|
||||||
|
|
||||||
if gotSequenceNumber != "" {
|
if gotSequenceNumber != "" {
|
||||||
t.Errorf("expected sequence number equals empty, but got %v", gotSequenceNumber)
|
t.Errorf("expected sequence number equals empty, but got %v", gotSequenceNumber)
|
||||||
|
|
@ -153,6 +154,7 @@ func TestCheckpoint_Get_NoRows(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCheckpoint_Get_QueryError(t *testing.T) {
|
func TestCheckpoint_Get_QueryError(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
appName := "streamConsumer"
|
appName := "streamConsumer"
|
||||||
tableName := "checkpoint"
|
tableName := "checkpoint"
|
||||||
connString := "UserID=root;Password=myPassword;Host=localhost;Port=5432;Database=myDataBase;"
|
connString := "UserID=root;Password=myPassword;Host=localhost;Port=5432;Database=myDataBase;"
|
||||||
|
|
@ -174,7 +176,7 @@ func TestCheckpoint_Get_QueryError(t *testing.T) {
|
||||||
tableName)
|
tableName)
|
||||||
mock.ExpectQuery(expectedSQLRegexString).WithArgs(namespace, shardID).WillReturnError(errors.New("an error"))
|
mock.ExpectQuery(expectedSQLRegexString).WithArgs(namespace, shardID).WillReturnError(errors.New("an error"))
|
||||||
|
|
||||||
gotSequenceNumber, err := ck.Get(streamName, shardID)
|
gotSequenceNumber, err := ck.Get(ctx, streamName, shardID)
|
||||||
|
|
||||||
if gotSequenceNumber != "" {
|
if gotSequenceNumber != "" {
|
||||||
t.Errorf("expected sequence number equals empty, but got %v", gotSequenceNumber)
|
t.Errorf("expected sequence number equals empty, but got %v", gotSequenceNumber)
|
||||||
|
|
@ -189,6 +191,7 @@ func TestCheckpoint_Get_QueryError(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCheckpoint_Set(t *testing.T) {
|
func TestCheckpoint_Set(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
appName := "streamConsumer"
|
appName := "streamConsumer"
|
||||||
tableName := "checkpoint"
|
tableName := "checkpoint"
|
||||||
connString := "UserID=root;Password=myPassword;Host=localhost;Port=5432;Database=myDataBase;"
|
connString := "UserID=root;Password=myPassword;Host=localhost;Port=5432;Database=myDataBase;"
|
||||||
|
|
@ -201,7 +204,7 @@ func TestCheckpoint_Set(t *testing.T) {
|
||||||
t.Fatalf("error occurred during the checkpoint creation. cause: %v", err)
|
t.Fatalf("error occurred during the checkpoint creation. cause: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = ck.Set(streamName, shardID, expectedSequenceNumber)
|
err = ck.Set(ctx, streamName, shardID, expectedSequenceNumber)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("expected error equals nil, but got %v", err)
|
t.Errorf("expected error equals nil, but got %v", err)
|
||||||
|
|
@ -210,6 +213,7 @@ func TestCheckpoint_Set(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCheckpoint_Set_SequenceNumberEmpty(t *testing.T) {
|
func TestCheckpoint_Set_SequenceNumberEmpty(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
appName := "streamConsumer"
|
appName := "streamConsumer"
|
||||||
tableName := "checkpoint"
|
tableName := "checkpoint"
|
||||||
connString := "UserID=root;Password=myPassword;Host=localhost;Port=5432;Database=myDataBase;"
|
connString := "UserID=root;Password=myPassword;Host=localhost;Port=5432;Database=myDataBase;"
|
||||||
|
|
@ -222,7 +226,7 @@ func TestCheckpoint_Set_SequenceNumberEmpty(t *testing.T) {
|
||||||
t.Fatalf("error occurred during the checkpoint creation. cause: %v", err)
|
t.Fatalf("error occurred during the checkpoint creation. cause: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = ck.Set(streamName, shardID, expectedSequenceNumber)
|
err = ck.Set(ctx, streamName, shardID, expectedSequenceNumber)
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("expected error equals not nil, but got %v", err)
|
t.Errorf("expected error equals not nil, but got %v", err)
|
||||||
|
|
@ -231,6 +235,7 @@ func TestCheckpoint_Set_SequenceNumberEmpty(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCheckpoint_Shutdown(t *testing.T) {
|
func TestCheckpoint_Shutdown(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
appName := "streamConsumer"
|
appName := "streamConsumer"
|
||||||
tableName := "checkpoint"
|
tableName := "checkpoint"
|
||||||
connString := "UserID=root;Password=myPassword;Host=localhost;Port=5432;Database=myDataBase;"
|
connString := "UserID=root;Password=myPassword;Host=localhost;Port=5432;Database=myDataBase;"
|
||||||
|
|
@ -253,7 +258,7 @@ func TestCheckpoint_Shutdown(t *testing.T) {
|
||||||
result := sqlmock.NewResult(0, 1)
|
result := sqlmock.NewResult(0, 1)
|
||||||
mock.ExpectExec(expectedSQLRegexString).WithArgs(namespace, shardID, expectedSequenceNumber).WillReturnResult(result)
|
mock.ExpectExec(expectedSQLRegexString).WithArgs(namespace, shardID, expectedSequenceNumber).WillReturnResult(result)
|
||||||
|
|
||||||
err = ck.Set(streamName, shardID, expectedSequenceNumber)
|
err = ck.Set(ctx, streamName, shardID, expectedSequenceNumber)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unable to set checkpoint for data initialization. cause: %v", err)
|
t.Fatalf("unable to set checkpoint for data initialization. cause: %v", err)
|
||||||
|
|
@ -270,6 +275,7 @@ func TestCheckpoint_Shutdown(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCheckpoint_Shutdown_SaveError(t *testing.T) {
|
func TestCheckpoint_Shutdown_SaveError(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
appName := "streamConsumer"
|
appName := "streamConsumer"
|
||||||
tableName := "checkpoint"
|
tableName := "checkpoint"
|
||||||
connString := "UserID=root;Password=myPassword;Host=localhost;Port=5432;Database=myDataBase;"
|
connString := "UserID=root;Password=myPassword;Host=localhost;Port=5432;Database=myDataBase;"
|
||||||
|
|
@ -291,7 +297,7 @@ func TestCheckpoint_Shutdown_SaveError(t *testing.T) {
|
||||||
expectedSQLRegexString := fmt.Sprintf(`INSERT INTO %s \(namespace, shard_id, sequence_number\) VALUES\(\$1, \$2, \$3\) ON CONFLICT \(namespace, shard_id\) DO UPDATE SET sequence_number= \$3;`, tableName)
|
expectedSQLRegexString := fmt.Sprintf(`INSERT INTO %s \(namespace, shard_id, sequence_number\) VALUES\(\$1, \$2, \$3\) ON CONFLICT \(namespace, shard_id\) DO UPDATE SET sequence_number= \$3;`, tableName)
|
||||||
mock.ExpectExec(expectedSQLRegexString).WithArgs(namespace, shardID, expectedSequenceNumber).WillReturnError(errors.New("an error"))
|
mock.ExpectExec(expectedSQLRegexString).WithArgs(namespace, shardID, expectedSequenceNumber).WillReturnError(errors.New("an error"))
|
||||||
|
|
||||||
err = ck.Set(streamName, shardID, expectedSequenceNumber)
|
err = ck.Set(ctx, streamName, shardID, expectedSequenceNumber)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unable to set checkpoint for data initialization. cause: %v", err)
|
t.Fatalf("unable to set checkpoint for data initialization. cause: %v", err)
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package redis
|
package redis
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
|
|
@ -37,14 +38,14 @@ type Checkpoint struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get fetches the checkpoint for a particular Shard.
|
// Get fetches the checkpoint for a particular Shard.
|
||||||
func (c *Checkpoint) Get(streamName, shardID string) (string, error) {
|
func (c *Checkpoint) Get(ctx context.Context, streamName, shardID string) (string, error) {
|
||||||
val, _ := c.client.Get(c.key(streamName, shardID)).Result()
|
val, _ := c.client.Get(c.key(streamName, shardID)).Result()
|
||||||
return val, nil
|
return val, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
|
// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
|
||||||
// Upon failover, record processing is resumed from this point.
|
// Upon failover, record processing is resumed from this point.
|
||||||
func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error {
|
func (c *Checkpoint) Set(ctx context.Context, streamName, shardID, sequenceNumber string) error {
|
||||||
if sequenceNumber == "" {
|
if sequenceNumber == "" {
|
||||||
return fmt.Errorf("sequence number should not be empty")
|
return fmt.Errorf("sequence number should not be empty")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,21 +1,23 @@
|
||||||
package redis
|
package redis
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Test_CheckpointLifecycle(t *testing.T) {
|
func Test_CheckpointLifecycle(t *testing.T) {
|
||||||
// new
|
// new
|
||||||
|
ctx := context.TODO()
|
||||||
c, err := New("app")
|
c, err := New("app")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("new checkpoint error: %v", err)
|
t.Fatalf("new checkpoint error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// set
|
// set
|
||||||
c.Set("streamName", "shardID", "testSeqNum")
|
c.Set(ctx, "streamName", "shardID", "testSeqNum")
|
||||||
|
|
||||||
// get
|
// get
|
||||||
val, err := c.Get("streamName", "shardID")
|
val, err := c.Get(ctx, "streamName", "shardID")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("get checkpoint error: %v", err)
|
t.Fatalf("get checkpoint error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -25,12 +27,13 @@ func Test_CheckpointLifecycle(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_SetEmptySeqNum(t *testing.T) {
|
func Test_SetEmptySeqNum(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
c, err := New("app")
|
c, err := New("app")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("new checkpoint error: %v", err)
|
t.Fatalf("new checkpoint error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.Set("streamName", "shardID", "")
|
err = c.Set(ctx, "streamName", "shardID", "")
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("should not allow empty sequence number")
|
t.Fatalf("should not allow empty sequence number")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
98
consumer.go
98
consumer.go
|
|
@ -11,6 +11,9 @@ import (
|
||||||
"github.com/aws/aws-sdk-go/aws/session"
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
"github.com/aws/aws-sdk-go/service/kinesis"
|
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||||
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
|
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
|
||||||
|
|
||||||
|
opentracing "github.com/opentracing/opentracing-go"
|
||||||
|
"github.com/opentracing/opentracing-go/ext"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Record is an alias of record returned from kinesis library
|
// Record is an alias of record returned from kinesis library
|
||||||
|
|
@ -100,17 +103,26 @@ type Consumer struct {
|
||||||
|
|
||||||
// Scan scans each of the shards of the stream, calls the callback
|
// Scan scans each of the shards of the stream, calls the callback
|
||||||
// func with each of the kinesis records.
|
// func with each of the kinesis records.
|
||||||
func (c *Consumer) Scan(ctx context.Context, fn func(*Record) ScanStatus) error {
|
func (c *Consumer) Scan(ctx context.Context, fn func(context.Context, *Record) ScanStatus) error {
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.scan")
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
// get shard ids
|
// get shard ids
|
||||||
shardIDs, err := c.getShardIDs(c.streamName)
|
shardIDs, err := c.getShardIDs(ctx, c.streamName)
|
||||||
|
span.SetTag("stream.name", c.streamName)
|
||||||
|
span.SetTag("shard.count", len(shardIDs))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get shards error: %v", err)
|
span.LogKV("get shardID error", err.Error(), "stream.name", c.streamName)
|
||||||
|
ext.Error.Set(span, true)
|
||||||
|
return fmt.Errorf("get shards error: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(shardIDs) == 0 {
|
if len(shardIDs) == 0 {
|
||||||
|
span.LogKV("stream.name", c.streamName, "shards.count", len(shardIDs))
|
||||||
|
ext.Error.Set(span, true)
|
||||||
return fmt.Errorf("no shards available")
|
return fmt.Errorf("no shards available")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -126,6 +138,9 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*Record) ScanStatus) error
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
if err := c.ScanShard(ctx, shardID, fn); err != nil {
|
if err := c.ScanShard(ctx, shardID, fn); err != nil {
|
||||||
|
span.LogKV("scan shard error", err.Error(), "shardID", shardID)
|
||||||
|
ext.Error.Set(span, true)
|
||||||
|
span.Finish()
|
||||||
select {
|
select {
|
||||||
case errc <- fmt.Errorf("shard %s error: %v", shardID, err):
|
case errc <- fmt.Errorf("shard %s error: %v", shardID, err):
|
||||||
// first error to occur
|
// first error to occur
|
||||||
|
|
@ -149,38 +164,49 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*Record) ScanStatus) error
|
||||||
func (c *Consumer) ScanShard(
|
func (c *Consumer) ScanShard(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
shardID string,
|
shardID string,
|
||||||
fn func(*Record) ScanStatus,
|
fn func(context.Context, *Record) ScanStatus,
|
||||||
) error {
|
) error {
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.scanshard")
|
||||||
|
defer span.Finish()
|
||||||
// get checkpoint
|
// get checkpoint
|
||||||
lastSeqNum, err := c.checkpoint.Get(c.streamName, shardID)
|
lastSeqNum, err := c.checkpoint.Get(ctx, c.streamName, shardID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.LogKV("checkpoint error", err.Error(), "shardID", shardID)
|
||||||
|
ext.Error.Set(span, true)
|
||||||
return fmt.Errorf("get checkpoint error: %v", err)
|
return fmt.Errorf("get checkpoint error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// get shard iterator
|
// get shard iterator
|
||||||
shardIterator, err := c.getShardIterator(c.streamName, shardID, lastSeqNum)
|
shardIterator, err := c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.LogKV("get shard error", err.Error(), "shardID", shardID, "lastSeqNumber", lastSeqNum)
|
||||||
|
ext.Error.Set(span, true)
|
||||||
return fmt.Errorf("get shard iterator error: %v", err)
|
return fmt.Errorf("get shard iterator error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.logger.Log("scanning", shardID, lastSeqNum)
|
c.logger.Log(fmt.Sprintf("scanning shardID %s lastSeqNum %s", shardID, lastSeqNum))
|
||||||
|
|
||||||
return c.scanPagesOfShard(ctx, shardID, lastSeqNum, shardIterator, fn)
|
return c.scanPagesOfShard(ctx, shardID, lastSeqNum, shardIterator, fn)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Consumer) scanPagesOfShard(ctx context.Context, shardID, lastSeqNum string, shardIterator *string, fn func(*Record) ScanStatus) error {
|
func (c *Consumer) scanPagesOfShard(ctx context.Context, shardID, lastSeqNum string, shardIterator *string, fn func(context.Context, *Record) ScanStatus) error {
|
||||||
|
span := opentracing.SpanFromContext(ctx)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
span.SetTag("scan", "done")
|
||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
resp, err := c.client.GetRecords(&kinesis.GetRecordsInput{
|
span.SetTag("scan", "on")
|
||||||
|
resp, err := c.client.GetRecordsWithContext(ctx, &kinesis.GetRecordsInput{
|
||||||
ShardIterator: shardIterator,
|
ShardIterator: shardIterator,
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shardIterator, err = c.getShardIterator(c.streamName, shardID, lastSeqNum)
|
shardIterator, err = c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
ext.Error.Set(span, true)
|
||||||
|
span.LogKV("get shard iterator error", err.Error())
|
||||||
return fmt.Errorf("get shard iterator error: %v", err)
|
return fmt.Errorf("get shard iterator error: %v", err)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
|
|
@ -188,17 +214,22 @@ func (c *Consumer) scanPagesOfShard(ctx context.Context, shardID, lastSeqNum str
|
||||||
|
|
||||||
// loop records of page
|
// loop records of page
|
||||||
for _, r := range resp.Records {
|
for _, r := range resp.Records {
|
||||||
isScanStopped, err := c.handleRecord(shardID, r, fn)
|
ctx = opentracing.ContextWithSpan(ctx, span)
|
||||||
|
isScanStopped, err := c.handleRecord(ctx, shardID, r, fn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.LogKV("handle record error", err.Error(), "shardID", shardID)
|
||||||
|
ext.Error.Set(span, true)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if isScanStopped {
|
if isScanStopped {
|
||||||
|
span.SetTag("scan", "stopped")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
lastSeqNum = *r.SequenceNumber
|
lastSeqNum = *r.SequenceNumber
|
||||||
}
|
}
|
||||||
|
|
||||||
if isShardClosed(resp.NextShardIterator, shardIterator) {
|
if isShardClosed(resp.NextShardIterator, shardIterator) {
|
||||||
|
span.LogKV("is shard closed", "true")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
shardIterator = resp.NextShardIterator
|
shardIterator = resp.NextShardIterator
|
||||||
|
|
@ -210,34 +241,47 @@ func isShardClosed(nextShardIterator, currentShardIterator *string) bool {
|
||||||
return nextShardIterator == nil || currentShardIterator == nextShardIterator
|
return nextShardIterator == nil || currentShardIterator == nextShardIterator
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Consumer) handleRecord(shardID string, r *Record, fn func(*Record) ScanStatus) (isScanStopped bool, err error) {
|
func (c *Consumer) handleRecord(ctx context.Context, shardID string, r *Record, fn func(context.Context, *Record) ScanStatus) (isScanStopped bool, err error) {
|
||||||
status := fn(r)
|
span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.handleRecord")
|
||||||
|
defer span.Finish()
|
||||||
|
status := fn(ctx, r)
|
||||||
if !status.SkipCheckpoint {
|
if !status.SkipCheckpoint {
|
||||||
if err := c.checkpoint.Set(c.streamName, shardID, *r.SequenceNumber); err != nil {
|
span.LogKV("scan.state", status)
|
||||||
|
if err := c.checkpoint.Set(ctx, c.streamName, shardID, *r.SequenceNumber); err != nil {
|
||||||
|
span.LogKV("checkpoint error", err.Error(), "stream.name", c.streamName, "shardID", shardID, "sequenceNumber", *r.SequenceNumber)
|
||||||
|
ext.Error.Set(span, true)
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := status.Error; err != nil {
|
if err := status.Error; err != nil {
|
||||||
|
span.LogKV("scan.state", status.Error)
|
||||||
|
ext.Error.Set(span, true)
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
c.counter.Add("records", 1)
|
c.counter.Add("records", 1)
|
||||||
|
|
||||||
if status.StopScan {
|
if status.StopScan {
|
||||||
|
span.LogKV("scan.state", "stopped")
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Consumer) getShardIDs(streamName string) ([]string, error) {
|
func (c *Consumer) getShardIDs(ctx context.Context, streamName string) ([]string, error) {
|
||||||
resp, err := c.client.DescribeStream(
|
span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.getShardIDs")
|
||||||
|
defer span.Finish()
|
||||||
|
span = span.SetTag("streamName", streamName)
|
||||||
|
|
||||||
|
resp, err := c.client.DescribeStreamWithContext(ctx,
|
||||||
&kinesis.DescribeStreamInput{
|
&kinesis.DescribeStreamInput{
|
||||||
StreamName: aws.String(streamName),
|
StreamName: aws.String(streamName),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.LogKV("describe stream error", err.Error())
|
||||||
|
ext.Error.Set(span, true)
|
||||||
return nil, fmt.Errorf("describe stream error: %v", err)
|
return nil, fmt.Errorf("describe stream error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -248,12 +292,22 @@ func (c *Consumer) getShardIDs(streamName string) ([]string, error) {
|
||||||
return ss, nil
|
return ss, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Consumer) getShardIterator(streamName, shardID, lastSeqNum string) (*string, error) {
|
func (c *Consumer) getShardIterator(ctx context.Context, streamName, shardID, lastSeqNum string) (*string, error) {
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.getShardIterator",
|
||||||
|
opentracing.Tag{Key: "streamName", Value: streamName},
|
||||||
|
opentracing.Tag{Key: "shardID", Value: shardID},
|
||||||
|
opentracing.Tag{Key: "lastSeqNum", Value: lastSeqNum})
|
||||||
|
defer span.Finish()
|
||||||
|
shard := aws.String(shardID)
|
||||||
|
stream := aws.String(streamName)
|
||||||
params := &kinesis.GetShardIteratorInput{
|
params := &kinesis.GetShardIteratorInput{
|
||||||
ShardId: aws.String(shardID),
|
ShardId: shard,
|
||||||
StreamName: aws.String(streamName),
|
StreamName: stream,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
span = span.SetTag("shardID", shard)
|
||||||
|
span = span.SetTag("streamName", stream)
|
||||||
|
|
||||||
if lastSeqNum != "" {
|
if lastSeqNum != "" {
|
||||||
params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER")
|
params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER")
|
||||||
params.StartingSequenceNumber = aws.String(lastSeqNum)
|
params.StartingSequenceNumber = aws.String(lastSeqNum)
|
||||||
|
|
@ -261,8 +315,10 @@ func (c *Consumer) getShardIterator(streamName, shardID, lastSeqNum string) (*st
|
||||||
params.ShardIteratorType = aws.String("TRIM_HORIZON")
|
params.ShardIteratorType = aws.String("TRIM_HORIZON")
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.client.GetShardIterator(params)
|
resp, err := c.client.GetShardIteratorWithContext(ctx, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.LogKV("get shard error", err.Error())
|
||||||
|
ext.Error.Set(span, true)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return resp.ShardIterator, nil
|
return resp.ShardIterator, nil
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/request"
|
||||||
"github.com/aws/aws-sdk-go/service/kinesis"
|
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||||
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
|
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
|
||||||
)
|
)
|
||||||
|
|
@ -19,6 +20,7 @@ func TestNew(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConsumer_Scan(t *testing.T) {
|
func TestConsumer_Scan(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
records := []*kinesis.Record{
|
records := []*kinesis.Record{
|
||||||
{
|
{
|
||||||
Data: []byte("firstData"),
|
Data: []byte("firstData"),
|
||||||
|
|
@ -30,18 +32,18 @@ func TestConsumer_Scan(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
client := &kinesisClientMock{
|
client := &kinesisClientMock{
|
||||||
getShardIteratorMock: func(input *kinesis.GetShardIteratorInput) (*kinesis.GetShardIteratorOutput, error) {
|
getShardIteratorMock: func(a aws.Context, input *kinesis.GetShardIteratorInput, o ...request.Option) (*kinesis.GetShardIteratorOutput, error) {
|
||||||
return &kinesis.GetShardIteratorOutput{
|
return &kinesis.GetShardIteratorOutput{
|
||||||
ShardIterator: aws.String("49578481031144599192696750682534686652010819674221576194"),
|
ShardIterator: aws.String("49578481031144599192696750682534686652010819674221576194"),
|
||||||
}, nil
|
}, nil
|
||||||
},
|
},
|
||||||
getRecordsMock: func(input *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) {
|
getRecordsMock: func(a aws.Context, input *kinesis.GetRecordsInput, o ...request.Option) (*kinesis.GetRecordsOutput, error) {
|
||||||
return &kinesis.GetRecordsOutput{
|
return &kinesis.GetRecordsOutput{
|
||||||
NextShardIterator: nil,
|
NextShardIterator: nil,
|
||||||
Records: records,
|
Records: records,
|
||||||
}, nil
|
}, nil
|
||||||
},
|
},
|
||||||
describeStreamMock: func(input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) {
|
describeStreamMock: func(a aws.Context, input *kinesis.DescribeStreamInput, o ...request.Option) (*kinesis.DescribeStreamOutput, error) {
|
||||||
return &kinesis.DescribeStreamOutput{
|
return &kinesis.DescribeStreamOutput{
|
||||||
StreamDescription: &kinesis.StreamDescription{
|
StreamDescription: &kinesis.StreamDescription{
|
||||||
Shards: []*kinesis.Shard{
|
Shards: []*kinesis.Shard{
|
||||||
|
|
@ -67,13 +69,13 @@ func TestConsumer_Scan(t *testing.T) {
|
||||||
|
|
||||||
var resultData string
|
var resultData string
|
||||||
var fnCallCounter int
|
var fnCallCounter int
|
||||||
var fn = func(r *Record) ScanStatus {
|
var fn = func(ctx context.Context, r *Record) ScanStatus {
|
||||||
fnCallCounter++
|
fnCallCounter++
|
||||||
resultData += string(r.Data)
|
resultData += string(r.Data)
|
||||||
return ScanStatus{}
|
return ScanStatus{}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.Scan(context.Background(), fn); err != nil {
|
if err := c.Scan(ctx, fn); err != nil {
|
||||||
t.Errorf("scan shard error expected nil. got %v", err)
|
t.Errorf("scan shard error expected nil. got %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -87,15 +89,16 @@ func TestConsumer_Scan(t *testing.T) {
|
||||||
t.Errorf("counter error expected %d, got %d", 2, val)
|
t.Errorf("counter error expected %d, got %d", 2, val)
|
||||||
}
|
}
|
||||||
|
|
||||||
val, err := cp.Get("myStreamName", "myShard")
|
val, err := cp.Get(ctx, "myStreamName", "myShard")
|
||||||
if err != nil && val != "lastSeqNum" {
|
if err != nil && val != "lastSeqNum" {
|
||||||
t.Errorf("checkout error expected %s, got %s", "lastSeqNum", val)
|
t.Errorf("checkout error expected %s, got %s", "lastSeqNum", val)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConsumer_Scan_NoShardsAvailable(t *testing.T) {
|
func TestConsumer_Scan_NoShardsAvailable(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
client := &kinesisClientMock{
|
client := &kinesisClientMock{
|
||||||
describeStreamMock: func(input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) {
|
describeStreamMock: func(a aws.Context, input *kinesis.DescribeStreamInput, o ...request.Option) (*kinesis.DescribeStreamOutput, error) {
|
||||||
return &kinesis.DescribeStreamOutput{
|
return &kinesis.DescribeStreamOutput{
|
||||||
StreamDescription: &kinesis.StreamDescription{
|
StreamDescription: &kinesis.StreamDescription{
|
||||||
Shards: make([]*kinesis.Shard, 0),
|
Shards: make([]*kinesis.Shard, 0),
|
||||||
|
|
@ -118,12 +121,12 @@ func TestConsumer_Scan_NoShardsAvailable(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var fnCallCounter int
|
var fnCallCounter int
|
||||||
var fn = func(r *Record) ScanStatus {
|
var fn = func(ctx context.Context, r *Record) ScanStatus {
|
||||||
fnCallCounter++
|
fnCallCounter++
|
||||||
return ScanStatus{}
|
return ScanStatus{}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.Scan(context.Background(), fn); err == nil {
|
if err := c.Scan(ctx, fn); err == nil {
|
||||||
t.Errorf("scan shard error expected not nil. got %v", err)
|
t.Errorf("scan shard error expected not nil. got %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -133,13 +136,14 @@ func TestConsumer_Scan_NoShardsAvailable(t *testing.T) {
|
||||||
if val := ctr.counter; val != 0 {
|
if val := ctr.counter; val != 0 {
|
||||||
t.Errorf("counter error expected %d, got %d", 0, val)
|
t.Errorf("counter error expected %d, got %d", 0, val)
|
||||||
}
|
}
|
||||||
val, err := cp.Get("myStreamName", "myShard")
|
val, err := cp.Get(ctx, "myStreamName", "myShard")
|
||||||
if err != nil && val != "" {
|
if err != nil && val != "" {
|
||||||
t.Errorf("checkout error expected %s, got %s", "", val)
|
t.Errorf("checkout error expected %s, got %s", "", val)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestScanShard(t *testing.T) {
|
func TestScanShard(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
var records = []*kinesis.Record{
|
var records = []*kinesis.Record{
|
||||||
{
|
{
|
||||||
Data: []byte("firstData"),
|
Data: []byte("firstData"),
|
||||||
|
|
@ -152,12 +156,12 @@ func TestScanShard(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var client = &kinesisClientMock{
|
var client = &kinesisClientMock{
|
||||||
getShardIteratorMock: func(input *kinesis.GetShardIteratorInput) (*kinesis.GetShardIteratorOutput, error) {
|
getShardIteratorMock: func(a aws.Context, input *kinesis.GetShardIteratorInput, o ...request.Option) (*kinesis.GetShardIteratorOutput, error) {
|
||||||
return &kinesis.GetShardIteratorOutput{
|
return &kinesis.GetShardIteratorOutput{
|
||||||
ShardIterator: aws.String("49578481031144599192696750682534686652010819674221576194"),
|
ShardIterator: aws.String("49578481031144599192696750682534686652010819674221576194"),
|
||||||
}, nil
|
}, nil
|
||||||
},
|
},
|
||||||
getRecordsMock: func(input *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) {
|
getRecordsMock: func(a aws.Context, input *kinesis.GetRecordsInput, o ...request.Option) (*kinesis.GetRecordsOutput, error) {
|
||||||
return &kinesis.GetRecordsOutput{
|
return &kinesis.GetRecordsOutput{
|
||||||
NextShardIterator: nil,
|
NextShardIterator: nil,
|
||||||
Records: records,
|
Records: records,
|
||||||
|
|
@ -181,13 +185,13 @@ func TestScanShard(t *testing.T) {
|
||||||
|
|
||||||
// callback fn appends record data
|
// callback fn appends record data
|
||||||
var resultData string
|
var resultData string
|
||||||
var fn = func(r *Record) ScanStatus {
|
var fn = func(ctx context.Context, r *Record) ScanStatus {
|
||||||
resultData += string(r.Data)
|
resultData += string(r.Data)
|
||||||
return ScanStatus{}
|
return ScanStatus{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// scan shard
|
// scan shard
|
||||||
if err := c.ScanShard(context.Background(), "myShard", fn); err != nil {
|
if err := c.ScanShard(ctx, "myShard", fn); err != nil {
|
||||||
t.Fatalf("scan shard error: %v", err)
|
t.Fatalf("scan shard error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -202,13 +206,14 @@ func TestScanShard(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// sets checkpoint
|
// sets checkpoint
|
||||||
val, err := cp.Get("myStreamName", "myShard")
|
val, err := cp.Get(ctx, "myStreamName", "myShard")
|
||||||
if err != nil && val != "lastSeqNum" {
|
if err != nil && val != "lastSeqNum" {
|
||||||
t.Fatalf("checkout error expected %s, got %s", "lastSeqNum", val)
|
t.Fatalf("checkout error expected %s, got %s", "lastSeqNum", val)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestScanShard_StopScan(t *testing.T) {
|
func TestScanShard_StopScan(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
var records = []*kinesis.Record{
|
var records = []*kinesis.Record{
|
||||||
{
|
{
|
||||||
Data: []byte("firstData"),
|
Data: []byte("firstData"),
|
||||||
|
|
@ -221,12 +226,12 @@ func TestScanShard_StopScan(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var client = &kinesisClientMock{
|
var client = &kinesisClientMock{
|
||||||
getShardIteratorMock: func(input *kinesis.GetShardIteratorInput) (*kinesis.GetShardIteratorOutput, error) {
|
getShardIteratorMock: func(a aws.Context, input *kinesis.GetShardIteratorInput, o ...request.Option) (*kinesis.GetShardIteratorOutput, error) {
|
||||||
return &kinesis.GetShardIteratorOutput{
|
return &kinesis.GetShardIteratorOutput{
|
||||||
ShardIterator: aws.String("49578481031144599192696750682534686652010819674221576194"),
|
ShardIterator: aws.String("49578481031144599192696750682534686652010819674221576194"),
|
||||||
}, nil
|
}, nil
|
||||||
},
|
},
|
||||||
getRecordsMock: func(input *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) {
|
getRecordsMock: func(a aws.Context, input *kinesis.GetRecordsInput, o ...request.Option) (*kinesis.GetRecordsOutput, error) {
|
||||||
return &kinesis.GetRecordsOutput{
|
return &kinesis.GetRecordsOutput{
|
||||||
NextShardIterator: nil,
|
NextShardIterator: nil,
|
||||||
Records: records,
|
Records: records,
|
||||||
|
|
@ -241,12 +246,12 @@ func TestScanShard_StopScan(t *testing.T) {
|
||||||
|
|
||||||
// callback fn appends record data
|
// callback fn appends record data
|
||||||
var resultData string
|
var resultData string
|
||||||
var fn = func(r *Record) ScanStatus {
|
var fn = func(ctx context.Context, r *Record) ScanStatus {
|
||||||
resultData += string(r.Data)
|
resultData += string(r.Data)
|
||||||
return ScanStatus{StopScan: true}
|
return ScanStatus{StopScan: true}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.ScanShard(context.Background(), "myShard", fn); err != nil {
|
if err := c.ScanShard(ctx, "myShard", fn); err != nil {
|
||||||
t.Fatalf("scan shard error: %v", err)
|
t.Fatalf("scan shard error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -256,13 +261,14 @@ func TestScanShard_StopScan(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestScanShard_ShardIsClosed(t *testing.T) {
|
func TestScanShard_ShardIsClosed(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
var client = &kinesisClientMock{
|
var client = &kinesisClientMock{
|
||||||
getShardIteratorMock: func(input *kinesis.GetShardIteratorInput) (*kinesis.GetShardIteratorOutput, error) {
|
getShardIteratorMock: func(a aws.Context, input *kinesis.GetShardIteratorInput, o ...request.Option) (*kinesis.GetShardIteratorOutput, error) {
|
||||||
return &kinesis.GetShardIteratorOutput{
|
return &kinesis.GetShardIteratorOutput{
|
||||||
ShardIterator: aws.String("49578481031144599192696750682534686652010819674221576194"),
|
ShardIterator: aws.String("49578481031144599192696750682534686652010819674221576194"),
|
||||||
}, nil
|
}, nil
|
||||||
},
|
},
|
||||||
getRecordsMock: func(input *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) {
|
getRecordsMock: func(a aws.Context, input *kinesis.GetRecordsInput, o ...request.Option) (*kinesis.GetRecordsOutput, error) {
|
||||||
return &kinesis.GetRecordsOutput{
|
return &kinesis.GetRecordsOutput{
|
||||||
NextShardIterator: nil,
|
NextShardIterator: nil,
|
||||||
Records: make([]*Record, 0),
|
Records: make([]*Record, 0),
|
||||||
|
|
@ -275,32 +281,32 @@ func TestScanShard_ShardIsClosed(t *testing.T) {
|
||||||
t.Fatalf("new consumer error: %v", err)
|
t.Fatalf("new consumer error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var fn = func(r *Record) ScanStatus {
|
var fn = func(ctx context.Context, r *Record) ScanStatus {
|
||||||
return ScanStatus{}
|
return ScanStatus{}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.ScanShard(context.Background(), "myShard", fn); err != nil {
|
if err := c.ScanShard(ctx, "myShard", fn); err != nil {
|
||||||
t.Fatalf("scan shard error: %v", err)
|
t.Fatalf("scan shard error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type kinesisClientMock struct {
|
type kinesisClientMock struct {
|
||||||
kinesisiface.KinesisAPI
|
kinesisiface.KinesisAPI
|
||||||
getShardIteratorMock func(*kinesis.GetShardIteratorInput) (*kinesis.GetShardIteratorOutput, error)
|
getShardIteratorMock func(aws.Context, *kinesis.GetShardIteratorInput, ...request.Option) (*kinesis.GetShardIteratorOutput, error)
|
||||||
getRecordsMock func(*kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error)
|
getRecordsMock func(aws.Context, *kinesis.GetRecordsInput, ...request.Option) (*kinesis.GetRecordsOutput, error)
|
||||||
describeStreamMock func(*kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error)
|
describeStreamMock func(aws.Context, *kinesis.DescribeStreamInput, ...request.Option) (*kinesis.DescribeStreamOutput, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *kinesisClientMock) GetRecords(in *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) {
|
func (c *kinesisClientMock) GetRecordsWithContext(a aws.Context, in *kinesis.GetRecordsInput, o ...request.Option) (*kinesis.GetRecordsOutput, error) {
|
||||||
return c.getRecordsMock(in)
|
return c.getRecordsMock(a, in, o...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *kinesisClientMock) GetShardIterator(in *kinesis.GetShardIteratorInput) (*kinesis.GetShardIteratorOutput, error) {
|
func (c *kinesisClientMock) GetShardIteratorWithContext(a aws.Context, in *kinesis.GetShardIteratorInput, o ...request.Option) (*kinesis.GetShardIteratorOutput, error) {
|
||||||
return c.getShardIteratorMock(in)
|
return c.getShardIteratorMock(a, in, o...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *kinesisClientMock) DescribeStream(in *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) {
|
func (c *kinesisClientMock) DescribeStreamWithContext(a aws.Context, in *kinesis.DescribeStreamInput, o ...request.Option) (*kinesis.DescribeStreamOutput, error) {
|
||||||
return c.describeStreamMock(in)
|
return c.describeStreamMock(a, in, o...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// implementation of checkpoint
|
// implementation of checkpoint
|
||||||
|
|
@ -309,7 +315,7 @@ type fakeCheckpoint struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fc *fakeCheckpoint) Set(streamName, shardID, sequenceNumber string) error {
|
func (fc *fakeCheckpoint) Set(ctx context.Context, streamName, shardID, sequenceNumber string) error {
|
||||||
fc.mu.Lock()
|
fc.mu.Lock()
|
||||||
defer fc.mu.Unlock()
|
defer fc.mu.Unlock()
|
||||||
|
|
||||||
|
|
@ -318,7 +324,7 @@ func (fc *fakeCheckpoint) Set(streamName, shardID, sequenceNumber string) error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fc *fakeCheckpoint) Get(streamName, shardID string) (string, error) {
|
func (fc *fakeCheckpoint) Get(ctx context.Context, streamName, shardID string) (string, error) {
|
||||||
fc.mu.Lock()
|
fc.mu.Lock()
|
||||||
defer fc.mu.Unlock()
|
defer fc.mu.Unlock()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -16,14 +16,19 @@ import (
|
||||||
"github.com/aws/aws-sdk-go/aws/session"
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
"github.com/aws/aws-sdk-go/service/dynamodb"
|
"github.com/aws/aws-sdk-go/service/dynamodb"
|
||||||
"github.com/aws/aws-sdk-go/service/kinesis"
|
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||||
|
"github.com/opentracing/opentracing-go"
|
||||||
|
|
||||||
alog "github.com/apex/log"
|
alog "github.com/apex/log"
|
||||||
"github.com/apex/log/handlers/text"
|
"github.com/apex/log/handlers/text"
|
||||||
|
|
||||||
consumer "github.com/harlow/kinesis-consumer"
|
consumer "github.com/harlow/kinesis-consumer"
|
||||||
checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb"
|
checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb"
|
||||||
|
|
||||||
|
"github.com/harlow/kinesis-consumer/examples/distributed-tracing/utility"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const serviceName = "checkpoint.dynamodb"
|
||||||
|
|
||||||
// kick off a server for exposing scan metrics
|
// kick off a server for exposing scan metrics
|
||||||
func init() {
|
func init() {
|
||||||
sock, err := net.Listen("tcp", "localhost:8080")
|
sock, err := net.Listen("tcp", "localhost:8080")
|
||||||
|
|
@ -62,6 +67,12 @@ func main() {
|
||||||
)
|
)
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
|
tracer, closer := utility.NewTracer(serviceName)
|
||||||
|
defer closer.Close()
|
||||||
|
opentracing.InitGlobalTracer(tracer)
|
||||||
|
span := tracer.StartSpan("consumer.main")
|
||||||
|
ctx := opentracing.ContextWithSpan(context.Background(), span)
|
||||||
|
|
||||||
// Following will overwrite the default dynamodb client
|
// Following will overwrite the default dynamodb client
|
||||||
// Older versions of aws sdk does not picking up aws config properly.
|
// Older versions of aws sdk does not picking up aws config properly.
|
||||||
// You probably need to update aws sdk verison. Tested the following with 1.13.59
|
// You probably need to update aws sdk verison. Tested the following with 1.13.59
|
||||||
|
|
@ -72,7 +83,7 @@ func main() {
|
||||||
)
|
)
|
||||||
|
|
||||||
// ddb checkpoint
|
// ddb checkpoint
|
||||||
ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient), checkpoint.WithRetryer(&MyRetryer{}))
|
ck, err := checkpoint.New(ctx, *app, *table, checkpoint.WithDynamoClient(myDynamoDbClient), checkpoint.WithRetryer(&MyRetryer{}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Log("checkpoint error: %v", err)
|
log.Log("checkpoint error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -121,7 +132,7 @@ func main() {
|
||||||
log.Log("scan error: %v", err)
|
log.Log("scan error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := ck.Shutdown(); err != nil {
|
if err := ck.Shutdown(ctx); err != nil {
|
||||||
log.Log("checkpoint shutdown error: %v", err)
|
log.Log("checkpoint shutdown error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
68
examples/distributed-tracing/README.md
Normal file
68
examples/distributed-tracing/README.md
Normal file
|
|
@ -0,0 +1,68 @@
|
||||||
|
# Examples with opentracing
|
||||||
|
|
||||||
|
The examples are roughly the same as those without tracing, but they demonstrate what the code looks like with distributed tracing integrated. The tracing API spec we are using is OpenTracing, due to its wider and more stable support at the moment.
|
||||||
|
|
||||||
|
Please refer to README under examples/consumer and examples/producer.
|
||||||
|
|
||||||
|
## Installation
|
||||||
|
### Setup data for producer to upload
|
||||||
|
|
||||||
|
$ curl https://s3.amazonaws.com/kinesis.test/users.txt > /tmp/users.txt
|
||||||
|
$ go run main.go --stream streamName
|
||||||
|
|
||||||
|
### Setup AWS
|
||||||
|
|
||||||
|
For consumer and producer:
|
||||||
|
|
||||||
|
* export the required environment vars for connecting to the AWS resources:
|
||||||
|
|
||||||
|
```
|
||||||
|
export AWS_ACCESS_KEY=
|
||||||
|
export AWS_REGION=
|
||||||
|
export AWS_SECRET_KEY=
|
||||||
|
```
|
||||||
|
|
||||||
|
* export the Jaeger Environment to connect to Jaeger agent:
|
||||||
|
Reference (https://www.jaegertracing.io) for various variables settings.
|
||||||
|
|
||||||
|
```
|
||||||
|
export JAEGER_SAMPLER_TYPE=const
|
||||||
|
export JAEGER_SAMPLER_PARAM=1
|
||||||
|
export JAEGER_AGENT_HOST=localhost
|
||||||
|
export JAEGER_AGENT_PORT=6831
|
||||||
|
```
|
||||||
|
|
||||||
|
### Setup Backend
|
||||||
|
|
||||||
|
For demo purposes, we are going to use Jaeger as the tracing backend.
|
||||||
|
|
||||||
|
### Tracing Backend
|
||||||
|
Please refer to docs in reference section for Jaeger.
|
||||||
|
Setup Jaeger Agent using the all-in-one docker image
|
||||||
|
```
|
||||||
|
$ docker run -d --name jaeger \
|
||||||
|
-e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \
|
||||||
|
-p 5775:5775/udp \
|
||||||
|
-p 6831:6831/udp \
|
||||||
|
-p 6832:6832/udp \
|
||||||
|
-p 5778:5778 \
|
||||||
|
-p 16686:16686 \
|
||||||
|
-p 14268:14268 \
|
||||||
|
-p 9411:9411 \
|
||||||
|
jaegertracing/all-in-one:1.6
|
||||||
|
|
||||||
|
```
|
||||||
|
You should be able to access the UI via http://localhost:16686.
|
||||||
|
|
||||||
|
## Development
|
||||||
|
You need opentracing-go as a development dependency. If you want to see the results in a UI, you need to choose an appropriate vendor (https://opentracing.io/)
|
||||||
|
|
||||||
|
```
|
||||||
|
go get -u github.com/opentracing/opentracing-go
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
References:
|
||||||
|
> Opentracing (https://github.com/opentracing/opentracing-go) ·
|
||||||
|
> Jaeger (https://www.jaegertracing.io/docs/1.6/getting-started/) ·
|
||||||
|
> Prometheus (https://prometheus.io/docs/prometheus/latest/installation/) ·
|
||||||
135
examples/distributed-tracing/consumer/consumer.go
Normal file
135
examples/distributed-tracing/consumer/consumer.go
Normal file
|
|
@ -0,0 +1,135 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"expvar"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
|
||||||
|
alog "github.com/apex/log"
|
||||||
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
|
"github.com/aws/aws-sdk-go/service/dynamodb"
|
||||||
|
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||||
|
consumer "github.com/harlow/kinesis-consumer"
|
||||||
|
"github.com/opentracing/opentracing-go"
|
||||||
|
"github.com/opentracing/opentracing-go/ext"
|
||||||
|
|
||||||
|
checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb"
|
||||||
|
"github.com/harlow/kinesis-consumer/examples/distributed-tracing/utility"
|
||||||
|
)
|
||||||
|
|
||||||
|
const serviceName = "consumer"
|
||||||
|
|
||||||
|
// kick off a server for exposing scan metrics
|
||||||
|
func init() {
|
||||||
|
sock, err := net.Listen("tcp", "localhost:8080")
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("net listen error: %v", err)
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
fmt.Println("Metrics available at http://localhost:8080/debug/vars")
|
||||||
|
http.Serve(sock, nil)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
ctx := context.Background()
|
||||||
|
log := utility.NewLogger(serviceName, alog.DebugLevel)
|
||||||
|
tracer, closer := utility.NewTracer(serviceName)
|
||||||
|
defer closer.Close()
|
||||||
|
opentracing.InitGlobalTracer(tracer)
|
||||||
|
|
||||||
|
span := tracer.StartSpan("consumer.main")
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
|
app := flag.String("app", "", "App name")
|
||||||
|
stream := flag.String("stream", "", "Stream name")
|
||||||
|
table := flag.String("table", "", "Checkpoint table name")
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
span.SetBaggageItem("app.name", *app)
|
||||||
|
span.SetBaggageItem("stream.name", *stream)
|
||||||
|
span.SetBaggageItem("table.name", *table)
|
||||||
|
|
||||||
|
fmt.Println("set tag....")
|
||||||
|
|
||||||
|
// Following will overwrite the default dynamodb client
|
||||||
|
// Older versions of the AWS SDK do not pick up the AWS config properly.
|
||||||
|
// You probably need to update the AWS SDK version. Tested the following with 1.13.59
|
||||||
|
cfg := aws.NewConfig().WithRegion("us-west-2")
|
||||||
|
sess := session.New(cfg)
|
||||||
|
sess = utility.WrapSession(sess)
|
||||||
|
myDynamoDbClient := dynamodb.New(sess)
|
||||||
|
|
||||||
|
// ddb checkpoint
|
||||||
|
ctx = opentracing.ContextWithSpan(ctx, span)
|
||||||
|
retryer := utility.NewRetryer()
|
||||||
|
ck, err := checkpoint.New(ctx, *app, *table, checkpoint.WithDynamoClient(myDynamoDbClient), checkpoint.WithRetryer(retryer))
|
||||||
|
if err != nil {
|
||||||
|
span.LogKV("checkpoint error", err.Error())
|
||||||
|
span.SetTag("consumer.retry.count", retryer.Count())
|
||||||
|
ext.Error.Set(span, true)
|
||||||
|
// Need to end span here, since Fatalf calls os.Exit
|
||||||
|
log.Log("checkpoint error", "error", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
var counter = expvar.NewMap("counters")
|
||||||
|
|
||||||
|
// The following 2 lines will overwrite the default kinesis client
|
||||||
|
ksis := kinesis.New(sess)
|
||||||
|
|
||||||
|
// consumer
|
||||||
|
c, err := consumer.New(
|
||||||
|
*stream,
|
||||||
|
consumer.WithCheckpoint(ck),
|
||||||
|
consumer.WithLogger(log),
|
||||||
|
consumer.WithCounter(counter),
|
||||||
|
consumer.WithClient(ksis),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
span.LogKV("consumer initialization error", err.Error())
|
||||||
|
ext.Error.Set(span, true)
|
||||||
|
log.Log("consumer initialization error", "error", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// use cancel func to signal shutdown
|
||||||
|
ctx = opentracing.ContextWithSpan(ctx, span)
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
|
// trap SIGINT, wait to trigger shutdown
|
||||||
|
signals := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(signals, os.Interrupt)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
<-signals
|
||||||
|
span.Finish()
|
||||||
|
closer.Close()
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// scan stream
|
||||||
|
err = c.Scan(ctx, func(ctx context.Context, r *consumer.Record) consumer.ScanStatus {
|
||||||
|
span, _ := opentracing.StartSpanFromContext(ctx, "consumer.processRecord")
|
||||||
|
defer span.Finish()
|
||||||
|
fmt.Println(string(r.Data))
|
||||||
|
// continue scanning
|
||||||
|
return consumer.ScanStatus{}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
span.LogKV("consumer scan error", err.Error())
|
||||||
|
ext.Error.Set(span, true)
|
||||||
|
log.Log("consumer scan error", "error", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := ck.Shutdown(ctx); err != nil {
|
||||||
|
span.LogKV("consumer shutdown error", err.Error())
|
||||||
|
ext.Error.Set(span, true)
|
||||||
|
log.Log("checkpoint shutdown error", "error", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
BIN
examples/distributed-tracing/producer.png
Normal file
BIN
examples/distributed-tracing/producer.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 539 KiB |
107
examples/distributed-tracing/producer/producer.go
Normal file
107
examples/distributed-tracing/producer/producer.go
Normal file
|
|
@ -0,0 +1,107 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"context"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
|
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||||
|
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
|
||||||
|
|
||||||
|
"github.com/opentracing/opentracing-go"
|
||||||
|
"github.com/opentracing/opentracing-go/ext"
|
||||||
|
|
||||||
|
"github.com/harlow/kinesis-consumer/examples/distributed-tracing/utility"
|
||||||
|
)
|
||||||
|
|
||||||
|
const serviceName = "producer"
|
||||||
|
const dataFile = "./users.txt"
|
||||||
|
|
||||||
|
var svc kinesisiface.KinesisAPI
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
tracer, closer := utility.NewTracer(serviceName)
|
||||||
|
// Jaeger tracer implements Close not opentracing
|
||||||
|
defer closer.Close()
|
||||||
|
opentracing.InitGlobalTracer(tracer)
|
||||||
|
|
||||||
|
cfg := aws.NewConfig().WithRegion("us-west-2")
|
||||||
|
sess := session.New(cfg)
|
||||||
|
sess = utility.WrapSession(sess)
|
||||||
|
svc = kinesis.New(sess)
|
||||||
|
|
||||||
|
ctx, _ := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
span := tracer.StartSpan("producer.main")
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
|
var streamName = flag.String("stream", "", "Stream name")
|
||||||
|
flag.Parse()
|
||||||
|
span.SetBaggageItem("producer.stream.name", *streamName)
|
||||||
|
|
||||||
|
// download file with test data
|
||||||
|
// curl https://s3.amazonaws.com/kinesis.test/users.txt -o /tmp/users.txt
|
||||||
|
f, err := os.Open(dataFile)
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
span.LogKV("file open error", err.Error())
|
||||||
|
ext.Error.Set(span, true)
|
||||||
|
// Need to end span here, since Fatalf calls os.Exit
|
||||||
|
span.Finish()
|
||||||
|
closer.Close()
|
||||||
|
log.Fatal(fmt.Sprintf("Cannot open %s file", dataFile))
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
span.SetTag("producer.file.name", f.Name())
|
||||||
|
|
||||||
|
// Wrap the span with meta into context and flow that
|
||||||
|
// to another component.
|
||||||
|
ctx = opentracing.ContextWithSpan(ctx, span)
|
||||||
|
|
||||||
|
var records []*kinesis.PutRecordsRequestEntry
|
||||||
|
|
||||||
|
// loop over file data
|
||||||
|
b := bufio.NewScanner(f)
|
||||||
|
for b.Scan() {
|
||||||
|
records = append(records, &kinesis.PutRecordsRequestEntry{
|
||||||
|
Data: b.Bytes(),
|
||||||
|
PartitionKey: aws.String(time.Now().Format(time.RFC3339Nano)),
|
||||||
|
})
|
||||||
|
|
||||||
|
if len(records) > 250 {
|
||||||
|
putRecords(ctx, streamName, records)
|
||||||
|
records = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(records) > 0 {
|
||||||
|
putRecords(ctx, streamName, records)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func putRecords(ctx context.Context, streamName *string, records []*kinesis.PutRecordsRequestEntry) {
|
||||||
|
// I am assuming each new AWS call is a new Span
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "producer.putRecords")
|
||||||
|
defer span.Finish()
|
||||||
|
span.SetTag("producer.records.count", len(records))
|
||||||
|
ctx = opentracing.ContextWithSpan(ctx, span)
|
||||||
|
_, err := svc.PutRecordsWithContext(ctx, &kinesis.PutRecordsInput{
|
||||||
|
StreamName: streamName,
|
||||||
|
Records: records,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
// Log the error details and mark the Span as failed
|
||||||
|
span.LogKV("put records error", err.Error())
|
||||||
|
ext.Error.Set(span, true)
|
||||||
|
// Need to end span here, since Fatalf calls os.Exit
|
||||||
|
span.Finish()
|
||||||
|
log.Fatalf("error putting records: %v", err)
|
||||||
|
}
|
||||||
|
fmt.Print(".")
|
||||||
|
}
|
||||||
111
examples/distributed-tracing/utility/aws.go
Normal file
111
examples/distributed-tracing/utility/aws.go
Normal file
|
|
@ -0,0 +1,111 @@
|
||||||
|
package utility
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/aws/aws-sdk-go/aws/request"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
|
"github.com/opentracing/opentracing-go"
|
||||||
|
"github.com/opentracing/opentracing-go/ext"
|
||||||
|
)
|
||||||
|
|
||||||
|
// From github.com/aws/aws-xray-sdk-go/xray/aws.go
|
||||||
|
const S3ExtendedRequestIDHeaderKey string = "x-amz-id-2"
|
||||||
|
|
||||||
|
// When you initiate any resource client and pass in a AWS session, it does a few things:
|
||||||
|
// * session carries the configuration to make and sign the request header
|
||||||
|
// * session embodies a set of default request handlers to be execute in order
|
||||||
|
// * AWS Client calls a list of request handlers before sending out a raw http request.
|
||||||
|
//
|
||||||
|
// For set of request handlers see: https://github.com/aws/aws-sdk-go/blob/master/aws/request/handlers.go
|
||||||
|
// For starting and ending a span, we are going to insert 1 handler in front and 1 at the end.
|
||||||
|
// Span annotation will be done as see fit inside the handler.
|
||||||
|
type handlers struct{}
|
||||||
|
|
||||||
|
// WrapSession wraps a session.Session, causing requests and responses to be traced.
|
||||||
|
func WrapSession(s *session.Session) *session.Session {
|
||||||
|
// clone the session to avoid any sharing issue.
|
||||||
|
s = s.Copy()
|
||||||
|
h := &handlers{}
|
||||||
|
// set our handlers for starting and ending a span.
|
||||||
|
s.Handlers.Send.PushFrontNamed(request.NamedHandler{
|
||||||
|
Name: "tracing.Send",
|
||||||
|
Fn: h.Send,
|
||||||
|
})
|
||||||
|
s.Handlers.Complete.PushBackNamed(request.NamedHandler{
|
||||||
|
Name: "tracing.Complete",
|
||||||
|
Fn: h.Complete,
|
||||||
|
})
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send creates a new span and be a dependent span if there is a parent span in the context,
|
||||||
|
// otherwise a new root span. Annotate the span with metadata. Then wrap the span inside the context
|
||||||
|
// before sending downstream.
|
||||||
|
func (h *handlers) Send(req *request.Request) {
|
||||||
|
// We are setting the span name and mark that this span is initiating from a client.
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(req.Context(), h.operationName(req))
|
||||||
|
ext.SpanKindRPCClient.Set(span)
|
||||||
|
span = span.SetTag("aws.serviceName", h.serviceName(req))
|
||||||
|
span = span.SetTag("aws.resource", h.resourceName(req))
|
||||||
|
span = span.SetTag("aws.agent", h.awsAgent(req))
|
||||||
|
span = span.SetTag("aws.operation", req.Operation.Name)
|
||||||
|
span = span.SetTag("aws.region", req.ClientInfo.SigningRegion)
|
||||||
|
ext.HTTPMethod.Set(span, req.Operation.HTTPMethod)
|
||||||
|
ext.HTTPUrl.Set(span, req.HTTPRequest.URL.String())
|
||||||
|
|
||||||
|
req.SetContext(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handlers) Complete(req *request.Request) {
|
||||||
|
ctx := req.Context()
|
||||||
|
span := opentracing.SpanFromContext(ctx)
|
||||||
|
defer span.Finish()
|
||||||
|
defer FailIfError(span, req.Error)
|
||||||
|
span = span.SetTag("aws.requestID", req.RequestID)
|
||||||
|
span = span.SetTag("aws.request.retryCount", req.RetryCount)
|
||||||
|
if req.HTTPResponse != nil {
|
||||||
|
ext.HTTPStatusCode.Set(span, uint16(req.HTTPResponse.StatusCode))
|
||||||
|
span = span.SetTag("aws.response.contentLength", req.HTTPResponse.ContentLength)
|
||||||
|
extendedRequestID := req.HTTPResponse.Header.Get(S3ExtendedRequestIDHeaderKey)
|
||||||
|
if len(strings.TrimSpace(extendedRequestID)) > 0 {
|
||||||
|
span = span.SetTag("aws.response.extendedRequestID", extendedRequestID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if request.IsErrorThrottle(req.Error) {
|
||||||
|
span = span.SetTag("aws.request.throttled", "true")
|
||||||
|
}
|
||||||
|
ctx = opentracing.ContextWithSpan(ctx, span)
|
||||||
|
req.SetContext(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handlers) operationName(req *request.Request) string {
|
||||||
|
return h.awsService(req) + ".command"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handlers) resourceName(req *request.Request) string {
|
||||||
|
return h.awsService(req) + "." + req.Operation.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handlers) serviceName(req *request.Request) string {
|
||||||
|
return "aws." + h.awsService(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handlers) awsAgent(req *request.Request) string {
|
||||||
|
agent := req.HTTPRequest.Header.Get("User-Agent")
|
||||||
|
if agent != "" {
|
||||||
|
return agent
|
||||||
|
}
|
||||||
|
return "aws-sdk-go"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handlers) awsService(req *request.Request) string {
|
||||||
|
return req.ClientInfo.ServiceName
|
||||||
|
}
|
||||||
|
|
||||||
|
func FailIfError(span opentracing.Span, err error) {
|
||||||
|
if err != nil {
|
||||||
|
ext.Error.Set(span, true)
|
||||||
|
span.LogKV("aws request error", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
53
examples/distributed-tracing/utility/aws_retryer.go
Normal file
53
examples/distributed-tracing/utility/aws_retryer.go
Normal file
|
|
@ -0,0 +1,53 @@
|
||||||
|
package utility

import (
	"math/rand"
	"sync/atomic"
	"time"

	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/service/dynamodb"

	checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb"
)

// defaultSleepInterval is the base back-off before a retry; the actual
// delay is jittered by randomSleep into [interval, 3*interval).
const defaultSleepInterval = 30 * time.Second

// Retryer implements checkpoint retry behavior for transient DynamoDB
// capacity errors, counting how many retries were attempted.
type Retryer struct {
	checkpoint.Retryer
	count uint64 // number of retries performed; accessed atomically
}

// NewRetryer returns a Retryer with a zero retry count.
func NewRetryer() *Retryer {
	return &Retryer{}
}

// ShouldRetry implements custom logic for when a checkpoint should retry.
// Only DynamoDB throughput/limit errors are retried; before reporting
// true it sleeps a randomized back-off so the next attempt is delayed.
func (r *Retryer) ShouldRetry(err error) bool {
	awsErr, ok := err.(awserr.Error)
	if !ok {
		return false
	}
	switch awsErr.Code() {
	case dynamodb.ErrCodeProvisionedThroughputExceededException, dynamodb.ErrCodeLimitExceededException:
		atomic.AddUint64(&r.count, 1)
		// Jitter the back-off so concurrent workers don't retry in
		// lockstep. Seed from the clock, not a constant, so the delay
		// actually varies between calls; randomSleep only computes the
		// duration, so the sleep itself happens here.
		jitter := rand.New(rand.NewSource(time.Now().UnixNano()))
		time.Sleep(randomSleep(defaultSleepInterval, jitter))
		return true
	default:
		return false
	}
}

// Count returns the number of retries performed so far. A pointer
// receiver plus an atomic load keeps it consistent with the atomic
// increment in ShouldRetry.
func (r *Retryer) Count() uint {
	return uint(atomic.LoadUint64(&r.count))
}
|
||||||
|
|
||||||
|
func randomSleep(d time.Duration, r *rand.Rand) time.Duration {
|
||||||
|
if d == 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return d + time.Duration(r.Int63n(2*int64(d)))
|
||||||
|
}
|
||||||
33
examples/distributed-tracing/utility/logger.go
Normal file
33
examples/distributed-tracing/utility/logger.go
Normal file
|
|
@ -0,0 +1,33 @@
|
||||||
|
package utility

import (
	"fmt"
	"os"

	alog "github.com/apex/log"
	"github.com/apex/log/handlers/text"
)

// logger provides a minimalistic logger satisfying the Logger
// interface, prefixing each entry with the owning service's name.
type logger struct {
	serviceName string
	logger      alog.Logger
}

// NewLogger returns a logger writing text-formatted entries at the
// given level to stdout.
func NewLogger(serviceName string, level alog.Level) *logger {
	return &logger{
		serviceName: serviceName,
		logger: alog.Logger{
			Handler: text.New(os.Stdout),
			Level:   level,
		},
	}
}

// Log logs the parameters at info level. The service name is a plain
// prefix, NOT a format string — passing it as the format (as in
// Infof(serviceName, args...)) would render every arg as %!(EXTRA …)
// noise since the name contains no verbs.
func (l *logger) Log(args ...interface{}) {
	l.logger.Info(l.serviceName + " " + fmt.Sprint(args...))
}

// Fatalf logs the parameters at fatal level and exits the process.
func (l *logger) Fatalf(args ...interface{}) {
	l.logger.Fatal(l.serviceName + " " + fmt.Sprint(args...))
}
|
||||||
33
examples/distributed-tracing/utility/tracer.go
Normal file
33
examples/distributed-tracing/utility/tracer.go
Normal file
|
|
@ -0,0 +1,33 @@
|
||||||
|
package utility

import (
	"io"
	"log"

	"github.com/opentracing/opentracing-go"
	jaegerConfig "github.com/uber/jaeger-client-go/config"
)

// globalTracer is the no-op fallback used when Jaeger cannot be set up.
var globalTracer = &opentracing.NoopTracer{}

// nullCloser is a no-op io.Closer paired with the no-op tracer.
type nullCloser struct{}

func (*nullCloser) Close() error { return nil }

// NewTracer builds a Jaeger tracer from the JAEGER_* environment
// variables. On any configuration or construction error it logs the
// problem and falls back to a no-op tracer and closer rather than
// panicking — a degraded (untraced) service beats a crash in a helper.
func NewTracer(serviceName string) (opentracing.Tracer, io.Closer) {
	config, err := jaegerConfig.FromEnv()
	if err != nil {
		log.Printf("error loading tracer config: %s", err.Error())
		return globalTracer, &nullCloser{}
	}
	if len(serviceName) > 0 {
		config.ServiceName = serviceName
	}
	tracer, closer, err := config.New(serviceName)
	if err != nil {
		// Same graceful fallback as the config-load failure above.
		log.Printf("error initializing jaeger tracer: %s", err.Error())
		return globalTracer, &nullCloser{}
	}
	return tracer, closer
}
|
||||||
Loading…
Reference in a new issue