From 97599ea83f0d180d5960add9d606366332f1e3b8 Mon Sep 17 00:00:00 2001 From: komealy Date: Thu, 20 Apr 2023 07:50:57 -0700 Subject: [PATCH] Info level is too chatty for this sleep message, debug is good Signed-off-by: Kris O'Mealy --- .../worker/polling-shard-consumer.go | 23 ++++++++++--------- .../worker/polling-shard-consumer_test.go | 8 +++---- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index 3829850..41321a3 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -32,10 +32,11 @@ package worker import ( "context" "errors" - log "github.com/sirupsen/logrus" "math" "time" + log "github.com/sirupsen/logrus" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" @@ -53,10 +54,10 @@ const ( ) var ( - rateLimitTimeNow = time.Now - rateLimitTimeSince = time.Since - localTPSExceededError = errors.New("Error GetRecords TPS Exceeded") - maxBytesExceededError = errors.New("Error GetRecords Max Bytes For Call Period Exceeded") + rateLimitTimeNow = time.Now + rateLimitTimeSince = time.Since + errLocalTPSExceeded = errors.New("error GetRecords TPS exceeded") + errMaxBytesExceeded = errors.New("error GetRecords max bytes for call period exceeded") ) // PollingShardConsumer is responsible for polling data records from a (specified) shard. @@ -175,13 +176,13 @@ func (sc *PollingShardConsumer) getRecords() error { sc.waitASecond(sc.currTime) continue } - if err == localTPSExceededError { - log.Infof("localTPSExceededError so sleep for a second") + if err == errLocalTPSExceeded { + log.Debugf("localTPSExceededError so sleep for a second") sc.waitASecond(sc.currTime) continue } - if err == maxBytesExceededError { - log.Infof("maxBytesExceededError so sleep for %+v seconds", coolDownPeriod) + if err == errMaxBytesExceeded { + log.Debugf("maxBytesExceededError so sleep for %+v seconds", coolDownPeriod) time.Sleep(time.Duration(coolDownPeriod) * time.Second) continue } @@ -264,7 +265,7 @@ func (sc *PollingShardConsumer) checkCoolOffPeriod() (int, error) { if sc.bytesRead%MaxBytesPerSecond > 0 { coolDown++ } - return coolDown, maxBytesExceededError + return coolDown, errMaxBytesExceeded } else { sc.remBytes -= sc.bytesRead } @@ -285,7 +286,7 @@ func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput) } if sc.callsLeft < 1 { - return nil, 0, localTPSExceededError + return nil, 0, errLocalTPSExceeded } getResp, err := sc.kc.GetRecords(context.TODO(), gri) sc.callsLeft-- diff --git a/clientlibrary/worker/polling-shard-consumer_test.go b/clientlibrary/worker/polling-shard-consumer_test.go index 736b2bd..1b26ebb 100644 --- a/clientlibrary/worker/polling-shard-consumer_test.go +++ b/clientlibrary/worker/polling-shard-consumer_test.go @@ -32,7 +32,7 @@ import ( ) var ( - testGetRecordsError = errors.New("GetRecords Error") + errTestGetRecords = errors.New("GetRecords error") ) func TestCallGetRecordsAPI(t *testing.T) { @@ -62,7 +62,7 @@ func TestCallGetRecordsAPI(t *testing.T) { } out2, _, err2 := psc2.callGetRecordsAPI(&gri) assert.Nil(t, out2) - assert.ErrorIs(t, err2, localTPSExceededError) + assert.ErrorIs(t, err2, errLocalTPSExceeded) m2.AssertExpectations(t) // check that getRecords is called normally in bytesRead = 0 case @@ -162,7 +162,7 @@ func TestCallGetRecordsAPI(t *testing.T) { // case where getRecords throws error m7 := MockKinesisSubscriberGetter{} ret7 := kinesis.GetRecordsOutput{Records: nil} - m7.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret7, testGetRecordsError) + m7.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret7, errTestGetRecords) psc7 := PollingShardConsumer{ commonShardConsumer: commonShardConsumer{kc: &m7}, callsLeft: 2, @@ -172,7 +172,7 @@ func TestCallGetRecordsAPI(t *testing.T) { return 2 * time.Second } out7, checkSleepVal7, err7 := psc7.callGetRecordsAPI(&gri) - assert.Equal(t, err7, testGetRecordsError) + assert.Equal(t, err7, errTestGetRecords) assert.Equal(t, checkSleepVal7, 0) assert.Equal(t, out7, &ret7) m7.AssertExpectations(t)