Commit d558d77cb4 by Kris O'Mealy, 2023-04-22 15:59:51 +00:00 (committed by GitHub)
5 changed files with 133 additions and 15 deletions


@@ -71,6 +71,10 @@ type cloudWatchMetrics struct {
     leaseRenewals      int64
     getRecordsTime     []float64
     processRecordsTime []float64
+    localTPSBackoffs   int64
+    maxBytesBackoffs   int64
+    throttlingBackoffs int64
+    getRecordsErrors   int64
 }

 // NewMonitoringService returns a Monitoring service publishing metrics to CloudWatch.
@@ -201,6 +205,34 @@ func (cw *MonitoringService) flushShard(shard string, metric *cloudWatchMetrics)
             Timestamp:  &metricTimestamp,
             Value:      aws.Float64(float64(metric.leasesHeld)),
         },
+        {
+            Dimensions: defaultDimensions,
+            MetricName: aws.String("LocalTPSBackoffs"),
+            Unit:       types.StandardUnitCount,
+            Timestamp:  &metricTimestamp,
+            Value:      aws.Float64(float64(metric.localTPSBackoffs)),
+        },
+        {
+            Dimensions: defaultDimensions,
+            MetricName: aws.String("MaxBytesBackoffs"),
+            Unit:       types.StandardUnitCount,
+            Timestamp:  &metricTimestamp,
+            Value:      aws.Float64(float64(metric.maxBytesBackoffs)),
+        },
+        {
+            Dimensions: defaultDimensions,
+            MetricName: aws.String("ThrottlingBackoffs"),
+            Unit:       types.StandardUnitCount,
+            Timestamp:  &metricTimestamp,
+            Value:      aws.Float64(float64(metric.throttlingBackoffs)),
+        },
+        {
+            Dimensions: defaultDimensions,
+            MetricName: aws.String("GetRecordsErrors"),
+            Unit:       types.StandardUnitCount,
+            Timestamp:  &metricTimestamp,
+            Value:      aws.Float64(float64(metric.getRecordsErrors)),
+        },
     }

     if len(metric.behindLatestMillis) > 0 {
@@ -258,6 +290,10 @@ func (cw *MonitoringService) flushShard(shard string, metric *cloudWatchMetrics)
         metric.leaseRenewals = 0
         metric.getRecordsTime = []float64{}
         metric.processRecordsTime = []float64{}
+        metric.localTPSBackoffs = 0
+        metric.maxBytesBackoffs = 0
+        metric.throttlingBackoffs = 0
+        metric.getRecordsErrors = 0
     } else {
         cw.logger.Errorf("Error in publishing cloudwatch metrics. Error: %+v", err)
     }
@@ -325,6 +361,7 @@ func (cw *MonitoringService) RecordGetRecordsTime(shard string, time float64) {
     defer m.Unlock()
     m.getRecordsTime = append(m.getRecordsTime, time)
 }
+
 func (cw *MonitoringService) RecordProcessRecordsTime(shard string, time float64) {
     m := cw.getOrCreatePerShardMetrics(shard)
     m.Lock()
@@ -332,6 +369,34 @@ func (cw *MonitoringService) RecordProcessRecordsTime(shard string, time float64)
     m.processRecordsTime = append(m.processRecordsTime, time)
 }
+
+func (cw *MonitoringService) IncrLocalTPSBackoffs(shard string, count int) {
+    m := cw.getOrCreatePerShardMetrics(shard)
+    m.Lock()
+    defer m.Unlock()
+    m.localTPSBackoffs += int64(count)
+}
+
+func (cw *MonitoringService) IncrMaxBytesBackoffs(shard string, count int) {
+    m := cw.getOrCreatePerShardMetrics(shard)
+    m.Lock()
+    defer m.Unlock()
+    m.maxBytesBackoffs += int64(count)
+}
+
+func (cw *MonitoringService) IncrThrottlingBackoffs(shard string, count int) {
+    m := cw.getOrCreatePerShardMetrics(shard)
+    m.Lock()
+    defer m.Unlock()
+    m.throttlingBackoffs += int64(count)
+}
+
+func (cw *MonitoringService) IncrGetRecordsErrors(shard string, count int) {
+    m := cw.getOrCreatePerShardMetrics(shard)
+    m.Lock()
+    defer m.Unlock()
+    m.getRecordsErrors += int64(count)
+}

 func (cw *MonitoringService) getOrCreatePerShardMetrics(shard string) *cloudWatchMetrics {
     var i interface{}
     var ok bool
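
For context on the CloudWatch changes above: each new counter accumulates under the shard's mutex, and flushShard zeroes it only after a successful PutMetricData call, so counts from a failed flush roll into the next attempt. A minimal, self-contained sketch of the datum shape these counters publish follows; the namespace and the "Shard" dimension name are illustrative assumptions, not taken from this commit.

```go
package metricsexample

import (
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/cloudwatch"
    "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
)

// buildBackoffDatum is a hypothetical helper, not part of the commit.
func buildBackoffDatum(shard string, throttlingBackoffs int64) *cloudwatch.PutMetricDataInput {
    return &cloudwatch.PutMetricDataInput{
        Namespace: aws.String("MyKCLApp"), // illustrative namespace
        MetricData: []types.MetricDatum{{
            Dimensions: []types.Dimension{{Name: aws.String("Shard"), Value: aws.String(shard)}},
            MetricName: aws.String("ThrottlingBackoffs"),
            Unit:       types.StandardUnitCount,
            Timestamp:  aws.Time(time.Now()),
            // The published value is the count accumulated since the last
            // successful flush, because flushShard resets it on success.
            Value: aws.Float64(float64(throttlingBackoffs)),
        }},
    }
}
```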


@@ -41,6 +41,10 @@ type MonitoringService interface {
     LeaseRenewed(shard string)
     RecordGetRecordsTime(shard string, time float64)
     RecordProcessRecordsTime(shard string, time float64)
+    IncrLocalTPSBackoffs(shard string, count int)
+    IncrMaxBytesBackoffs(shard string, count int)
+    IncrThrottlingBackoffs(shard string, count int)
+    IncrGetRecordsErrors(shard string, count int)
     Shutdown()
 }
@@ -60,3 +64,7 @@ func (NoopMonitoringService) LeaseLost(_ string) {}
 func (NoopMonitoringService) LeaseRenewed(_ string)                        {}
 func (NoopMonitoringService) RecordGetRecordsTime(_ string, _ float64)     {}
 func (NoopMonitoringService) RecordProcessRecordsTime(_ string, _ float64) {}
+func (NoopMonitoringService) IncrLocalTPSBackoffs(_ string, _ int)         {}
+func (NoopMonitoringService) IncrMaxBytesBackoffs(_ string, _ int)         {}
+func (NoopMonitoringService) IncrThrottlingBackoffs(_ string, _ int)       {}
+func (NoopMonitoringService) IncrGetRecordsErrors(_ string, _ int)         {}
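
Because MonitoringService is an interface, the four new methods are a compile-time break for any external implementation. One low-friction pattern is to embed NoopMonitoringService so unimplemented methods default to no-ops; the sketch below is hypothetical (loggingMonitor is not in this commit, and the package placement is an assumption).

```go
package metrics // assumption: compiled alongside the MonitoringService interface

import "log"

// loggingMonitor overrides only the signal it cares about; the embedded
// NoopMonitoringService supplies no-op defaults for everything else.
type loggingMonitor struct {
    NoopMonitoringService
}

func (loggingMonitor) IncrGetRecordsErrors(shard string, count int) {
    log.Printf("shard %s: %d non-retryable GetRecords error(s)", shard, count)
}
```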


@@ -55,6 +55,10 @@ type MonitoringService struct {
     leaseRenewals      *prom.CounterVec
     getRecordsTime     *prom.HistogramVec
     processRecordsTime *prom.HistogramVec
+    localTPSBackoffs   *prom.CounterVec
+    maxBytesBackoffs   *prom.CounterVec
+    throttlingBackoffs *prom.CounterVec
+    getRecordsErrors   *prom.CounterVec
 }

 // NewMonitoringService returns a Monitoring service publishing metrics to Prometheus.
@@ -99,6 +103,22 @@ func (p *MonitoringService) Init(appName, streamName, workerID string) error {
         Name: p.namespace + `_process_records_duration_milliseconds`,
         Help: "The time taken to process records",
     }, []string{"kinesisStream", "shard"})
+    p.localTPSBackoffs = prom.NewCounterVec(prom.CounterOpts{
+        Name: p.namespace + `_local_tps_backoffs`,
+        Help: "The number of times backoffs happen from exceeding kinesis read TPS limit",
+    }, []string{"kinesisStream", "shard"})
+    p.maxBytesBackoffs = prom.NewCounterVec(prom.CounterOpts{
+        Name: p.namespace + `_max_bytes_backoffs`,
+        Help: "The number of times backoffs happen from exceeding kinesis max bytes read limit",
+    }, []string{"kinesisStream", "shard"})
+    p.throttlingBackoffs = prom.NewCounterVec(prom.CounterOpts{
+        Name: p.namespace + `_throttling_backoffs`,
+        Help: "The number of times backoffs happen from encountering kinesis read throttling",
+    }, []string{"kinesisStream", "shard"})
+    p.getRecordsErrors = prom.NewCounterVec(prom.CounterOpts{
+        Name: p.namespace + `_get_records_errors`,
+        Help: "The number of times non-retryable errors happen from GetRecords",
+    }, []string{"kinesisStream", "shard"})

     metrics := []prom.Collector{
         p.processedBytes,
@@ -108,6 +128,10 @@ func (p *MonitoringService) Init(appName, streamName, workerID string) error {
         p.leaseRenewals,
         p.getRecordsTime,
         p.processRecordsTime,
+        p.localTPSBackoffs,
+        p.maxBytesBackoffs,
+        p.throttlingBackoffs,
+        p.getRecordsErrors,
     }
     for _, metric := range metrics {
         err := prom.Register(metric)
@@ -170,3 +194,19 @@ func (p *MonitoringService) RecordGetRecordsTime(shard string, time float64) {
 func (p *MonitoringService) RecordProcessRecordsTime(shard string, time float64) {
     p.processRecordsTime.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName}).Observe(time)
 }
+
+func (p *MonitoringService) IncrLocalTPSBackoffs(shard string, count int) {
+    p.localTPSBackoffs.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName}).Add(float64(count))
+}
+
+func (p *MonitoringService) IncrMaxBytesBackoffs(shard string, count int) {
+    p.maxBytesBackoffs.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName}).Add(float64(count))
+}
+
+func (p *MonitoringService) IncrThrottlingBackoffs(shard string, count int) {
+    p.throttlingBackoffs.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName}).Add(float64(count))
+}
+
+func (p *MonitoringService) IncrGetRecordsErrors(shard string, count int) {
+    p.getRecordsErrors.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName}).Add(float64(count))
+}
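
On the Prometheus side, each new metric is a CounterVec keyed by kinesisStream and shard, and Add(float64(count)) only ever increases it, so per-second backoff rates are derived at query time (for example with PromQL's rate()). A self-contained sketch of the same pattern against the client library directly; the "example" namespace and label values are illustrative.

```go
package main

import "github.com/prometheus/client_golang/prometheus"

func main() {
    // Same shape as the diff: a monotonic counter labeled by stream and shard.
    throttlingBackoffs := prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "example_throttling_backoffs",
        Help: "The number of times backoffs happen from encountering kinesis read throttling",
    }, []string{"kinesisStream", "shard"})
    prometheus.MustRegister(throttlingBackoffs)

    // Equivalent of IncrThrottlingBackoffs("shardId-000000000000", 1).
    throttlingBackoffs.With(prometheus.Labels{
        "kinesisStream": "my-stream",
        "shard":         "shardId-000000000000",
    }).Add(1)
}
```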


@@ -32,10 +32,11 @@ package worker
 import (
     "context"
     "errors"
-    log "github.com/sirupsen/logrus"
     "math"
     "time"

+    log "github.com/sirupsen/logrus"
+
     "github.com/aws/aws-sdk-go-v2/aws"
     "github.com/aws/aws-sdk-go-v2/service/kinesis"
     "github.com/aws/aws-sdk-go-v2/service/kinesis/types"
@@ -53,10 +54,10 @@ const (
 )

 var (
     rateLimitTimeNow    = time.Now
     rateLimitTimeSince  = time.Since
-    localTPSExceededError = errors.New("Error GetRecords TPS Exceeded")
-    maxBytesExceededError = errors.New("Error GetRecords Max Bytes For Call Period Exceeded")
+    errLocalTPSExceeded = errors.New("error GetRecords TPS exceeded")
+    errMaxBytesExceeded = errors.New("error GetRecords max bytes for call period exceeded")
 )

 // PollingShardConsumer is responsible for polling data records from a (specified) shard.
@@ -175,13 +176,15 @@ func (sc *PollingShardConsumer) getRecords() error {
                 sc.waitASecond(sc.currTime)
                 continue
             }
-            if err == localTPSExceededError {
-                log.Infof("localTPSExceededError so sleep for a second")
+            if err == errLocalTPSExceeded {
+                log.Debugf("localTPSExceededError so sleep for a second")
+                sc.mService.IncrLocalTPSBackoffs(sc.shard.ID, 1)
                 sc.waitASecond(sc.currTime)
                 continue
             }
-            if err == maxBytesExceededError {
-                log.Infof("maxBytesExceededError so sleep for %+v seconds", coolDownPeriod)
+            if err == errMaxBytesExceeded {
+                log.Debugf("maxBytesExceededError so sleep for %+v seconds", coolDownPeriod)
+                sc.mService.IncrMaxBytesBackoffs(sc.shard.ID, 1)
                 time.Sleep(time.Duration(coolDownPeriod) * time.Second)
                 continue
             }
@@ -199,9 +202,11 @@ func (sc *PollingShardConsumer) getRecords() error {
             }
             // exponential backoff
             // https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html#Programming.Errors.RetryAndBackoff
+            sc.mService.IncrThrottlingBackoffs(sc.shard.ID, 1)
             time.Sleep(time.Duration(math.Exp2(float64(retriedErrors))*100) * time.Millisecond)
             continue
         }
+        sc.mService.IncrGetRecordsErrors(sc.shard.ID, 1)
         log.Errorf("Error getting records from Kinesis that cannot be retried: %+v Request: %s", err, getRecordsArgs)
         return err
     }
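
For a sense of scale, the exponential backoff above sleeps math.Exp2(retriedErrors)*100 milliseconds, doubling with each retried throttling error. A throwaway sketch (assuming retriedErrors counts up from 1) prints the first few delays:

```go
package main

import (
    "fmt"
    "math"
    "time"
)

func main() {
    for retried := 1; retried <= 5; retried++ {
        // Same expression as the consumer's retry sleep.
        d := time.Duration(math.Exp2(float64(retried))*100) * time.Millisecond
        fmt.Printf("retry %d sleeps %v\n", retried, d) // 200ms, 400ms, 800ms, 1.6s, 3.2s
    }
}
```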
@@ -264,7 +269,7 @@ func (sc *PollingShardConsumer) checkCoolOffPeriod() (int, error) {
         if sc.bytesRead%MaxBytesPerSecond > 0 {
             coolDown++
         }
-        return coolDown, maxBytesExceededError
+        return coolDown, errMaxBytesExceeded
     } else {
         sc.remBytes -= sc.bytesRead
     }
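
The cool-down returned above is a ceiling division: seconds = ceil(bytesRead / MaxBytesPerSecond). Assuming MaxBytesPerSecond reflects the documented 2 MiB/s per-shard read limit (the constant's actual value is not shown in this diff), a 5 MiB read forces a 3-second pause:

```go
package main

import "fmt"

func main() {
    const maxBytesPerSecond = 2 * 1024 * 1024 // assumed per-shard read limit
    bytesRead := 5 * 1024 * 1024
    coolDown := bytesRead / maxBytesPerSecond // 2 full seconds of budget consumed
    if bytesRead%maxBytesPerSecond > 0 {
        coolDown++ // a partial second still costs a whole second of cool-down
    }
    fmt.Println(coolDown) // 3
}
```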
@@ -285,7 +290,7 @@ func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput)
     }

     if sc.callsLeft < 1 {
-        return nil, 0, localTPSExceededError
+        return nil, 0, errLocalTPSExceeded
     }
     getResp, err := sc.kc.GetRecords(context.TODO(), gri)
     sc.callsLeft--
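
The errLocalTPSExceeded path exists because Kinesis documents GetRecords at up to five calls per second per shard; callsLeft is the consumer's local budget for the current one-second window, and exhausting it makes callGetRecordsAPI bail out before touching the API. A minimal sketch of such a fixed-window budget follows; tpsGuard is a hypothetical stand-in, and the real accounting lives in PollingShardConsumer.

```go
package worker // assumption: mirrors the consumer's package

import "time"

// tpsGuard tracks how many GetRecords calls remain in the current second.
type tpsGuard struct {
    callsLeft   int
    windowStart time.Time
}

func (g *tpsGuard) allow(now time.Time) bool {
    if now.Sub(g.windowStart) >= time.Second {
        g.windowStart, g.callsLeft = now, 5 // refill: 5 GetRecords calls/sec/shard
    }
    if g.callsLeft < 1 {
        return false // caller sleeps out the rest of the second
    }
    g.callsLeft--
    return true
}
```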


@@ -32,7 +32,7 @@ import (
 )

 var (
-    testGetRecordsError = errors.New("GetRecords Error")
+    errTestGetRecords = errors.New("GetRecords error")
 )

 func TestCallGetRecordsAPI(t *testing.T) {
@@ -62,7 +62,7 @@ func TestCallGetRecordsAPI(t *testing.T) {
     }
     out2, _, err2 := psc2.callGetRecordsAPI(&gri)
     assert.Nil(t, out2)
-    assert.ErrorIs(t, err2, localTPSExceededError)
+    assert.ErrorIs(t, err2, errLocalTPSExceeded)
     m2.AssertExpectations(t)

     // check that getRecords is called normally in bytesRead = 0 case
@@ -162,7 +162,7 @@ func TestCallGetRecordsAPI(t *testing.T) {
     // case where getRecords throws error
     m7 := MockKinesisSubscriberGetter{}
     ret7 := kinesis.GetRecordsOutput{Records: nil}
-    m7.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret7, testGetRecordsError)
+    m7.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret7, errTestGetRecords)
     psc7 := PollingShardConsumer{
         commonShardConsumer: commonShardConsumer{kc: &m7},
         callsLeft:           2,
@@ -172,7 +172,7 @@ func TestCallGetRecordsAPI(t *testing.T) {
         return 2 * time.Second
     }
     out7, checkSleepVal7, err7 := psc7.callGetRecordsAPI(&gri)
-    assert.Equal(t, err7, testGetRecordsError)
+    assert.Equal(t, err7, errTestGetRecords)
     assert.Equal(t, checkSleepVal7, 0)
     assert.Equal(t, out7, &ret7)
     m7.AssertExpectations(t)