From adb9ee11830adbe02b121aee71018fb5ac0eaf35 Mon Sep 17 00:00:00 2001 From: ravikiran-s Date: Wed, 3 May 2023 19:50:21 +0530 Subject: [PATCH] milliseconds behind latest fix --- clientlibrary/streams/dynamodb_adapter_client.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/clientlibrary/streams/dynamodb_adapter_client.go b/clientlibrary/streams/dynamodb_adapter_client.go index b7e42ec..5e38268 100644 --- a/clientlibrary/streams/dynamodb_adapter_client.go +++ b/clientlibrary/streams/dynamodb_adapter_client.go @@ -10,6 +10,8 @@ import ( "github.com/aws/aws-sdk-go-v2/service/kinesis" ktypes "github.com/aws/aws-sdk-go-v2/service/kinesis/types" log "github.com/sirupsen/logrus" + "math" + "time" ) type DynamodbStreamAdapterClient struct { @@ -299,6 +301,13 @@ func (d DynamodbStreamAdapterClient) convertGetRecordsOutput(dynamoOutput *dynam kinesisOutput.Records = make([]ktypes.Record, 0) kinesisOutput.ResultMetadata = dynamoOutput.ResultMetadata kinesisOutput.NextShardIterator = dynamoOutput.NextShardIterator + lastRecord := dynamoOutput.Records[len(dynamoOutput.Records)-1] + var lastRecordCreateTime *time.Time + if lastRecord.Dynamodb != nil && lastRecord.Dynamodb.ApproximateCreationDateTime != nil { + lastRecordCreateTime = lastRecord.Dynamodb.ApproximateCreationDateTime + } + millisBehindLatest := int64(math.Max(float64(time.Now().Sub(*lastRecordCreateTime).Milliseconds()), float64(0))) + kinesisOutput.MillisBehindLatest = &millisBehindLatest } for _, record := range dynamoOutput.Records { reqBodyBytes := new(bytes.Buffer)