Fix 'get records time' metric (#53)
The time sent to the `metrics.MonitoringService.RecordGetRecordsTime`' was not the time taken by GetRecords, it was the time taken by `GetRecords` and `ProcessRecords` additioned together. Fixes #51 Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
This commit is contained in:
parent
eb56e3b1d7
commit
c9793728a3
1 changed files with 6 additions and 5 deletions
|
|
@ -169,7 +169,6 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error {
|
||||||
retriedErrors := 0
|
retriedErrors := 0
|
||||||
|
|
||||||
for {
|
for {
|
||||||
getRecordsStartTime := time.Now()
|
|
||||||
if time.Now().UTC().After(shard.LeaseTimeout.Add(-5 * time.Second)) {
|
if time.Now().UTC().After(shard.LeaseTimeout.Add(-5 * time.Second)) {
|
||||||
log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID)
|
log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID)
|
||||||
err = sc.checkpointer.GetLease(shard, sc.consumerID)
|
err = sc.checkpointer.GetLease(shard, sc.consumerID)
|
||||||
|
|
@ -185,6 +184,8 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getRecordsStartTime := time.Now()
|
||||||
|
|
||||||
log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.StringValue(shardIterator))
|
log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.StringValue(shardIterator))
|
||||||
getRecordsArgs := &kinesis.GetRecordsInput{
|
getRecordsArgs := &kinesis.GetRecordsInput{
|
||||||
Limit: aws.Int64(int64(sc.kclConfig.MaxRecords)),
|
Limit: aws.Int64(int64(sc.kclConfig.MaxRecords)),
|
||||||
|
|
@ -207,6 +208,10 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Convert from nanoseconds to milliseconds
|
||||||
|
getRecordsTime := time.Since(getRecordsStartTime) / 1000000
|
||||||
|
sc.mService.RecordGetRecordsTime(shard.ID, float64(getRecordsTime))
|
||||||
|
|
||||||
// reset the retry count after success
|
// reset the retry count after success
|
||||||
retriedErrors = 0
|
retriedErrors = 0
|
||||||
|
|
||||||
|
|
@ -240,10 +245,6 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error {
|
||||||
sc.mService.IncrBytesProcessed(shard.ID, recordBytes)
|
sc.mService.IncrBytesProcessed(shard.ID, recordBytes)
|
||||||
sc.mService.MillisBehindLatest(shard.ID, float64(*getResp.MillisBehindLatest))
|
sc.mService.MillisBehindLatest(shard.ID, float64(*getResp.MillisBehindLatest))
|
||||||
|
|
||||||
// Convert from nanoseconds to milliseconds
|
|
||||||
getRecordsTime := time.Since(getRecordsStartTime) / 1000000
|
|
||||||
sc.mService.RecordGetRecordsTime(shard.ID, float64(getRecordsTime))
|
|
||||||
|
|
||||||
// Idle between each read, the user is responsible for checkpoint the progress
|
// Idle between each read, the user is responsible for checkpoint the progress
|
||||||
// This value is only used when no records are returned; if records are returned, it should immediately
|
// This value is only used when no records are returned; if records are returned, it should immediately
|
||||||
// retrieve the next set of records.
|
// retrieve the next set of records.
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue