diff --git a/clientlibrary/metrics/cloudwatch/cloudwatch.go b/clientlibrary/metrics/cloudwatch/cloudwatch.go
index 7fb66b3..1a22f24 100644
--- a/clientlibrary/metrics/cloudwatch/cloudwatch.go
+++ b/clientlibrary/metrics/cloudwatch/cloudwatch.go
@@ -71,6 +71,10 @@ type cloudWatchMetrics struct {
     leaseRenewals      int64
     getRecordsTime     []float64
     processRecordsTime []float64
+    localTPSBackoffs   int64
+    maxBytesBackoffs   int64
+    throttlingBackoffs int64
+    getRecordsErrors   int64
 }

 // NewMonitoringService returns a Monitoring service publishing metrics to CloudWatch.
@@ -201,6 +205,34 @@ func (cw *MonitoringService) flushShard(shard string, metric *cloudWatchMetrics)
            Timestamp:  &metricTimestamp,
            Value:      aws.Float64(float64(metric.leasesHeld)),
        },
+       {
+           Dimensions: defaultDimensions,
+           MetricName: aws.String("LocalTPSBackoffs"),
+           Unit:       types.StandardUnitCount,
+           Timestamp:  &metricTimestamp,
+           Value:      aws.Float64(float64(metric.localTPSBackoffs)),
+       },
+       {
+           Dimensions: defaultDimensions,
+           MetricName: aws.String("MaxBytesBackoffs"),
+           Unit:       types.StandardUnitCount,
+           Timestamp:  &metricTimestamp,
+           Value:      aws.Float64(float64(metric.maxBytesBackoffs)),
+       },
+       {
+           Dimensions: defaultDimensions,
+           MetricName: aws.String("ThrottlingBackoffs"),
+           Unit:       types.StandardUnitCount,
+           Timestamp:  &metricTimestamp,
+           Value:      aws.Float64(float64(metric.throttlingBackoffs)),
+       },
+       {
+           Dimensions: defaultDimensions,
+           MetricName: aws.String("GetRecordsErrors"),
+           Unit:       types.StandardUnitCount,
+           Timestamp:  &metricTimestamp,
+           Value:      aws.Float64(float64(metric.getRecordsErrors)),
+       },
    }

    if len(metric.behindLatestMillis) > 0 {
@@ -258,6 +290,10 @@ func (cw *MonitoringService) flushShard(shard string, metric *cloudWatchMetrics)
        metric.leaseRenewals = 0
        metric.getRecordsTime = []float64{}
        metric.processRecordsTime = []float64{}
+       metric.localTPSBackoffs = 0
+       metric.maxBytesBackoffs = 0
+       metric.throttlingBackoffs = 0
+       metric.getRecordsErrors = 0
    } else {
        cw.logger.Errorf("Error in publishing cloudwatch metrics. Error: %+v", err)
    }
@@ -325,6 +361,7 @@ func (cw *MonitoringService) RecordGetRecordsTime(shard string, time float64) {
    defer m.Unlock()
    m.getRecordsTime = append(m.getRecordsTime, time)
 }
+
 func (cw *MonitoringService) RecordProcessRecordsTime(shard string, time float64) {
    m := cw.getOrCreatePerShardMetrics(shard)
    m.Lock()
@@ -332,6 +369,34 @@ func (cw *MonitoringService) RecordProcessRecordsTime(shard string, time float64
    m.processRecordsTime = append(m.processRecordsTime, time)
 }

+func (cw *MonitoringService) IncrLocalTPSBackoffs(shard string, count int) {
+   m := cw.getOrCreatePerShardMetrics(shard)
+   m.Lock()
+   defer m.Unlock()
+   m.localTPSBackoffs += int64(count)
+}
+
+func (cw *MonitoringService) IncrMaxBytesBackoffs(shard string, count int) {
+   m := cw.getOrCreatePerShardMetrics(shard)
+   m.Lock()
+   defer m.Unlock()
+   m.maxBytesBackoffs += int64(count)
+}
+
+func (cw *MonitoringService) IncrThrottlingBackoffs(shard string, count int) {
+   m := cw.getOrCreatePerShardMetrics(shard)
+   m.Lock()
+   defer m.Unlock()
+   m.throttlingBackoffs += int64(count)
+}
+
+func (cw *MonitoringService) IncrGetRecordsErrors(shard string, count int) {
+   m := cw.getOrCreatePerShardMetrics(shard)
+   m.Lock()
+   defer m.Unlock()
+   m.getRecordsErrors += int64(count)
+}
+
 func (cw *MonitoringService) getOrCreatePerShardMetrics(shard string) *cloudWatchMetrics {
    var i interface{}
    var ok bool
diff --git a/clientlibrary/metrics/interfaces.go b/clientlibrary/metrics/interfaces.go
index 47ec490..c89f1e8 100644
--- a/clientlibrary/metrics/interfaces.go
+++ b/clientlibrary/metrics/interfaces.go
@@ -41,6 +41,10 @@ type MonitoringService interface {
    LeaseRenewed(shard string)
    RecordGetRecordsTime(shard string, time float64)
    RecordProcessRecordsTime(shard string, time float64)
+   IncrLocalTPSBackoffs(shard string, count int)
+   IncrMaxBytesBackoffs(shard string, count int)
+   IncrThrottlingBackoffs(shard string, count int)
+   IncrGetRecordsErrors(shard string, count int)
    Shutdown()
 }

@@ -60,3 +64,7 @@ func (NoopMonitoringService) LeaseLost(_ string) {}
 func (NoopMonitoringService) LeaseRenewed(_ string) {}
 func (NoopMonitoringService) RecordGetRecordsTime(_ string, _ float64) {}
 func (NoopMonitoringService) RecordProcessRecordsTime(_ string, _ float64) {}
+func (NoopMonitoringService) IncrLocalTPSBackoffs(_ string, _ int) {}
+func (NoopMonitoringService) IncrMaxBytesBackoffs(_ string, _ int) {}
+func (NoopMonitoringService) IncrThrottlingBackoffs(_ string, _ int) {}
+func (NoopMonitoringService) IncrGetRecordsErrors(_ string, _ int) {}
diff --git a/clientlibrary/metrics/prometheus/prometheus.go b/clientlibrary/metrics/prometheus/prometheus.go
index e489a78..fefde58 100644
--- a/clientlibrary/metrics/prometheus/prometheus.go
+++ b/clientlibrary/metrics/prometheus/prometheus.go
@@ -55,6 +55,10 @@ type MonitoringService struct {
    leaseRenewals      *prom.CounterVec
    getRecordsTime     *prom.HistogramVec
    processRecordsTime *prom.HistogramVec
+   localTPSBackoffs   *prom.CounterVec
+   maxBytesBackoffs   *prom.CounterVec
+   throttlingBackoffs *prom.CounterVec
+   getRecordsErrors   *prom.CounterVec
 }

 // NewMonitoringService returns a Monitoring service publishing metrics to Prometheus.
@@ -99,6 +103,22 @@ func (p *MonitoringService) Init(appName, streamName, workerID string) error {
        Name: p.namespace + `_process_records_duration_milliseconds`,
        Help: "The time taken to process records",
    }, []string{"kinesisStream", "shard"})
+   p.localTPSBackoffs = prom.NewCounterVec(prom.CounterOpts{
+       Name: p.namespace + `_local_tps_backoffs`,
+       Help: "The number of backoffs caused by exceeding the Kinesis read TPS limit",
+   }, []string{"kinesisStream", "shard"})
+   p.maxBytesBackoffs = prom.NewCounterVec(prom.CounterOpts{
+       Name: p.namespace + `_max_bytes_backoffs`,
+       Help: "The number of backoffs caused by exceeding the Kinesis max read bytes limit",
+   }, []string{"kinesisStream", "shard"})
+   p.throttlingBackoffs = prom.NewCounterVec(prom.CounterOpts{
+       Name: p.namespace + `_throttling_backoffs`,
+       Help: "The number of backoffs caused by Kinesis read throttling",
+   }, []string{"kinesisStream", "shard"})
+   p.getRecordsErrors = prom.NewCounterVec(prom.CounterOpts{
+       Name: p.namespace + `_get_records_errors`,
+       Help: "The number of non-retryable errors returned by GetRecords",
+   }, []string{"kinesisStream", "shard"})

    metrics := []prom.Collector{
        p.processedBytes,
@@ -108,6 +128,10 @@ func (p *MonitoringService) Init(appName, streamName, workerID string) error {
        p.leaseRenewals,
        p.getRecordsTime,
        p.processRecordsTime,
+       p.localTPSBackoffs,
+       p.maxBytesBackoffs,
+       p.throttlingBackoffs,
+       p.getRecordsErrors,
    }
    for _, metric := range metrics {
        err := prom.Register(metric)
@@ -170,3 +194,19 @@ func (p *MonitoringService) RecordGetRecordsTime(shard string, time float64) {
 func (p *MonitoringService) RecordProcessRecordsTime(shard string, time float64) {
    p.processRecordsTime.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName}).Observe(time)
 }
+
+func (p *MonitoringService) IncrLocalTPSBackoffs(shard string, count int) {
+   p.localTPSBackoffs.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName}).Add(float64(count))
+}
+
+func (p *MonitoringService) IncrMaxBytesBackoffs(shard string, count int) {
+   p.maxBytesBackoffs.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName}).Add(float64(count))
+}
+
+func (p *MonitoringService) IncrThrottlingBackoffs(shard string, count int) {
+   p.throttlingBackoffs.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName}).Add(float64(count))
+}
+
+func (p *MonitoringService) IncrGetRecordsErrors(shard string, count int) {
+   p.getRecordsErrors.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName}).Add(float64(count))
+}
diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go
index 3829850..3b4a0e0 100644
--- a/clientlibrary/worker/polling-shard-consumer.go
+++ b/clientlibrary/worker/polling-shard-consumer.go
@@ -32,10 +32,11 @@ package worker
 import (
    "context"
    "errors"
-   log "github.com/sirupsen/logrus"
    "math"
    "time"

+   log "github.com/sirupsen/logrus"
+
    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/kinesis"
    "github.com/aws/aws-sdk-go-v2/service/kinesis/types"
@@ -53,10 +54,10 @@ const (
 )

 var (
-   rateLimitTimeNow      = time.Now
-   rateLimitTimeSince    = time.Since
-   localTPSExceededError = errors.New("Error GetRecords TPS Exceeded")
-   maxBytesExceededError = errors.New("Error GetRecords Max Bytes For Call Period Exceeded")
+   rateLimitTimeNow    = time.Now
+   rateLimitTimeSince  = time.Since
+   errLocalTPSExceeded = errors.New("error GetRecords TPS exceeded")
+   errMaxBytesExceeded = errors.New("error GetRecords max bytes for call period exceeded")
 )

 // PollingShardConsumer is responsible for polling data records from a (specified) shard.
@@ -175,13 +176,15 @@ func (sc *PollingShardConsumer) getRecords() error {
            sc.waitASecond(sc.currTime)
            continue
        }
-       if err == localTPSExceededError {
-           log.Infof("localTPSExceededError so sleep for a second")
+       if err == errLocalTPSExceeded {
+           log.Debugf("errLocalTPSExceeded so sleep for a second")
+           sc.mService.IncrLocalTPSBackoffs(sc.shard.ID, 1)
            sc.waitASecond(sc.currTime)
            continue
        }
-       if err == maxBytesExceededError {
-           log.Infof("maxBytesExceededError so sleep for %+v seconds", coolDownPeriod)
+       if err == errMaxBytesExceeded {
+           log.Debugf("errMaxBytesExceeded so sleep for %+v seconds", coolDownPeriod)
+           sc.mService.IncrMaxBytesBackoffs(sc.shard.ID, 1)
            time.Sleep(time.Duration(coolDownPeriod) * time.Second)
            continue
        }
@@ -199,9 +202,11 @@
        }
        // exponential backoff
        // https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html#Programming.Errors.RetryAndBackoff
+       sc.mService.IncrThrottlingBackoffs(sc.shard.ID, 1)
        time.Sleep(time.Duration(math.Exp2(float64(retriedErrors))*100) * time.Millisecond)
        continue
    }
+   sc.mService.IncrGetRecordsErrors(sc.shard.ID, 1)
    log.Errorf("Error getting records from Kinesis that cannot be retried: %+v Request: %s", err, getRecordsArgs)
    return err
 }
@@ -264,7 +269,7 @@ func (sc *PollingShardConsumer) checkCoolOffPeriod() (int, error) {
        if sc.bytesRead%MaxBytesPerSecond > 0 {
            coolDown++
        }
-       return coolDown, maxBytesExceededError
+       return coolDown, errMaxBytesExceeded
    } else {
        sc.remBytes -= sc.bytesRead
    }
@@ -285,7 +290,7 @@ func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput)
    }

    if sc.callsLeft < 1 {
-       return nil, 0, localTPSExceededError
+       return nil, 0, errLocalTPSExceeded
    }
    getResp, err := sc.kc.GetRecords(context.TODO(), gri)
    sc.callsLeft--
diff --git a/clientlibrary/worker/polling-shard-consumer_test.go b/clientlibrary/worker/polling-shard-consumer_test.go
index 736b2bd..1b26ebb 100644
--- a/clientlibrary/worker/polling-shard-consumer_test.go
+++ b/clientlibrary/worker/polling-shard-consumer_test.go
@@ -32,7 +32,7 @@ import (
 )

 var (
-   testGetRecordsError = errors.New("GetRecords Error")
+   errTestGetRecords = errors.New("GetRecords error")
 )

 func TestCallGetRecordsAPI(t *testing.T) {
@@ -62,7 +62,7 @@
    }
    out2, _, err2 := psc2.callGetRecordsAPI(&gri)
    assert.Nil(t, out2)
-   assert.ErrorIs(t, err2, localTPSExceededError)
+   assert.ErrorIs(t, err2, errLocalTPSExceeded)
    m2.AssertExpectations(t)

    // check that getRecords is called normally in bytesRead = 0 case
@@ -162,7 +162,7 @@
    // case where getRecords throws error
    m7 := MockKinesisSubscriberGetter{}
    ret7 := kinesis.GetRecordsOutput{Records: nil}
-   m7.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret7, testGetRecordsError)
+   m7.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret7, errTestGetRecords)
    psc7 := PollingShardConsumer{
        commonShardConsumer: commonShardConsumer{kc: &m7},
        callsLeft: 2,
@@ -172,7 +172,7 @@
        return 2 * time.Second
    }
    out7, checkSleepVal7, err7 := psc7.callGetRecordsAPI(&gri)
-   assert.Equal(t, err7, testGetRecordsError)
+   assert.Equal(t, err7, errTestGetRecords)
    assert.Equal(t, checkSleepVal7, 0)
    assert.Equal(t, out7, &ret7)
    m7.AssertExpectations(t)
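
Reviewer note: because MonitoringService gains four methods, this patch is a breaking change for any out-of-tree implementation of the interface. The CloudWatch and Prometheus services above already implement them, and NoopMonitoringService picks up no-op stubs in interfaces.go. A minimal sketch of what an external implementation would need to keep compiling is below; the package and type names (metricsext, customMonitoring) are hypothetical and not part of this change, and no-op bodies are enough if the backoff metrics are not wanted.

package metricsext // assumed package name for the example

// customMonitoring is a hypothetical out-of-tree MonitoringService
// implementation; only the four methods added by this patch are shown.
type customMonitoring struct{}

// No-op stubs satisfy the new interface methods without recording anything.
func (customMonitoring) IncrLocalTPSBackoffs(shard string, count int)   {}
func (customMonitoring) IncrMaxBytesBackoffs(shard string, count int)   {}
func (customMonitoring) IncrThrottlingBackoffs(shard string, count int) {}
func (customMonitoring) IncrGetRecordsErrors(shard string, count int)   {}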