fix: use nanosecond precision in lease comparisons

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
This commit is contained in:
Shiva Pentakota 2023-02-13 17:56:11 -08:00
parent fb17ec8bc6
commit df16ef451c

View file

@ -129,7 +129,7 @@ func (checkpointer *DynamoCheckpoint) Init() error {
// GetLease attempts to gain a lock on the given shard
func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssignTo string) error {
newLeaseTimeout := time.Now().Add(time.Duration(checkpointer.LeaseDuration) * time.Millisecond).UTC()
newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339)
newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339Nano)
currentCheckpoint, err := checkpointer.getItem(shard.ID)
if err != nil {
return err
@ -161,7 +161,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
assignedTo := assignedVar.(*types.AttributeValueMemberS).Value
leaseTimeout := leaseVar.(*types.AttributeValueMemberS).Value
currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout)
currentLeaseTimeout, err := time.Parse(time.RFC3339Nano, leaseTimeout)
if err != nil {
return err
}
@ -246,7 +246,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
// CheckpointSequence writes a checkpoint at the designated sequence ID
func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *par.ShardStatus) error {
leaseTimeout := shard.GetLeaseTimeout().UTC().Format(time.RFC3339)
leaseTimeout := shard.GetLeaseTimeout().UTC().Format(time.RFC3339Nano)
marshalledCheckpoint := map[string]types.AttributeValue{
LeaseKeyKey: &types.AttributeValueMemberS{
Value: shard.ID,
@ -290,7 +290,7 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er
// Use up-to-date leaseTimeout to avoid ConditionalCheckFailedException when claiming
if leaseTimeout, ok := checkpoint[LeaseTimeoutKey]; ok && leaseTimeout.(*types.AttributeValueMemberS).Value != "" {
currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout.(*types.AttributeValueMemberS).Value)
currentLeaseTimeout, err := time.Parse(time.RFC3339Nano, leaseTimeout.(*types.AttributeValueMemberS).Value)
if err != nil {
return err
}
@ -370,7 +370,7 @@ func (checkpointer *DynamoCheckpoint) ClaimShard(shard *par.ShardStatus, claimID
if err != nil && err != ErrSequenceIDNotFound {
return err
}
leaseTimeoutString := shard.GetLeaseTimeout().Format(time.RFC3339)
leaseTimeoutString := shard.GetLeaseTimeout().Format(time.RFC3339Nano)
conditionalExpression := `ShardID = :id AND LeaseTimeout = :lease_timeout AND attribute_not_exists(ClaimRequest)`
expressionAttributeValues := map[string]types.AttributeValue{