Info level is too chatty for this sleep message, debug is good
Signed-off-by: Kris O'Mealy <komealy@hotmail.com>
This commit is contained in:
parent
b12921da23
commit
97599ea83f
2 changed files with 16 additions and 15 deletions
|
|
@ -32,10 +32,11 @@ package worker
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
"math"
|
"math"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go-v2/aws"
|
"github.com/aws/aws-sdk-go-v2/aws"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/kinesis"
|
"github.com/aws/aws-sdk-go-v2/service/kinesis"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
||||||
|
|
@ -55,8 +56,8 @@ const (
|
||||||
var (
|
var (
|
||||||
rateLimitTimeNow = time.Now
|
rateLimitTimeNow = time.Now
|
||||||
rateLimitTimeSince = time.Since
|
rateLimitTimeSince = time.Since
|
||||||
localTPSExceededError = errors.New("Error GetRecords TPS Exceeded")
|
errLocalTPSExceeded = errors.New("error GetRecords TPS exceeded")
|
||||||
maxBytesExceededError = errors.New("Error GetRecords Max Bytes For Call Period Exceeded")
|
errMaxBytesExceeded = errors.New("error GetRecords max bytes for call period exceeded")
|
||||||
)
|
)
|
||||||
|
|
||||||
// PollingShardConsumer is responsible for polling data records from a (specified) shard.
|
// PollingShardConsumer is responsible for polling data records from a (specified) shard.
|
||||||
|
|
@ -175,13 +176,13 @@ func (sc *PollingShardConsumer) getRecords() error {
|
||||||
sc.waitASecond(sc.currTime)
|
sc.waitASecond(sc.currTime)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err == localTPSExceededError {
|
if err == errLocalTPSExceeded {
|
||||||
log.Infof("localTPSExceededError so sleep for a second")
|
log.Debugf("localTPSExceededError so sleep for a second")
|
||||||
sc.waitASecond(sc.currTime)
|
sc.waitASecond(sc.currTime)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err == maxBytesExceededError {
|
if err == errMaxBytesExceeded {
|
||||||
log.Infof("maxBytesExceededError so sleep for %+v seconds", coolDownPeriod)
|
log.Debugf("maxBytesExceededError so sleep for %+v seconds", coolDownPeriod)
|
||||||
time.Sleep(time.Duration(coolDownPeriod) * time.Second)
|
time.Sleep(time.Duration(coolDownPeriod) * time.Second)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
@ -264,7 +265,7 @@ func (sc *PollingShardConsumer) checkCoolOffPeriod() (int, error) {
|
||||||
if sc.bytesRead%MaxBytesPerSecond > 0 {
|
if sc.bytesRead%MaxBytesPerSecond > 0 {
|
||||||
coolDown++
|
coolDown++
|
||||||
}
|
}
|
||||||
return coolDown, maxBytesExceededError
|
return coolDown, errMaxBytesExceeded
|
||||||
} else {
|
} else {
|
||||||
sc.remBytes -= sc.bytesRead
|
sc.remBytes -= sc.bytesRead
|
||||||
}
|
}
|
||||||
|
|
@ -285,7 +286,7 @@ func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput)
|
||||||
}
|
}
|
||||||
|
|
||||||
if sc.callsLeft < 1 {
|
if sc.callsLeft < 1 {
|
||||||
return nil, 0, localTPSExceededError
|
return nil, 0, errLocalTPSExceeded
|
||||||
}
|
}
|
||||||
getResp, err := sc.kc.GetRecords(context.TODO(), gri)
|
getResp, err := sc.kc.GetRecords(context.TODO(), gri)
|
||||||
sc.callsLeft--
|
sc.callsLeft--
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
testGetRecordsError = errors.New("GetRecords Error")
|
errTestGetRecords = errors.New("GetRecords error")
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestCallGetRecordsAPI(t *testing.T) {
|
func TestCallGetRecordsAPI(t *testing.T) {
|
||||||
|
|
@ -62,7 +62,7 @@ func TestCallGetRecordsAPI(t *testing.T) {
|
||||||
}
|
}
|
||||||
out2, _, err2 := psc2.callGetRecordsAPI(&gri)
|
out2, _, err2 := psc2.callGetRecordsAPI(&gri)
|
||||||
assert.Nil(t, out2)
|
assert.Nil(t, out2)
|
||||||
assert.ErrorIs(t, err2, localTPSExceededError)
|
assert.ErrorIs(t, err2, errLocalTPSExceeded)
|
||||||
m2.AssertExpectations(t)
|
m2.AssertExpectations(t)
|
||||||
|
|
||||||
// check that getRecords is called normally in bytesRead = 0 case
|
// check that getRecords is called normally in bytesRead = 0 case
|
||||||
|
|
@ -162,7 +162,7 @@ func TestCallGetRecordsAPI(t *testing.T) {
|
||||||
// case where getRecords throws error
|
// case where getRecords throws error
|
||||||
m7 := MockKinesisSubscriberGetter{}
|
m7 := MockKinesisSubscriberGetter{}
|
||||||
ret7 := kinesis.GetRecordsOutput{Records: nil}
|
ret7 := kinesis.GetRecordsOutput{Records: nil}
|
||||||
m7.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret7, testGetRecordsError)
|
m7.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret7, errTestGetRecords)
|
||||||
psc7 := PollingShardConsumer{
|
psc7 := PollingShardConsumer{
|
||||||
commonShardConsumer: commonShardConsumer{kc: &m7},
|
commonShardConsumer: commonShardConsumer{kc: &m7},
|
||||||
callsLeft: 2,
|
callsLeft: 2,
|
||||||
|
|
@ -172,7 +172,7 @@ func TestCallGetRecordsAPI(t *testing.T) {
|
||||||
return 2 * time.Second
|
return 2 * time.Second
|
||||||
}
|
}
|
||||||
out7, checkSleepVal7, err7 := psc7.callGetRecordsAPI(&gri)
|
out7, checkSleepVal7, err7 := psc7.callGetRecordsAPI(&gri)
|
||||||
assert.Equal(t, err7, testGetRecordsError)
|
assert.Equal(t, err7, errTestGetRecords)
|
||||||
assert.Equal(t, checkSleepVal7, 0)
|
assert.Equal(t, checkSleepVal7, 0)
|
||||||
assert.Equal(t, out7, &ret7)
|
assert.Equal(t, out7, &ret7)
|
||||||
m7.AssertExpectations(t)
|
m7.AssertExpectations(t)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue