From 3be57e8a745524d5a0ab463952980c69cd78b303 Mon Sep 17 00:00:00 2001 From: John Calixto Date: Mon, 23 Jan 2023 17:32:27 -0800 Subject: [PATCH] Refactor in prep for testing rate limiting improvements Signed-off-by: John Calixto --- clientlibrary/worker/common-shard-consumer.go | 9 ++- .../worker/polling-shard-consumer.go | 11 +++- .../worker/polling-shard-consumer_test.go | 65 +++++++++++++++++++ go.mod | 5 +- go.sum | 11 +++- 5 files changed, 93 insertions(+), 8 deletions(-) create mode 100644 clientlibrary/worker/polling-shard-consumer_test.go diff --git a/clientlibrary/worker/common-shard-consumer.go b/clientlibrary/worker/common-shard-consumer.go index 253cecb..1b49f6a 100644 --- a/clientlibrary/worker/common-shard-consumer.go +++ b/clientlibrary/worker/common-shard-consumer.go @@ -21,6 +21,7 @@ package worker import ( + "context" "sync" "time" @@ -40,10 +41,16 @@ type shardConsumer interface { getRecords() error } +type KinesisSubscriberGetter interface { + SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error) + GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error) + GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error) +} + // commonShardConsumer implements common functionality for regular and enhanced fan-out consumers type commonShardConsumer struct { shard *par.ShardStatus - kc *kinesis.Client + kc KinesisSubscriberGetter checkpointer chk.Checkpointer recordProcessor kcl.IRecordProcessor kclConfig *config.KinesisClientLibConfiguration diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index a20fde0..cd4565a 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -129,13 +129,13 @@ func (sc *PollingShardConsumer) getRecords() error { getRecordsStartTime := time.Now() log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.ToString(shardIterator)) + + // Get records from stream and retry as needed getRecordsArgs := &kinesis.GetRecordsInput{ Limit: aws.Int32(int32(sc.kclConfig.MaxRecords)), ShardIterator: shardIterator, } - - // Get records from stream and retry as needed - getResp, err := sc.kc.GetRecords(context.TODO(), getRecordsArgs) + getResp, err := sc.callGetRecordsAPI(getRecordsArgs) if err != nil { //aws-sdk-go-v2 https://github.com/aws/aws-sdk-go-v2/blob/main/CHANGELOG.md#error-handling var throughputExceededErr *types.ProvisionedThroughputExceededException @@ -181,3 +181,8 @@ func (sc *PollingShardConsumer) getRecords() error { } } } + +func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) { + getResp, err := sc.kc.GetRecords(context.TODO(), gri) + return getResp, err +} diff --git a/clientlibrary/worker/polling-shard-consumer_test.go b/clientlibrary/worker/polling-shard-consumer_test.go new file mode 100644 index 0000000..b6a1fcf --- /dev/null +++ b/clientlibrary/worker/polling-shard-consumer_test.go @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2023 VMware, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and + * associated documentation files (the "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial + * portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT + * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package worker + +import ( + "context" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestCallGetRecordsAPI(t *testing.T) { + // basic happy path + m1 := MockKinesisSubscriberGetter{} + ret := kinesis.GetRecordsOutput{} + m1.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret, nil) + psc := PollingShardConsumer{ + commonShardConsumer: commonShardConsumer{kc: &m1}, + } + gri := kinesis.GetRecordsInput{ + ShardIterator: aws.String("shard-iterator-01"), + } + out, err := psc.callGetRecordsAPI(&gri) + assert.Nil(t, err) + assert.Equal(t, &ret, out) + m1.AssertExpectations(t) +} + +type MockKinesisSubscriberGetter struct { + mock.Mock +} + +func (m *MockKinesisSubscriberGetter) GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error) { + ret := m.Called(ctx, params, optFns) + + return ret.Get(0).(*kinesis.GetRecordsOutput), ret.Error(1) +} + +func (m *MockKinesisSubscriberGetter) GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error) { + return nil, nil +} + +func (m *MockKinesisSubscriberGetter) SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error) { + return nil, nil +} diff --git a/go.mod b/go.mod index 9a56ba8..3d9554c 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/prometheus/common v0.32.1 github.com/rs/zerolog v1.26.1 github.com/sirupsen/logrus v1.8.1 - github.com/stretchr/testify v1.7.0 + github.com/stretchr/testify v1.8.1 go.uber.org/zap v1.20.0 gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) @@ -42,9 +42,10 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect + github.com/stretchr/objx v0.5.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.7.0 // indirect golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect google.golang.org/protobuf v1.27.1 // indirect - gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index f03f679..159c486 100644 --- a/go.sum +++ b/go.sum @@ -240,11 +240,17 @@ github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -543,8 +549,9 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=