From e2dca428f7968bccf1c5752bd8b9696a30c78eb2 Mon Sep 17 00:00:00 2001 From: kperry Date: Tue, 21 May 2019 17:38:07 -0500 Subject: [PATCH] Added some tests --- consumergroup.go | 4 + consumergroup/ddb.go | 13 +- consumergroup/ddb_test.go | 269 ++++++++++++++++++++++++++++++++++++++ go.mod | 2 +- go.sum | 1 + kinesis_test.go | 37 ++++++ 6 files changed, 323 insertions(+), 3 deletions(-) create mode 100644 consumergroup/ddb_test.go create mode 100644 kinesis_test.go diff --git a/consumergroup.go b/consumergroup.go index f03956c..5c26221 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -2,6 +2,7 @@ package consumer import ( "context" + "errors" "fmt" "sync" "time" @@ -11,6 +12,9 @@ import ( // TODO change logging to actual logger +// StorageCouldNotUpdateOrCreateLease is a simple error for handling races that are lost in storage +var StorageCouldNotUpdateOrCreateLease = errors.New("storage could not update or create lease") + // Lease is data for handling a lease/lock on a particular shard type Lease struct { // LeaseKey is the partition/primaryKey in storage and is the shardID diff --git a/consumergroup/ddb.go b/consumergroup/ddb.go index 593ef96..fdfd59d 100644 --- a/consumergroup/ddb.go +++ b/consumergroup/ddb.go @@ -4,6 +4,7 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" "github.com/aws/aws-sdk-go/service/dynamodb/expression" @@ -57,7 +58,11 @@ func (dynamoClient DynamoStorage) CreateLease(lease consumer.Lease) error { } if _, err := dynamoClient.Db.PutItem(input); err != nil { - // TODO need to handle ErrCodeConditionalCheckFailedException and repackage error as known error to client + if awsErr, ok := err.(awserr.Error); ok { + if awsErr.Code() == dynamodb.ErrCodeConditionalCheckFailedException { + return consumer.StorageCouldNotUpdateOrCreateLease + } + } return err } @@ -96,7 +101,11 @@ func (dynamoClient DynamoStorage) UpdateLease(originalLease, updatedLease consum } if _, err := dynamoClient.Db.UpdateItem(input); err != nil { - // TODO need to handle ErrCodeConditionalCheckFailedException and repackage error as known error to client + if awsErr, ok := err.(awserr.Error); ok { + if awsErr.Code() == dynamodb.ErrCodeConditionalCheckFailedException { + return consumer.StorageCouldNotUpdateOrCreateLease + } + } return err } diff --git a/consumergroup/ddb_test.go b/consumergroup/ddb_test.go new file mode 100644 index 0000000..f9219f8 --- /dev/null +++ b/consumergroup/ddb_test.go @@ -0,0 +1,269 @@ +package consumergroup + +import ( + "reflect" + "strconv" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/stretchr/testify/mock" + + consumer "github.com/harlow/kinesis-consumer" +) + +var testLease1 = consumer.Lease{ + LeaseKey: "000001", + Checkpoint: "1234345", + LeaseCounter: 0, + LeaseOwner: "1", + HeartbeatID: "12345", + LastUpdateTime: time.Time{}, +} + +type MockDynamo struct { + mock.Mock +} + +func (m *MockDynamo) PutItem(item *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) { + ret := m.Called(item) //get return args + if e := ret.Get(0); e != nil { //get first return args if not nil + return e.(*dynamodb.PutItemOutput), ret.Error(1) + } + return nil, ret.Error(1) +} + +func (m *MockDynamo) UpdateItem(item *dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error) { + ret := m.Called(item) //get return args + if e := ret.Get(0); e != nil { //get first return args if not nil + return e.(*dynamodb.UpdateItemOutput), ret.Error(1) + } + return nil, ret.Error(1) +} + +func (m *MockDynamo) GetItem(item *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) { + ret := m.Called(item) //get return args + if e := ret.Get(0); e != nil { //get first return args if not nil + return e.(*dynamodb.GetItemOutput), ret.Error(1) + } + return nil, ret.Error(1) +} + +func (m *MockDynamo) Scan(item *dynamodb.ScanInput) (*dynamodb.ScanOutput, error) { + ret := m.Called(item) //get return args + if e := ret.Get(0); e != nil { //get first return args if not nil + return e.(*dynamodb.ScanOutput), ret.Error(1) + } + return nil, ret.Error(1) +} + +func TestDynamoStorage_CreateLease(t *testing.T) { + + tests := []struct { + name string + tableName string + lease consumer.Lease + dynamoErr error + expectedErr error + }{ + { + name: "CreateLease_ExpectSuccess", + tableName: "test", + lease: testLease1, + dynamoErr: nil, + expectedErr: nil, + }, + { + name: "CreateLease_Expect_Error_StorageCouldNotUpdateOrCreateLease", + tableName: "test", + lease: testLease1, + dynamoErr: awserr.New(dynamodb.ErrCodeConditionalCheckFailedException, "", nil), + expectedErr: consumer.StorageCouldNotUpdateOrCreateLease, + }, + { + name: "CreateLease_Expect_Error_ErrCodeInternalServerError", + tableName: "test", + lease: testLease1, + dynamoErr: awserr.New(dynamodb.ErrCodeInternalServerError, "", nil), + expectedErr: awserr.New(dynamodb.ErrCodeInternalServerError, "", nil), + }, + } + for _, tt := range tests { + mockDynamo := MockDynamo{} + mockDynamo.On("PutItem", mock.Anything).Return(nil, tt.dynamoErr) + t.Run(tt.name, func(t *testing.T) { + dynamoClient := DynamoStorage{ + Db: &mockDynamo, + tableName: tt.tableName, + } + err := dynamoClient.CreateLease(tt.lease) + if err != nil { + if err.Error() != tt.expectedErr.Error() { + t.Errorf("DynamoStorage.CreateLease() error = %v, expectedErr %v,", err, tt.expectedErr) + } + } + }) + } +} + +func TestDynamoStorage_UpdateLease(t *testing.T) { + type fields struct { + Db DynamoDb + tableName string + } + type args struct { + originalLease consumer.Lease + updatedLease consumer.Lease + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockDynamo := MockDynamo{} + mockDynamo.On("PutItem", mock.Anything).Return(nil, nil) + dynamoClient := DynamoStorage{ + Db: tt.fields.Db, + tableName: tt.fields.tableName, + } + if err := dynamoClient.UpdateLease(tt.args.originalLease, tt.args.updatedLease); (err != nil) != tt.wantErr { + t.Errorf("DynamoStorage.UpdateLease() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_mapLeaseToLeaseUpdate(t *testing.T) { + type args struct { + lease consumer.Lease + } + tests := []struct { + name string + args args + want LeaseUpdate + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := mapLeaseToLeaseUpdate(tt.args.lease); !reflect.DeepEqual(got, tt.want) { + t.Errorf("mapLeaseToLeaseUpdate() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestDynamoStorage_GetLease(t *testing.T) { + stringCounter := strconv.Itoa(testLease1.LeaseCounter) + dynamoOutput := dynamodb.GetItemOutput{ + ConsumedCapacity: nil, + Item: map[string]*dynamodb.AttributeValue{ + "leaseKey": {S: &testLease1.LeaseKey}, + "checkpoint": {S: &testLease1.Checkpoint}, + "leaseCounter": {N: &stringCounter}, + "leaseOwner": {S: &testLease1.LeaseOwner}, + "heartbeatID": {S: &testLease1.HeartbeatID}, + }, + } + /*LeaseKey: "000001", + Checkpoint: "1234345", + LeaseCounter: 0, + LeaseOwner: "1", + HeartbeatID: "12345", + + */ + tests := []struct { + name string + tableName string + leaseKey string + want *consumer.Lease + dynamoOut *dynamodb.GetItemOutput + dynamoErr error + wantErr bool + }{ + { + name: "GetLease_Expect_Success", + tableName: "", + leaseKey: testLease1.LeaseKey, + want: &testLease1, + dynamoOut: &dynamoOutput, + dynamoErr: nil, + wantErr: false, + }, + } + for _, tt := range tests { + mockDynamo := MockDynamo{} + mockDynamo.On("GetItem", mock.Anything).Return(tt.dynamoOut, tt.dynamoErr) + t.Run(tt.name, func(t *testing.T) { + dynamoClient := DynamoStorage{ + Db: &mockDynamo, + tableName: tt.tableName, + } + got, err := dynamoClient.GetLease(tt.leaseKey) + if (err != nil) != tt.wantErr { + t.Errorf("DynamoStorage.GetLease() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("DynamoStorage.GetLease() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_mapLeaseKeyToDdbKey(t *testing.T) { + type args struct { + leaseKey string + } + tests := []struct { + name string + args args + want map[string]*dynamodb.AttributeValue + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := mapLeaseKeyToDdbKey(tt.args.leaseKey); !reflect.DeepEqual(got, tt.want) { + t.Errorf("mapLeaseKeyToDdbKey() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestDynamoStorage_GetAllLeases(t *testing.T) { + type fields struct { + Db DynamoDb + tableName string + } + tests := []struct { + name string + fields fields + want map[string]consumer.Lease + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + dynamoClient := DynamoStorage{ + Db: tt.fields.Db, + tableName: tt.fields.tableName, + } + got, err := dynamoClient.GetAllLeases() + if (err != nil) != tt.wantErr { + t.Errorf("DynamoStorage.GetAllLeases() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("DynamoStorage.GetAllLeases() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/go.mod b/go.mod index 62ff507..5c0be0f 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/onsi/gomega v1.5.0 // indirect github.com/pkg/errors v0.8.0 github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a // indirect - github.com/stretchr/testify v1.3.0 // indirect + github.com/stretchr/testify v1.3.0 github.com/twinj/uuid v1.0.0 golang.org/x/net v0.0.0-20190514140710-3ec191127204 // indirect google.golang.org/appengine v1.6.0 // indirect diff --git a/go.sum b/go.sum index 4a974f8..15f4250 100644 --- a/go.sum +++ b/go.sum @@ -38,6 +38,7 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykE github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2YqsZKovtsgrwcDH1RZhVbTKCjLp47XpqCDs= github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= diff --git a/kinesis_test.go b/kinesis_test.go new file mode 100644 index 0000000..b8fddbe --- /dev/null +++ b/kinesis_test.go @@ -0,0 +1,37 @@ +package consumer + +import ( + "reflect" + "testing" +) + +func TestKinesis_ListAllShards(t *testing.T) { + type fields struct { + client KinesisClient + streamName string + } + tests := []struct { + name string + fields fields + want []string + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + k := Kinesis{ + client: tt.fields.client, + streamName: tt.fields.streamName, + } + got, err := k.ListAllShards() + if (err != nil) != tt.wantErr { + t.Errorf("Kinesis.ListAllShards() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("Kinesis.ListAllShards() = %v, want %v", got, tt.want) + } + }) + } +}