diff --git a/clientlibrary/checkpoint/checkpointer.go b/clientlibrary/checkpoint/checkpointer.go
index 1f48349..b3af0b7 100644
--- a/clientlibrary/checkpoint/checkpointer.go
+++ b/clientlibrary/checkpoint/checkpointer.go
@@ -48,11 +48,23 @@ const (
 
 // Checkpointer handles checkpointing when a record has been processed
 type Checkpointer interface {
+	// Init initialises the Checkpoint
 	Init() error
+
+	// GetLease attempts to gain a lock on the given shard
 	GetLease(*par.ShardStatus, string) error
+
+	// CheckpointSequence writes a checkpoint at the designated sequence ID
 	CheckpointSequence(*par.ShardStatus) error
+
+	// FetchCheckpoint retrieves the checkpoint for the given shard
 	FetchCheckpoint(*par.ShardStatus) error
+
+	// RemoveLeaseInfo to remove lease info for shard entry because the shard no longer exists
 	RemoveLeaseInfo(string) error
+
+	// RemoveLeaseOwner to remove lease owner for the shard entry to make the shard available for reassignment
+	RemoveLeaseOwner(string) error
 }
 
 // ErrSequenceIDNotFound is returned by FetchCheckpoint when no SequenceID is found
diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go
index 1e4ba4f..e80c6c0 100644
--- a/clientlibrary/checkpoint/dynamodb-checkpointer.go
+++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go
@@ -89,7 +89,7 @@ func (checkpointer *DynamoCheckpoint) Init() error {
 	s, err := session.NewSession(&aws.Config{
 		Region:      aws.String(checkpointer.kclConfig.RegionName),
-		Endpoint:    &checkpointer.kclConfig.DynamoDBEndpoint,
+		Endpoint:    aws.String(checkpointer.kclConfig.DynamoDBEndpoint),
 		Credentials: checkpointer.kclConfig.DynamoDBCredentials,
 		Retryer:     client.DefaultRetryer{NumMaxRetries: checkpointer.Retries},
 	})
 
@@ -133,43 +133,45 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
 		if err != nil {
 			return err
 		}
+
 		if !time.Now().UTC().After(currentLeaseTimeout) && assignedTo != newAssignTo {
 			return errors.New(ErrLeaseNotAquired)
 		}
+
 		log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo)
 		conditionalExpression = "ShardID = :id AND AssignedTo = :assigned_to AND LeaseTimeout = :lease_timeout"
 		expressionAttributeValues = map[string]*dynamodb.AttributeValue{
 			":id": {
-				S: &shard.ID,
+				S: aws.String(shard.ID),
 			},
 			":assigned_to": {
-				S: &assignedTo,
+				S: aws.String(assignedTo),
 			},
 			":lease_timeout": {
-				S: &leaseTimeout,
+				S: aws.String(leaseTimeout),
 			},
 		}
 	}
 
 	marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
 		LEASE_KEY_KEY: {
-			S: &shard.ID,
+			S: aws.String(shard.ID),
 		},
 		LEASE_OWNER_KEY: {
-			S: &newAssignTo,
+			S: aws.String(newAssignTo),
 		},
 		LEASE_TIMEOUT_KEY: {
-			S: &newLeaseTimeoutString,
+			S: aws.String(newLeaseTimeoutString),
 		},
 	}
 
 	if len(shard.ParentShardId) > 0 {
-		marshalledCheckpoint[PARENT_SHARD_ID_KEY] = &dynamodb.AttributeValue{S: &shard.ParentShardId}
+		marshalledCheckpoint[PARENT_SHARD_ID_KEY] = &dynamodb.AttributeValue{S: aws.String(shard.ParentShardId)}
 	}
 
 	if shard.Checkpoint != "" {
 		marshalledCheckpoint[CHECKPOINT_SEQUENCE_NUMBER_KEY] = &dynamodb.AttributeValue{
-			S: &shard.Checkpoint,
+			S: aws.String(shard.Checkpoint),
 		}
 	}
 
@@ -196,16 +198,16 @@ func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *par.ShardStatus)
 	leaseTimeout := shard.LeaseTimeout.UTC().Format(time.RFC3339)
 	marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
 		LEASE_KEY_KEY: {
-			S: &shard.ID,
+			S: aws.String(shard.ID),
 		},
 		CHECKPOINT_SEQUENCE_NUMBER_KEY: {
-			S: &shard.Checkpoint,
+			S: aws.String(shard.Checkpoint),
 		},
 		LEASE_OWNER_KEY: {
-			S: &shard.AssignedTo,
+			S: aws.String(shard.AssignedTo),
 		},
 		LEASE_TIMEOUT_KEY: {
-			S: &leaseTimeout,
+			S: aws.String(leaseTimeout),
 		},
 	}
 
@@ -230,10 +232,10 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er
 	log.Debugf("Retrieved Shard Iterator %s", *sequenceID.S)
 	shard.Mux.Lock()
 	defer shard.Mux.Unlock()
-	shard.Checkpoint = *sequenceID.S
+	shard.Checkpoint = aws.StringValue(sequenceID.S)
 
 	if assignedTo, ok := checkpoint[LEASE_OWNER_KEY]; ok {
-		shard.AssignedTo = *assignedTo.S
+		shard.AssignedTo = aws.StringValue(assignedTo.S)
 	}
 	return nil
 }
@@ -251,6 +253,23 @@ func (checkpointer *DynamoCheckpoint) RemoveLeaseInfo(shardID string) error {
 	return err
 }
 
+// RemoveLeaseOwner to remove lease owner for the shard entry
+func (checkpointer *DynamoCheckpoint) RemoveLeaseOwner(shardID string) error {
+	input := &dynamodb.UpdateItemInput{
+		TableName: aws.String(checkpointer.TableName),
+		Key: map[string]*dynamodb.AttributeValue{
+			LEASE_KEY_KEY: {
+				S: aws.String(shardID),
+			},
+		},
+		UpdateExpression: aws.String("remove " + LEASE_OWNER_KEY),
+	}
+
+	_, err := checkpointer.svc.UpdateItem(input)
+
+	return err
+}
+
 func (checkpointer *DynamoCheckpoint) createTable() error {
 	input := &dynamodb.CreateTableInput{
 		AttributeDefinitions: []*dynamodb.AttributeDefinition{
diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer_test.go b/clientlibrary/checkpoint/dynamodb-checkpointer_test.go
index 20cce87..1b824b7 100644
--- a/clientlibrary/checkpoint/dynamodb-checkpointer_test.go
+++ b/clientlibrary/checkpoint/dynamodb-checkpointer_test.go
@@ -29,6 +29,7 @@ package checkpoint
 
 import (
 	"errors"
+	"github.com/stretchr/testify/assert"
 	"sync"
 	"testing"
 	"time"
@@ -43,7 +44,7 @@ import (
 )
 
 func TestDoesTableExist(t *testing.T) {
-	svc := &mockDynamoDB{tableExist: true}
+	svc := &mockDynamoDB{tableExist: true, item: map[string]*dynamodb.AttributeValue{}}
 	checkpoint := &DynamoCheckpoint{
 		TableName: "TableName",
 		svc:       svc,
@@ -60,7 +61,7 @@ func TestDoesTableExist(t *testing.T) {
 }
 
 func TestGetLeaseNotAquired(t *testing.T) {
-	svc := &mockDynamoDB{tableExist: true}
+	svc := &mockDynamoDB{tableExist: true, item: map[string]*dynamodb.AttributeValue{}}
 	kclConfig := cfg.NewKinesisClientLibConfig("appName", "test", "us-west-2", "abc").
 		WithInitialPositionInStream(cfg.LATEST).
 		WithMaxRecords(10).
@@ -91,7 +92,7 @@ func TestGetLeaseNotAquired(t *testing.T) {
 }
 
 func TestGetLeaseAquired(t *testing.T) {
-	svc := &mockDynamoDB{tableExist: true}
+	svc := &mockDynamoDB{tableExist: true, item: map[string]*dynamodb.AttributeValue{}}
 	kclConfig := cfg.NewKinesisClientLibConfig("appName", "test", "us-west-2", "abc").
 		WithInitialPositionInStream(cfg.LATEST).
 		WithMaxRecords(10).
@@ -102,7 +103,6 @@ func TestGetLeaseAquired(t *testing.T) {
 		WithMetricsMaxQueueSize(20)
 	checkpoint := NewDynamoCheckpoint(kclConfig).WithDynamoDB(svc)
 	checkpoint.Init()
-	checkpoint.svc = svc
 	marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
 		"ShardID": {
 			S: aws.String("0001"),
@@ -139,6 +139,23 @@ func TestGetLeaseAquired(t *testing.T) {
 	} else if *id.S != "deadbeef" {
 		t.Errorf("Expected checkpoint to be deadbeef. Got '%s'", *id.S)
Got '%s'", *id.S) } + + // release owner info + err = checkpoint.RemoveLeaseOwner(shard.ID) + assert.Nil(t, err) + + status := &par.ShardStatus{ + ID: shard.ID, + Mux: &sync.Mutex{}, + } + checkpoint.FetchCheckpoint(status) + + // checkpointer and parent shard id should be the same + assert.Equal(t, shard.Checkpoint, status.Checkpoint) + assert.Equal(t, shard.ParentShardId, status.ParentShardId) + + // Only the lease owner has been wiped out + assert.Equal(t, "", status.GetLeaseOwner()) } type mockDynamoDB struct { @@ -155,7 +172,28 @@ func (m *mockDynamoDB) DescribeTable(*dynamodb.DescribeTableInput) (*dynamodb.De } func (m *mockDynamoDB) PutItem(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) { - m.item = input.Item + item := input.Item + + if shardID, ok := item[LEASE_KEY_KEY]; ok { + m.item[LEASE_KEY_KEY] = shardID + } + + if owner, ok := item[LEASE_OWNER_KEY]; ok { + m.item[LEASE_OWNER_KEY] = owner + } + + if timeout, ok := item[LEASE_TIMEOUT_KEY]; ok { + m.item[LEASE_TIMEOUT_KEY] = timeout + } + + if checkpoint, ok := item[CHECKPOINT_SEQUENCE_NUMBER_KEY]; ok { + m.item[CHECKPOINT_SEQUENCE_NUMBER_KEY] = checkpoint + } + + if parent, ok := item[PARENT_SHARD_ID_KEY]; ok { + m.item[PARENT_SHARD_ID_KEY] = parent + } + return nil, nil } @@ -165,6 +203,16 @@ func (m *mockDynamoDB) GetItem(input *dynamodb.GetItemInput) (*dynamodb.GetItemO }, nil } +func (m *mockDynamoDB) UpdateItem(input *dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error) { + exp := input.UpdateExpression + + if aws.StringValue(exp) == "remove "+LEASE_OWNER_KEY { + delete(m.item, LEASE_OWNER_KEY) + } + + return nil, nil +} + func (m *mockDynamoDB) CreateTable(input *dynamodb.CreateTableInput) (*dynamodb.CreateTableOutput, error) { return &dynamodb.CreateTableOutput{}, nil } diff --git a/clientlibrary/worker/shard-consumer.go b/clientlibrary/worker/shard-consumer.go index 7899a67..139c27d 100644 --- a/clientlibrary/worker/shard-consumer.go +++ b/clientlibrary/worker/shard-consumer.go @@ -285,6 +285,13 @@ func (sc *ShardConsumer) waitOnParentShard(shard *par.ShardStatus) error { func (sc *ShardConsumer) releaseLease(shard *par.ShardStatus) { log.Infof("Release lease for shard %s", shard.ID) shard.SetLeaseOwner("") + + // Release the lease by wiping out the lease owner for the shard + // Note: we don't need to do anything in case of error here and shard lease will eventuall be expired. + if err := sc.checkpointer.RemoveLeaseOwner(shard.ID); err != nil { + log.Errorf("Failed to release shard lease or shard: %s Error: %+v", shard.ID, err) + } + // reporting lease lose metrics sc.mService.LeaseLost(shard.ID) } diff --git a/support/scripts/test.sh b/support/scripts/test.sh index 78c0986..ee8226e 100755 --- a/support/scripts/test.sh +++ b/support/scripts/test.sh @@ -2,4 +2,4 @@ . 
 . support/scripts/functions.sh
 
 # Run only the unit tests and not integration tests
-go test -race $(local_go_pkgs)
+go test -cover -race $(local_go_pkgs)
diff --git a/test/worker_custom_test.go b/test/worker_custom_test.go
index 6b3beb2..2ac300f 100644
--- a/test/worker_custom_test.go
+++ b/test/worker_custom_test.go
@@ -20,6 +20,7 @@ package test
 
 import (
 	"os"
+	"sync"
 	"testing"
 	"time"
 
@@ -32,6 +33,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	chk "github.com/vmware/vmware-go-kcl/clientlibrary/checkpoint"
 	cfg "github.com/vmware/vmware-go-kcl/clientlibrary/config"
+	par "github.com/vmware/vmware-go-kcl/clientlibrary/partition"
 	"github.com/vmware/vmware-go-kcl/clientlibrary/utils"
 	wk "github.com/vmware/vmware-go-kcl/clientlibrary/worker"
 )
@@ -77,6 +79,20 @@ func TestWorkerInjectCheckpointer(t *testing.T) {
 	// wait a few seconds before shutdown processing
 	time.Sleep(10 * time.Second)
 	worker.Shutdown()
+
+	// verify the checkpointer after graceful shutdown
+	status := &par.ShardStatus{
+		ID:  shardID,
+		Mux: &sync.Mutex{},
+	}
+	checkpointer.FetchCheckpoint(status)
+
+	// the checkpoint itself should still be present
+	assert.NotEmpty(t, status.Checkpoint)
+
+	// Only the lease owner has been wiped out
+	assert.Equal(t, "", status.GetLeaseOwner())
+
 }
 
 func TestWorkerInjectKinesis(t *testing.T) {
diff --git a/test/worker_test.go b/test/worker_test.go
index 343f158..d950058 100644
--- a/test/worker_test.go
+++ b/test/worker_test.go
@@ -50,6 +50,8 @@ const (
 const specstr = `{"name":"kube-qQyhk","networking":{"containerNetworkCidr":"10.2.0.0/16"},"orgName":"BVT-Org-cLQch","projectName":"project-tDSJd","serviceLevel":"DEVELOPER","size":{"count":1},"version":"1.8.1-4"}`
 const metricsSystem = "cloudwatch"
 
+var shardID string
+
 func TestWorker(t *testing.T) {
 	kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID).
 		WithInitialPositionInStream(cfg.LATEST).
@@ -235,6 +237,7 @@ type dumpRecordProcessor struct {
 
 func (dd *dumpRecordProcessor) Initialize(input *kc.InitializationInput) {
 	dd.t.Logf("Processing SharId: %v at checkpoint: %v", input.ShardId, aws.StringValue(input.ExtendedSequenceNumber.SequenceNumber))
+	shardID = input.ShardId
 }
 
 func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
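
Note for downstream users: adding RemoveLeaseOwner(string) error to the Checkpointer interface is a breaking change for any custom checkpointer injected into the worker (as exercised by TestWorkerInjectCheckpointer above). A minimal sketch of what such an implementation might look like, assuming an in-memory store; the inMemoryCheckpointer type and its fields are hypothetical and not part of this library:

	// inMemoryCheckpointer is a hypothetical custom Checkpointer used only to
	// illustrate the new method (assumes "sync" is imported); it tracks lease
	// owners in a map keyed by shard ID.
	type inMemoryCheckpointer struct {
		mu     sync.Mutex        // guards owners
		owners map[string]string // shard ID -> lease owner
	}

	// RemoveLeaseOwner clears the lease owner for the shard entry so the shard
	// becomes available for reassignment, mirroring the DynamoDB
	// implementation's "remove AssignedTo" UpdateItem above.
	func (c *inMemoryCheckpointer) RemoveLeaseOwner(shardID string) error {
		c.mu.Lock()
		defer c.mu.Unlock()
		delete(c.owners, shardID)
		return nil
	}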