Add context to ErrLeaseNotAcquired (#87)

* clientlibrary/checkpoint: convert ErrLeaseAcquired to struct

Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>

* clientlibrary/checkpoint: add context to ErrLeaseNotAcquired

Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>

* Use errors.As to check for ErrLeaseNotAcquired error

Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
This commit is contained in:
Aurélien Rainone 2021-01-26 05:30:10 +01:00 committed by Tao Jiang
parent adb264717b
commit 909d1774a3
5 changed files with 19 additions and 10 deletions

View file

@ -29,6 +29,8 @@ package checkpoint
import ( import (
"errors" "errors"
"fmt"
par "github.com/vmware/vmware-go-kcl/clientlibrary/partition" par "github.com/vmware/vmware-go-kcl/clientlibrary/partition"
) )
@ -41,11 +43,16 @@ const (
// We've completely processed all records in this shard. // We've completely processed all records in this shard.
ShardEnd = "SHARD_END" ShardEnd = "SHARD_END"
// ErrLeaseNotAcquired is returned when we failed to get a lock on the shard
ErrLeaseNotAcquired = "lease is already held by another node"
) )
type ErrLeaseNotAcquired struct {
cause string
}
func (e ErrLeaseNotAcquired) Error() string {
return fmt.Sprintf("lease not acquired: %s", e.cause)
}
// Checkpointer handles checkpointing when a record has been processed // Checkpointer handles checkpointing when a record has been processed
type Checkpointer interface { type Checkpointer interface {
// Init initialises the Checkpoint // Init initialises the Checkpoint

View file

@ -28,7 +28,6 @@
package checkpoint package checkpoint
import ( import (
"errors"
"time" "time"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
@ -142,7 +141,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
} }
if time.Now().UTC().Before(currentLeaseTimeout) && assignedTo != newAssignTo { if time.Now().UTC().Before(currentLeaseTimeout) && assignedTo != newAssignTo {
return errors.New(ErrLeaseNotAcquired) return ErrLeaseNotAcquired{"current lease timeout not yet expired"}
} }
checkpointer.log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo) checkpointer.log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo)
@ -186,7 +185,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
if err != nil { if err != nil {
if awsErr, ok := err.(awserr.Error); ok { if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == dynamodb.ErrCodeConditionalCheckFailedException { if awsErr.Code() == dynamodb.ErrCodeConditionalCheckFailedException {
return errors.New(ErrLeaseNotAcquired) return ErrLeaseNotAcquired{dynamodb.ErrCodeConditionalCheckFailedException}
} }
} }
return err return err

View file

@ -29,11 +29,12 @@ package checkpoint
import ( import (
"errors" "errors"
"github.com/stretchr/testify/assert"
"sync" "sync"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb"
@ -85,7 +86,7 @@ func TestGetLeaseNotAquired(t *testing.T) {
Checkpoint: "", Checkpoint: "",
Mux: &sync.Mutex{}, Mux: &sync.Mutex{},
}, "ijkl-mnop") }, "ijkl-mnop")
if err == nil || err.Error() != ErrLeaseNotAcquired { if err == nil || !errors.As(err, &ErrLeaseNotAcquired{}) {
t.Errorf("Got a lease when it was already held by abcd-efgh: %s", err) t.Errorf("Got a lease when it was already held by abcd-efgh: %s", err)
} }
} }

View file

@ -28,6 +28,7 @@
package worker package worker
import ( import (
"errors"
"math" "math"
"sync" "sync"
"time" "time"
@ -162,7 +163,7 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error {
log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID) log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID)
err = sc.checkpointer.GetLease(shard, sc.consumerID) err = sc.checkpointer.GetLease(shard, sc.consumerID)
if err != nil { if err != nil {
if err.Error() == chk.ErrLeaseNotAcquired { if errors.As(err, &chk.ErrLeaseNotAcquired{}) {
log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", shard.ID, sc.consumerID) log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", shard.ID, sc.consumerID)
return nil return nil
} }

View file

@ -28,6 +28,7 @@
package worker package worker
import ( import (
"errors"
"math/rand" "math/rand"
"sync" "sync"
"time" "time"
@ -277,7 +278,7 @@ func (w *Worker) eventLoop() {
err = w.checkpointer.GetLease(shard, w.workerID) err = w.checkpointer.GetLease(shard, w.workerID)
if err != nil { if err != nil {
// cannot get lease on the shard // cannot get lease on the shard
if err.Error() != chk.ErrLeaseNotAcquired { if !errors.As(err, &chk.ErrLeaseNotAcquired{}) {
log.Errorf("Cannot get lease: %+v", err) log.Errorf("Cannot get lease: %+v", err)
} }
continue continue