This commit is contained in:
ravikiran-s 2023-05-03 20:46:13 +05:30
parent adb9ee1183
commit 75d4ce9cc7

View file

@ -296,7 +296,8 @@ func (d DynamodbStreamAdapterClient) convertGetRecordsInput(params *kinesis.GetR
} }
func (d DynamodbStreamAdapterClient) convertGetRecordsOutput(dynamoOutput *dynamodbstreams.GetRecordsOutput) *kinesis.GetRecordsOutput { func (d DynamodbStreamAdapterClient) convertGetRecordsOutput(dynamoOutput *dynamodbstreams.GetRecordsOutput) *kinesis.GetRecordsOutput {
kinesisOutput := kinesis.GetRecordsOutput{} var millisBehindLatest = time.Now().UnixMilli()
kinesisOutput := kinesis.GetRecordsOutput{MillisBehindLatest: &millisBehindLatest, Records: make([]ktypes.Record, 0)}
if dynamoOutput != nil && len(dynamoOutput.Records) != 0 { if dynamoOutput != nil && len(dynamoOutput.Records) != 0 {
kinesisOutput.Records = make([]ktypes.Record, 0) kinesisOutput.Records = make([]ktypes.Record, 0)
kinesisOutput.ResultMetadata = dynamoOutput.ResultMetadata kinesisOutput.ResultMetadata = dynamoOutput.ResultMetadata
@ -306,7 +307,7 @@ func (d DynamodbStreamAdapterClient) convertGetRecordsOutput(dynamoOutput *dynam
if lastRecord.Dynamodb != nil && lastRecord.Dynamodb.ApproximateCreationDateTime != nil { if lastRecord.Dynamodb != nil && lastRecord.Dynamodb.ApproximateCreationDateTime != nil {
lastRecordCreateTime = lastRecord.Dynamodb.ApproximateCreationDateTime lastRecordCreateTime = lastRecord.Dynamodb.ApproximateCreationDateTime
} }
millisBehindLatest := int64(math.Max(float64(time.Now().Sub(*lastRecordCreateTime).Milliseconds()), float64(0))) millisBehindLatest = int64(math.Max(float64(time.Now().Sub(*lastRecordCreateTime).Milliseconds()), float64(0)))
kinesisOutput.MillisBehindLatest = &millisBehindLatest kinesisOutput.MillisBehindLatest = &millisBehindLatest
} }
for _, record := range dynamoOutput.Records { for _, record := range dynamoOutput.Records {