From df16ef451cf3c52752cf3ff8257c51644cdc4790 Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Mon, 13 Feb 2023 17:56:11 -0800 Subject: [PATCH] fix: use nanosecond precision in lease comparisons Signed-off-by: Shiva Pentakota --- clientlibrary/checkpoint/dynamodb-checkpointer.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go index 3a7e22e..b8f12d8 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go @@ -129,7 +129,7 @@ func (checkpointer *DynamoCheckpoint) Init() error { // GetLease attempts to gain a lock on the given shard func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssignTo string) error { newLeaseTimeout := time.Now().Add(time.Duration(checkpointer.LeaseDuration) * time.Millisecond).UTC() - newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339) + newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339Nano) currentCheckpoint, err := checkpointer.getItem(shard.ID) if err != nil { return err @@ -161,7 +161,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign assignedTo := assignedVar.(*types.AttributeValueMemberS).Value leaseTimeout := leaseVar.(*types.AttributeValueMemberS).Value - currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout) + currentLeaseTimeout, err := time.Parse(time.RFC3339Nano, leaseTimeout) if err != nil { return err } @@ -246,7 +246,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign // CheckpointSequence writes a checkpoint at the designated sequence ID func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *par.ShardStatus) error { - leaseTimeout := shard.GetLeaseTimeout().UTC().Format(time.RFC3339) + leaseTimeout := shard.GetLeaseTimeout().UTC().Format(time.RFC3339Nano) marshalledCheckpoint := map[string]types.AttributeValue{ LeaseKeyKey: &types.AttributeValueMemberS{ Value: shard.ID, @@ -290,7 +290,7 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er // Use up-to-date leaseTimeout to avoid ConditionalCheckFailedException when claiming if leaseTimeout, ok := checkpoint[LeaseTimeoutKey]; ok && leaseTimeout.(*types.AttributeValueMemberS).Value != "" { - currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout.(*types.AttributeValueMemberS).Value) + currentLeaseTimeout, err := time.Parse(time.RFC3339Nano, leaseTimeout.(*types.AttributeValueMemberS).Value) if err != nil { return err } @@ -370,7 +370,7 @@ func (checkpointer *DynamoCheckpoint) ClaimShard(shard *par.ShardStatus, claimID if err != nil && err != ErrSequenceIDNotFound { return err } - leaseTimeoutString := shard.GetLeaseTimeout().Format(time.RFC3339) + leaseTimeoutString := shard.GetLeaseTimeout().Format(time.RFC3339Nano) conditionalExpression := `ShardID = :id AND LeaseTimeout = :lease_timeout AND attribute_not_exists(ClaimRequest)` expressionAttributeValues := map[string]types.AttributeValue{