Use AWS built-in retry logic and refactor tests (#24)

Update the unit test and move integration test under test folder.
Update retry logic by switching to AWS's default retry.

Signed-off-by: Tao Jiang <taoj@vmware.com>
This commit is contained in:
Tao Jiang 2019-06-27 21:56:44 -07:00
parent 6df520b343
commit 250bb2e9ff
10 changed files with 324 additions and 163 deletions

View file

@ -24,6 +24,7 @@ targets:
- export GO111MODULE=on
- go mod download
- go mod vendor
- go mod tidy
build:
description: build source code
@ -43,7 +44,6 @@ targets:
description: run CI tests
after:
- deps
- check
cmds:
- ./support/scripts/ci.sh
@ -86,7 +86,7 @@ targets:
settings:
default-targets:
- ci
- test
docker:
image: 'vmware/go-kcl-toolchain:0.1.2'
src-volume: /go/src/github.com/vmware/vmware-go-kcl

View file

@ -29,14 +29,14 @@ package checkpoint
import (
"errors"
"math"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/session"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/matryer/try"
log "github.com/sirupsen/logrus"
"github.com/vmware/vmware-go-kcl/clientlibrary/config"
@ -46,6 +46,9 @@ import (
const (
// ErrInvalidDynamoDBSchema is returned when there are one or more fields missing from the table
ErrInvalidDynamoDBSchema = "The DynamoDB schema is invalid and may need to be re-created"
// NumMaxRetries is the max times of doing retry
NumMaxRetries = 5
)
// DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend
@ -54,28 +57,53 @@ type DynamoCheckpoint struct {
leaseTableReadCapacity int64
leaseTableWriteCapacity int64
LeaseDuration int
svc dynamodbiface.DynamoDBAPI
kclConfig *config.KinesisClientLibConfiguration
Retries int
LeaseDuration int
svc dynamodbiface.DynamoDBAPI
kclConfig *config.KinesisClientLibConfiguration
Retries int
skipTableCheck bool
}
func NewDynamoCheckpoint(dynamo dynamodbiface.DynamoDBAPI, kclConfig *config.KinesisClientLibConfiguration) Checkpointer {
func NewDynamoCheckpoint(kclConfig *config.KinesisClientLibConfiguration) *DynamoCheckpoint {
checkpointer := &DynamoCheckpoint{
TableName: kclConfig.TableName,
leaseTableReadCapacity: int64(kclConfig.InitialLeaseTableReadCapacity),
leaseTableWriteCapacity: int64(kclConfig.InitialLeaseTableWriteCapacity),
LeaseDuration: kclConfig.FailoverTimeMillis,
svc: dynamo,
kclConfig: kclConfig,
Retries: 5,
Retries: NumMaxRetries,
}
return checkpointer
}
// WithDynamoDB is used to provide DynamoDB service
func (checkpointer *DynamoCheckpoint) WithDynamoDB(svc dynamodbiface.DynamoDBAPI) *DynamoCheckpoint {
checkpointer.svc = svc
return checkpointer
}
// Init initialises the DynamoDB Checkpoint
func (checkpointer *DynamoCheckpoint) Init() error {
if !checkpointer.doesTableExist() {
log.Info("Creating DynamoDB session")
s, err := session.NewSession(&aws.Config{
Region: aws.String(checkpointer.kclConfig.RegionName),
Endpoint: &checkpointer.kclConfig.DynamoDBEndpoint,
Credentials: checkpointer.kclConfig.DynamoDBCredentials,
Retryer: client.DefaultRetryer{NumMaxRetries: checkpointer.Retries},
})
if err != nil {
// no need to move forward
log.Fatalf("Failed in getting DynamoDB session for creating Worker: %+v", err)
}
if checkpointer.svc == nil {
checkpointer.svc = dynamodb.New(s)
}
if !checkpointer.skipTableCheck && !checkpointer.doesTableExist() {
return checkpointer.createTable()
}
return nil
@ -272,68 +300,30 @@ func (checkpointer *DynamoCheckpoint) conditionalUpdate(conditionExpression stri
}
func (checkpointer *DynamoCheckpoint) putItem(input *dynamodb.PutItemInput) error {
return try.Do(func(attempt int) (bool, error) {
_, err := checkpointer.svc.PutItem(input)
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException ||
awsErr.Code() == dynamodb.ErrCodeInternalServerError &&
attempt < checkpointer.Retries {
// Backoff time as recommended by https://docs.aws.amazon.com/general/latest/gr/api-retries.html
time.Sleep(time.Duration(math.Exp2(float64(attempt))*100) * time.Millisecond)
return true, err
}
}
return false, err
})
_, err := checkpointer.svc.PutItem(input)
return err
}
func (checkpointer *DynamoCheckpoint) getItem(shardID string) (map[string]*dynamodb.AttributeValue, error) {
var item *dynamodb.GetItemOutput
err := try.Do(func(attempt int) (bool, error) {
var err error
item, err = checkpointer.svc.GetItem(&dynamodb.GetItemInput{
TableName: aws.String(checkpointer.TableName),
Key: map[string]*dynamodb.AttributeValue{
LEASE_KEY_KEY: {
S: aws.String(shardID),
},
item, err := checkpointer.svc.GetItem(&dynamodb.GetItemInput{
TableName: aws.String(checkpointer.TableName),
Key: map[string]*dynamodb.AttributeValue{
LEASE_KEY_KEY: {
S: aws.String(shardID),
},
})
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException ||
awsErr.Code() == dynamodb.ErrCodeInternalServerError &&
attempt < checkpointer.Retries {
// Backoff time as recommended by https://docs.aws.amazon.com/general/latest/gr/api-retries.html
time.Sleep(time.Duration(math.Exp2(float64(attempt))*100) * time.Millisecond)
return true, err
}
}
return false, err
},
})
return item.Item, err
}
func (checkpointer *DynamoCheckpoint) removeItem(shardID string) error {
err := try.Do(func(attempt int) (bool, error) {
var err error
_, err = checkpointer.svc.DeleteItem(&dynamodb.DeleteItemInput{
TableName: aws.String(checkpointer.TableName),
Key: map[string]*dynamodb.AttributeValue{
LEASE_KEY_KEY: {
S: aws.String(shardID),
},
_, err := checkpointer.svc.DeleteItem(&dynamodb.DeleteItemInput{
TableName: aws.String(checkpointer.TableName),
Key: map[string]*dynamodb.AttributeValue{
LEASE_KEY_KEY: {
S: aws.String(shardID),
},
})
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException ||
awsErr.Code() == dynamodb.ErrCodeInternalServerError &&
attempt < checkpointer.Retries {
// Backoff time as recommended by https://docs.aws.amazon.com/general/latest/gr/api-retries.html
time.Sleep(time.Duration(math.Exp2(float64(attempt))*100) * time.Millisecond)
return true, err
}
}
return false, err
},
})
return err
}

View file

@ -0,0 +1,170 @@
/*
* Copyright (c) 2019 VMware, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
* associated documentation files (the "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is furnished to do
* so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial
* portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
* NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
// The implementation is derived from https://github.com/patrobinson/gokini
//
// Copyright 2018 Patrick robinson
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package checkpoint
import (
"errors"
"sync"
"testing"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
cfg "github.com/vmware/vmware-go-kcl/clientlibrary/config"
par "github.com/vmware/vmware-go-kcl/clientlibrary/partition"
)
func TestDoesTableExist(t *testing.T) {
svc := &mockDynamoDB{tableExist: true}
checkpoint := &DynamoCheckpoint{
TableName: "TableName",
svc: svc,
}
if !checkpoint.doesTableExist() {
t.Error("Table exists but returned false")
}
svc = &mockDynamoDB{tableExist: false}
checkpoint.svc = svc
if checkpoint.doesTableExist() {
t.Error("Table does not exist but returned true")
}
}
func TestGetLeaseNotAquired(t *testing.T) {
svc := &mockDynamoDB{tableExist: true}
kclConfig := cfg.NewKinesisClientLibConfig("appName", "test", "us-west-2", "abc").
WithInitialPositionInStream(cfg.LATEST).
WithMaxRecords(10).
WithMaxLeasesForWorker(1).
WithShardSyncIntervalMillis(5000).
WithFailoverTimeMillis(300000).
WithMetricsBufferTimeMillis(10000).
WithMetricsMaxQueueSize(20)
checkpoint := NewDynamoCheckpoint(kclConfig).WithDynamoDB(svc)
checkpoint.Init()
err := checkpoint.GetLease(&par.ShardStatus{
ID: "0001",
Checkpoint: "",
Mux: &sync.Mutex{},
}, "abcd-efgh")
if err != nil {
t.Errorf("Error getting lease %s", err)
}
err = checkpoint.GetLease(&par.ShardStatus{
ID: "0001",
Checkpoint: "",
Mux: &sync.Mutex{},
}, "ijkl-mnop")
if err == nil || err.Error() != ErrLeaseNotAquired {
t.Errorf("Got a lease when it was already held by abcd-efgh: %s", err)
}
}
func TestGetLeaseAquired(t *testing.T) {
svc := &mockDynamoDB{tableExist: true}
kclConfig := cfg.NewKinesisClientLibConfig("appName", "test", "us-west-2", "abc").
WithInitialPositionInStream(cfg.LATEST).
WithMaxRecords(10).
WithMaxLeasesForWorker(1).
WithShardSyncIntervalMillis(5000).
WithFailoverTimeMillis(300000).
WithMetricsBufferTimeMillis(10000).
WithMetricsMaxQueueSize(20)
checkpoint := NewDynamoCheckpoint(kclConfig).WithDynamoDB(svc)
checkpoint.Init()
checkpoint.svc = svc
marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
"ShardID": {
S: aws.String("0001"),
},
"AssignedTo": {
S: aws.String("abcd-efgh"),
},
"LeaseTimeout": {
S: aws.String(time.Now().AddDate(0, -1, 0).UTC().Format(time.RFC3339)),
},
"SequenceID": {
S: aws.String("deadbeef"),
},
}
input := &dynamodb.PutItemInput{
TableName: aws.String("TableName"),
Item: marshalledCheckpoint,
}
checkpoint.svc.PutItem(input)
shard := &par.ShardStatus{
ID: "0001",
Checkpoint: "deadbeef",
Mux: &sync.Mutex{},
}
err := checkpoint.GetLease(shard, "ijkl-mnop")
if err != nil {
t.Errorf("Lease not aquired after timeout %s", err)
}
id, ok := svc.item[CHECKPOINT_SEQUENCE_NUMBER_KEY]
if !ok {
t.Error("Expected checkpoint to be set by GetLease")
} else if *id.S != "deadbeef" {
t.Errorf("Expected checkpoint to be deadbeef. Got '%s'", *id.S)
}
}
type mockDynamoDB struct {
dynamodbiface.DynamoDBAPI
tableExist bool
item map[string]*dynamodb.AttributeValue
}
func (m *mockDynamoDB) DescribeTable(*dynamodb.DescribeTableInput) (*dynamodb.DescribeTableOutput, error) {
if !m.tableExist {
return &dynamodb.DescribeTableOutput{}, awserr.New(dynamodb.ErrCodeResourceNotFoundException, "doesNotExist", errors.New(""))
}
return &dynamodb.DescribeTableOutput{}, nil
}
func (m *mockDynamoDB) PutItem(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) {
m.item = input.Item
return nil, nil
}
func (m *mockDynamoDB) GetItem(input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) {
return &dynamodb.GetItemOutput{
Item: m.item,
}, nil
}
func (m *mockDynamoDB) CreateTable(input *dynamodb.CreateTableInput) (*dynamodb.CreateTableOutput, error) {
return &dynamodb.CreateTableOutput{}, nil
}

View file

@ -36,7 +36,6 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
@ -99,20 +98,8 @@ func NewWorker(factory kcl.IRecordProcessorFactory, kclConfig *config.KinesisCli
}
w.kc = kinesis.New(s)
log.Info("Creating DynamoDB session")
s, err = session.NewSession(&aws.Config{
Region: aws.String(w.regionName),
Endpoint: &kclConfig.DynamoDBEndpoint,
Credentials: kclConfig.DynamoDBCredentials,
})
if err != nil {
// no need to move forward
log.Fatalf("Failed in getting DynamoDB session for creating Worker: %+v", err)
}
w.checkpointer = chk.NewDynamoCheckpoint(dynamodb.New(s), kclConfig)
log.Info("Creating DynamoDB based checkpointer")
w.checkpointer = chk.NewDynamoCheckpoint(kclConfig)
if w.metricsConfig == nil {
// "" means noop monitor service. i.e. not emitting any metrics.

32
go.mod
View file

@ -1,25 +1,15 @@
module github.com/vmware/vmware-go-kcl
require (
github.com/asaskevich/govalidator v0.0.0-20170507183629-38ddb4612a5d // indirect
github.com/astaxie/beego v0.0.0-20170908222938-a7354d2d0840 // indirect
github.com/aws/aws-sdk-go v0.0.0-20171208220907-365b4d343694
github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-ini/ini v1.42.0 // indirect
github.com/golang/protobuf v0.0.0-20170622202551-6a1fa9404c0a // indirect
github.com/google/uuid v0.0.0-20170306145142-6a5e28554805
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect
github.com/matryer/try v0.0.0-20150601225556-312d2599e12e
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v0.0.0-20170707173355-26b897001974
github.com/prometheus/client_model v0.0.0-20170216185247-6f3806018612 // indirect
github.com/prometheus/common v0.0.0-20170707053319-3e6a7635bac6
github.com/prometheus/procfs v0.0.0-20170703101242-e645f4e5aaa8 // indirect
github.com/sirupsen/logrus v0.0.0-20170713115724-51dc0fc64317
github.com/stretchr/testify v1.2.1
golang.org/x/sys v0.0.0-20190221222158-ec7b60b042fd // indirect
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect
gopkg.in/yaml.v2 v2.2.1 // indirect
github.com/aws/aws-sdk-go v1.19.38
github.com/google/uuid v1.1.1
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/prometheus/client_golang v0.9.3
github.com/prometheus/common v0.4.1
github.com/prometheus/procfs v0.0.0-20190523193104-a7aeb8df3389 // indirect
github.com/sirupsen/logrus v1.4.2
github.com/stretchr/testify v1.3.0
golang.org/x/net v0.0.0-20190522155817-f3200d17e092 // indirect
golang.org/x/sys v0.0.0-20190528012530-adf421d2caf4 // indirect
golang.org/x/text v0.3.2 // indirect
)

101
go.sum
View file

@ -1,39 +1,82 @@
github.com/asaskevich/govalidator v0.0.0-20170507183629-38ddb4612a5d/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/astaxie/beego v0.0.0-20170908222938-a7354d2d0840/go.mod h1:0R4++1tUqERR0WYFWdfkcrsyoVBCG4DgpDGokT3yb+U=
github.com/aws/aws-sdk-go v0.0.0-20171208220907-365b4d343694 h1:TXabFUZYb1oIrmshTCd9k3gLItnCkX8DYNlzC7zT5y4=
github.com/aws/aws-sdk-go v0.0.0-20171208220907-365b4d343694/go.mod h1:ZRmQr0FajVIyZ4ZzBYKG5P3ZqPz9IHG41ZoMu1ADI3k=
github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a h1:BtpsbiV638WQZwhA98cEZw2BsbnQJrbd0BI7tsy0W1c=
github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/aws/aws-sdk-go v1.19.38 h1:WKjobgPO4Ua1ww2NJJl2/zQNreUZxvqmEzwMlRjjm9g=
github.com/aws/aws-sdk-go v1.19.38/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-ini/ini v1.42.0 h1:TWr1wGj35+UiWHlBA8er89seFXxzwFn11spilrrj+38=
github.com/go-ini/ini v1.42.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/golang/protobuf v0.0.0-20170622202551-6a1fa9404c0a h1:5X905hYB5maQbwC9ltdknryvCPb4v+D0pWDQXaYQWyk=
github.com/golang/protobuf v0.0.0-20170622202551-6a1fa9404c0a/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/uuid v0.0.0-20170306145142-6a5e28554805 h1:skl44gU1qEIcRpwKjb9bhlRwjvr96wLdvpTogCBBJe8=
github.com/google/uuid v0.0.0-20170306145142-6a5e28554805/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/matryer/try v0.0.0-20150601225556-312d2599e12e h1:wEdOROHcWFFOttvgtwOkyRx5AXGm6GKYZV46vUk7RWY=
github.com/matryer/try v0.0.0-20150601225556-312d2599e12e/go.mod h1:0KeJpeMD6o+O4hW7qJOT7vyQPKrWmj26uf5wMc/IiIs=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.0.0-20170707173355-26b897001974 h1:7geXN+A5WSloMJfKwHhigBJCSPW0DZOlypTpzg7Nu40=
github.com/prometheus/client_golang v0.0.0-20170707173355-26b897001974/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_model v0.0.0-20170216185247-6f3806018612 h1:13pIdM2tpaDi4OVe24fgoIS7ZTqMt0QI+bwQsX5hq+g=
github.com/prometheus/client_model v0.0.0-20170216185247-6f3806018612/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/common v0.0.0-20170707053319-3e6a7635bac6 h1:UEgo247BhzA25ik6y8sBtRVet8xyPH5+UidPXC+E4t0=
github.com/prometheus/common v0.0.0-20170707053319-3e6a7635bac6/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20170703101242-e645f4e5aaa8 h1:Kh7M6mzRpQ2de1rixoSQZr4BTINXFm8WDbeN5ttnwyE=
github.com/prometheus/procfs v0.0.0-20170703101242-e645f4e5aaa8/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/sirupsen/logrus v0.0.0-20170713115724-51dc0fc64317 h1:GpeXjjFK3fgyG/1Dd5felinm3v0XRZKO4cUtMcHGL08=
github.com/sirupsen/logrus v0.0.0-20170713115724-51dc0fc64317/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/stretchr/testify v1.2.1 h1:52QO5WkIUcHGIR7EnGagH88x1bUzqGXTC5/1bDTUQ7U=
github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/sys v0.0.0-20190221222158-ec7b60b042fd h1:JYgmSAJhrvxjUInD1uG+wLPAFAG7TmIJLOgZLI210A8=
golang.org/x/sys v0.0.0-20190221222158-ec7b60b042fd/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3 h1:9iH4JKXLzFbOAdtqv/a+j8aewx2Y8lAjAydhbaScPF8=
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.1 h1:K0MGApIoQvMw27RTdJkPbr3JZ7DNbtxQNyi5STVM6Kw=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.0-20190523193104-a7aeb8df3389 h1:F/k2nob1S9M6v5Xkq7KjSTQirOYaYQord0jR4TwyVmY=
github.com/prometheus/procfs v0.0.0-20190523193104-a7aeb8df3389/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190522155817-f3200d17e092 h1:4QSRKanuywn15aTZvI/mIDEgPQpswuFndXpOj3rKEco=
golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190528012530-adf421d2caf4 h1:gd52YanAQJ4UkvuNi/7z63JEyc6ejHh9QwdzbTiEtAY=
golang.org/x/sys v0.0.0-20190528012530-adf421d2caf4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View file

@ -1,2 +1,7 @@
#!/bin/bash
./support/scripts/test.sh
# Run only the integration tests
# go test -race ./test
echo "Warning: Cannot find a good way to inject AWS credential to hmake container"
echo "Don't use hmake ci. Use the following command directly"
echo "go test -race ./test"

View file

@ -7,7 +7,7 @@ export PROJ_ROOT="$HMAKE_PROJECT_DIR"
export GOPATH=/go:$PROJ_ROOT
local_go_pkgs() {
find . -name '*.go' | \
find './clientlibrary/' -name '*.go' | \
grep -Fv '/vendor/' | \
grep -Fv '/go/' | \
grep -Fv '/gen/' | \
@ -18,13 +18,6 @@ local_go_pkgs() {
sort -u
}
local_test_pkgs() {
find ./src/test -name '*.go' | \
grep -Fv '_test.go' | \
sed -r 's|(.+)/[^/]+\.go$|\1|g' | \
sort -u
}
version_suffix() {
local suffix=$(git log -1 --format=%h 2>/dev/null || true)
if [ -n "$suffix" ]; then

View file

@ -16,12 +16,9 @@
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package worker
package test
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"os"
"testing"
"time"
@ -32,6 +29,7 @@ import (
chk "github.com/vmware/vmware-go-kcl/clientlibrary/checkpoint"
cfg "github.com/vmware/vmware-go-kcl/clientlibrary/config"
"github.com/vmware/vmware-go-kcl/clientlibrary/utils"
wk "github.com/vmware/vmware-go-kcl/clientlibrary/worker"
)
func TestCustomWorker(t *testing.T) {
@ -53,25 +51,11 @@ func TestCustomWorker(t *testing.T) {
// configure cloudwatch as metrics system
metricsConfig := getMetricsConfig(kclConfig, metricsSystem)
// create dynamodb checkpointer.
s, err := session.NewSession(&aws.Config{
Region: aws.String(regionName),
Endpoint: &kclConfig.DynamoDBEndpoint,
Credentials: kclConfig.DynamoDBCredentials,
})
checkpointer := chk.NewDynamoCheckpoint(kclConfig)
if err != nil {
// no need to move forward
log.Fatalf("Failed in getting DynamoDB session for creating Worker: %+v", err)
}
worker := wk.NewCustomWorker(recordProcessorFactory(t), kclConfig, checkpointer, metricsConfig)
checkpointer := chk.NewDynamoCheckpoint(dynamodb.New(s), kclConfig)
worker := NewCustomWorker(recordProcessorFactory(t), kclConfig, checkpointer, metricsConfig)
assert.Equal(t, regionName, worker.regionName)
assert.Equal(t, streamName, worker.streamName)
err = worker.Start()
err := worker.Start()
assert.Nil(t, err)
// Put some data into stream.

View file

@ -16,7 +16,7 @@
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package worker
package test
import (
"github.com/aws/aws-sdk-go/aws/credentials"
@ -38,6 +38,7 @@ import (
kc "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces"
"github.com/vmware/vmware-go-kcl/clientlibrary/metrics"
"github.com/vmware/vmware-go-kcl/clientlibrary/utils"
wk "github.com/vmware/vmware-go-kcl/clientlibrary/worker"
)
const (
@ -126,9 +127,7 @@ func runTest(kclConfig *cfg.KinesisClientLibConfiguration, triggersig bool, t *t
// configure cloudwatch as metrics system
metricsConfig := getMetricsConfig(kclConfig, metricsSystem)
worker := NewWorker(recordProcessorFactory(t), kclConfig, metricsConfig)
assert.Equal(t, regionName, worker.regionName)
assert.Equal(t, streamName, worker.streamName)
worker := wk.NewWorker(recordProcessorFactory(t), kclConfig, metricsConfig)
err := worker.Start()
assert.Nil(t, err)