From c9793728a33ffac28319dfe297490112f56ed9b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= <476650+arl@users.noreply.github.com> Date: Sat, 9 Nov 2019 20:27:20 +0100 Subject: [PATCH] Fix 'get records time' metric (#53) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The time sent to the `metrics.MonitoringService.RecordGetRecordsTime`' was not the time taken by GetRecords, it was the time taken by `GetRecords` and `ProcessRecords` additioned together. Fixes #51 Signed-off-by: Aurélien Rainone --- clientlibrary/worker/shard-consumer.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/clientlibrary/worker/shard-consumer.go b/clientlibrary/worker/shard-consumer.go index 2b44922..06a09eb 100644 --- a/clientlibrary/worker/shard-consumer.go +++ b/clientlibrary/worker/shard-consumer.go @@ -169,7 +169,6 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error { retriedErrors := 0 for { - getRecordsStartTime := time.Now() if time.Now().UTC().After(shard.LeaseTimeout.Add(-5 * time.Second)) { log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID) err = sc.checkpointer.GetLease(shard, sc.consumerID) @@ -185,6 +184,8 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error { } } + getRecordsStartTime := time.Now() + log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.StringValue(shardIterator)) getRecordsArgs := &kinesis.GetRecordsInput{ Limit: aws.Int64(int64(sc.kclConfig.MaxRecords)), @@ -207,6 +208,10 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error { return err } + // Convert from nanoseconds to milliseconds + getRecordsTime := time.Since(getRecordsStartTime) / 1000000 + sc.mService.RecordGetRecordsTime(shard.ID, float64(getRecordsTime)) + // reset the retry count after success retriedErrors = 0 @@ -240,10 +245,6 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error { sc.mService.IncrBytesProcessed(shard.ID, recordBytes) sc.mService.MillisBehindLatest(shard.ID, float64(*getResp.MillisBehindLatest)) - // Convert from nanoseconds to milliseconds - getRecordsTime := time.Since(getRecordsStartTime) / 1000000 - sc.mService.RecordGetRecordsTime(shard.ID, float64(getRecordsTime)) - // Idle between each read, the user is responsible for checkpoint the progress // This value is only used when no records are returned; if records are returned, it should immediately // retrieve the next set of records.