diff --git a/clientlibrary/checkpoint/checkpointer.go b/clientlibrary/checkpoint/checkpointer.go index 2345533..fe91359 100644 --- a/clientlibrary/checkpoint/checkpointer.go +++ b/clientlibrary/checkpoint/checkpointer.go @@ -29,6 +29,8 @@ package checkpoint import ( "errors" + "fmt" + par "github.com/vmware/vmware-go-kcl/clientlibrary/partition" ) @@ -41,11 +43,16 @@ const ( // We've completely processed all records in this shard. ShardEnd = "SHARD_END" - - // ErrLeaseNotAcquired is returned when we failed to get a lock on the shard - ErrLeaseNotAcquired = "lease is already held by another node" ) +type ErrLeaseNotAcquired struct { + cause string +} + +func (e ErrLeaseNotAcquired) Error() string { + return fmt.Sprintf("lease not acquired: %s", e.cause) +} + // Checkpointer handles checkpointing when a record has been processed type Checkpointer interface { // Init initialises the Checkpoint diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go index fd6751e..3d70f36 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go @@ -28,7 +28,6 @@ package checkpoint import ( - "errors" "time" "github.com/aws/aws-sdk-go/aws" @@ -142,7 +141,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign } if time.Now().UTC().Before(currentLeaseTimeout) && assignedTo != newAssignTo { - return errors.New(ErrLeaseNotAcquired) + return ErrLeaseNotAcquired{"current lease timeout not yet expired"} } checkpointer.log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo) @@ -186,7 +185,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign if err != nil { if awsErr, ok := err.(awserr.Error); ok { if awsErr.Code() == dynamodb.ErrCodeConditionalCheckFailedException { - return errors.New(ErrLeaseNotAcquired) + return ErrLeaseNotAcquired{dynamodb.ErrCodeConditionalCheckFailedException} } } return err diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer_test.go b/clientlibrary/checkpoint/dynamodb-checkpointer_test.go index 55ec973..fe8a383 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer_test.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer_test.go @@ -29,11 +29,12 @@ package checkpoint import ( "errors" - "github.com/stretchr/testify/assert" "sync" "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/dynamodb" @@ -85,7 +86,7 @@ func TestGetLeaseNotAquired(t *testing.T) { Checkpoint: "", Mux: &sync.Mutex{}, }, "ijkl-mnop") - if err == nil || err.Error() != ErrLeaseNotAcquired { + if err == nil || !errors.As(err, &ErrLeaseNotAcquired{}) { t.Errorf("Got a lease when it was already held by abcd-efgh: %s", err) } } diff --git a/clientlibrary/worker/shard-consumer.go b/clientlibrary/worker/shard-consumer.go index 6d3a330..2f87f40 100644 --- a/clientlibrary/worker/shard-consumer.go +++ b/clientlibrary/worker/shard-consumer.go @@ -28,6 +28,7 @@ package worker import ( + "errors" "math" "sync" "time" @@ -162,7 +163,7 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error { log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID) err = sc.checkpointer.GetLease(shard, sc.consumerID) if err != nil { - if err.Error() == chk.ErrLeaseNotAcquired { + if errors.As(err, &chk.ErrLeaseNotAcquired{}) { log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", shard.ID, sc.consumerID) return nil } diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index 907bf5d..5b04453 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -28,6 +28,7 @@ package worker import ( + "errors" "math/rand" "sync" "time" @@ -277,7 +278,7 @@ func (w *Worker) eventLoop() { err = w.checkpointer.GetLease(shard, w.workerID) if err != nil { // cannot get lease on the shard - if err.Error() != chk.ErrLeaseNotAcquired { + if !errors.As(err, &chk.ErrLeaseNotAcquired{}) { log.Errorf("Cannot get lease: %+v", err) } continue