From 46fea317deaabdbad337d783e474cac88fad2dbf Mon Sep 17 00:00:00 2001
From: Tao Jiang
Date: Tue, 9 Jul 2019 21:24:11 -0500
Subject: [PATCH] Release shard lease after shutdown (#31)

* Release shard lease after shutdown

Currently, only the locally cached shard info is removed when a worker
loses its lease; the lease info inside the checkpointer (DynamoDB) is not.
As a result, the lease is held until it expires, and it can take too long
for the shard to become available for another worker to grab. This change
releases the lease in the checkpointer immediately. Users need to ensure
appropriate checkpointing before returning from the Shutdown callback.

Test: updated the unit and integration tests to verify that only the shard
owner is wiped out while the checkpoint information is left intact.

Signed-off-by: Tao Jiang

* Add code coverage reporting

Add code coverage reporting for the unit tests.

Signed-off-by: Tao Jiang
---
 clientlibrary/checkpoint/checkpointer.go     | 12 ++++
 .../checkpoint/dynamodb-checkpointer.go      | 49 +++++++++++-----
 .../checkpoint/dynamodb-checkpointer_test.go | 58 +++++++++++++++++--
 clientlibrary/worker/shard-consumer.go       |  7 +++
 support/scripts/test.sh                      |  2 +-
 test/worker_custom_test.go                   | 16 +++++
 test/worker_test.go                          |  3 +
 7 files changed, 126 insertions(+), 21 deletions(-)

diff --git a/clientlibrary/checkpoint/checkpointer.go b/clientlibrary/checkpoint/checkpointer.go
index 1f48349..b3af0b7 100644
--- a/clientlibrary/checkpoint/checkpointer.go
+++ b/clientlibrary/checkpoint/checkpointer.go
@@ -48,11 +48,23 @@ const (
 // Checkpointer handles checkpointing when a record has been processed
 type Checkpointer interface {
+	// Init initialises the Checkpoint
 	Init() error
+
+	// GetLease attempts to gain a lock on the given shard
 	GetLease(*par.ShardStatus, string) error
+
+	// CheckpointSequence writes a checkpoint at the designated sequence ID
 	CheckpointSequence(*par.ShardStatus) error
+
+	// FetchCheckpoint retrieves the checkpoint for the given shard
 	FetchCheckpoint(*par.ShardStatus) error
+
+	// RemoveLeaseInfo removes the lease info for the shard entry because the shard no longer exists
 	RemoveLeaseInfo(string) error
+
+	// RemoveLeaseOwner removes the lease owner for the shard entry to make the shard available for reassignment
+	RemoveLeaseOwner(string) error
 }
 
 // ErrSequenceIDNotFound is returned by FetchCheckpoint when no SequenceID is found
diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go
index 1e4ba4f..e80c6c0 100644
--- a/clientlibrary/checkpoint/dynamodb-checkpointer.go
+++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go
@@ -89,7 +89,7 @@ func (checkpointer *DynamoCheckpoint) Init() error {
 	s, err := session.NewSession(&aws.Config{
 		Region:      aws.String(checkpointer.kclConfig.RegionName),
-		Endpoint:    &checkpointer.kclConfig.DynamoDBEndpoint,
+		Endpoint:    aws.String(checkpointer.kclConfig.DynamoDBEndpoint),
 		Credentials: checkpointer.kclConfig.DynamoDBCredentials,
 		Retryer:     client.DefaultRetryer{NumMaxRetries: checkpointer.Retries},
 	})
@@ -133,43 +133,45 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
 		if err != nil {
 			return err
 		}
+
 		if !time.Now().UTC().After(currentLeaseTimeout) && assignedTo != newAssignTo {
 			return errors.New(ErrLeaseNotAquired)
 		}
+
 		log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo)
 		conditionalExpression = "ShardID = :id AND AssignedTo = :assigned_to AND LeaseTimeout = :lease_timeout"
 		expressionAttributeValues = map[string]*dynamodb.AttributeValue{
 			":id": {
-				S: &shard.ID,
+				S: aws.String(shard.ID),
 			},
 			":assigned_to": {
-				S: &assignedTo,
+				S: aws.String(assignedTo),
 			},
 			":lease_timeout": {
-				S: &leaseTimeout,
+				S: aws.String(leaseTimeout),
 			},
 		}
 	}
 
 	marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
 		LEASE_KEY_KEY: {
-			S: &shard.ID,
+			S: aws.String(shard.ID),
 		},
 		LEASE_OWNER_KEY: {
-			S: &newAssignTo,
+			S: aws.String(newAssignTo),
 		},
 		LEASE_TIMEOUT_KEY: {
-			S: &newLeaseTimeoutString,
+			S: aws.String(newLeaseTimeoutString),
 		},
 	}
 
 	if len(shard.ParentShardId) > 0 {
-		marshalledCheckpoint[PARENT_SHARD_ID_KEY] = &dynamodb.AttributeValue{S: &shard.ParentShardId}
+		marshalledCheckpoint[PARENT_SHARD_ID_KEY] = &dynamodb.AttributeValue{S: aws.String(shard.ParentShardId)}
 	}
 
 	if shard.Checkpoint != "" {
 		marshalledCheckpoint[CHECKPOINT_SEQUENCE_NUMBER_KEY] = &dynamodb.AttributeValue{
-			S: &shard.Checkpoint,
+			S: aws.String(shard.Checkpoint),
 		}
 	}
@@ -196,16 +198,16 @@ func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *par.ShardStatus)
 	leaseTimeout := shard.LeaseTimeout.UTC().Format(time.RFC3339)
 	marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
 		LEASE_KEY_KEY: {
-			S: &shard.ID,
+			S: aws.String(shard.ID),
 		},
 		CHECKPOINT_SEQUENCE_NUMBER_KEY: {
-			S: &shard.Checkpoint,
+			S: aws.String(shard.Checkpoint),
 		},
 		LEASE_OWNER_KEY: {
-			S: &shard.AssignedTo,
+			S: aws.String(shard.AssignedTo),
 		},
 		LEASE_TIMEOUT_KEY: {
-			S: &leaseTimeout,
+			S: aws.String(leaseTimeout),
 		},
 	}
@@ -230,10 +232,10 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er
 	log.Debugf("Retrieved Shard Iterator %s", *sequenceID.S)
 	shard.Mux.Lock()
 	defer shard.Mux.Unlock()
-	shard.Checkpoint = *sequenceID.S
+	shard.Checkpoint = aws.StringValue(sequenceID.S)
 
 	if assignedTo, ok := checkpoint[LEASE_OWNER_KEY]; ok {
-		shard.AssignedTo = *assignedTo.S
+		shard.AssignedTo = aws.StringValue(assignedTo.S)
 	}
 	return nil
 }
@@ -251,6 +253,23 @@ func (checkpointer *DynamoCheckpoint) RemoveLeaseInfo(shardID string) error {
 	return err
 }
 
+// RemoveLeaseOwner removes the lease owner for the shard entry
+func (checkpointer *DynamoCheckpoint) RemoveLeaseOwner(shardID string) error {
+	input := &dynamodb.UpdateItemInput{
+		TableName: aws.String(checkpointer.TableName),
+		Key: map[string]*dynamodb.AttributeValue{
+			LEASE_KEY_KEY: {
+				S: aws.String(shardID),
+			},
+		},
+		UpdateExpression: aws.String("remove " + LEASE_OWNER_KEY),
+	}
+
+	_, err := checkpointer.svc.UpdateItem(input)
+
+	return err
+}
+
 func (checkpointer *DynamoCheckpoint) createTable() error {
 	input := &dynamodb.CreateTableInput{
 		AttributeDefinitions: []*dynamodb.AttributeDefinition{
diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer_test.go b/clientlibrary/checkpoint/dynamodb-checkpointer_test.go
index 20cce87..1b824b7 100644
--- a/clientlibrary/checkpoint/dynamodb-checkpointer_test.go
+++ b/clientlibrary/checkpoint/dynamodb-checkpointer_test.go
@@ -29,6 +29,7 @@ package checkpoint
 
 import (
 	"errors"
+	"github.com/stretchr/testify/assert"
 	"sync"
 	"testing"
 	"time"
@@ -43,7 +44,7 @@ import (
 )
 
 func TestDoesTableExist(t *testing.T) {
-	svc := &mockDynamoDB{tableExist: true}
+	svc := &mockDynamoDB{tableExist: true, item: map[string]*dynamodb.AttributeValue{}}
 	checkpoint := &DynamoCheckpoint{
 		TableName: "TableName",
 		svc:       svc,
@@ -60,7 +61,7 @@ func TestDoesTableExist(t *testing.T) {
 }
 
 func TestGetLeaseNotAquired(t *testing.T) {
-	svc := &mockDynamoDB{tableExist: true}
+	svc := &mockDynamoDB{tableExist: true, item: map[string]*dynamodb.AttributeValue{}}
 	kclConfig := cfg.NewKinesisClientLibConfig("appName", "test", "us-west-2", "abc").
 		WithInitialPositionInStream(cfg.LATEST).
 		WithMaxRecords(10).
@@ -91,7 +92,7 @@ func TestGetLeaseNotAquired(t *testing.T) {
 }
 
 func TestGetLeaseAquired(t *testing.T) {
-	svc := &mockDynamoDB{tableExist: true}
+	svc := &mockDynamoDB{tableExist: true, item: map[string]*dynamodb.AttributeValue{}}
 	kclConfig := cfg.NewKinesisClientLibConfig("appName", "test", "us-west-2", "abc").
 		WithInitialPositionInStream(cfg.LATEST).
 		WithMaxRecords(10).
@@ -102,7 +103,6 @@ func TestGetLeaseAquired(t *testing.T) {
 		WithMetricsMaxQueueSize(20)
 	checkpoint := NewDynamoCheckpoint(kclConfig).WithDynamoDB(svc)
 	checkpoint.Init()
-	checkpoint.svc = svc
 	marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
 		"ShardID": {
 			S: aws.String("0001"),
@@ -139,6 +139,23 @@ func TestGetLeaseAquired(t *testing.T) {
 	} else if *id.S != "deadbeef" {
 		t.Errorf("Expected checkpoint to be deadbeef. Got '%s'", *id.S)
 	}
+
+	// release owner info
+	err = checkpoint.RemoveLeaseOwner(shard.ID)
+	assert.Nil(t, err)
+
+	status := &par.ShardStatus{
+		ID:  shard.ID,
+		Mux: &sync.Mutex{},
+	}
+	checkpoint.FetchCheckpoint(status)
+
+	// the checkpoint and parent shard ID should be unchanged
+	assert.Equal(t, shard.Checkpoint, status.Checkpoint)
+	assert.Equal(t, shard.ParentShardId, status.ParentShardId)
+
+	// only the lease owner has been wiped out
+	assert.Equal(t, "", status.GetLeaseOwner())
 }
 
 type mockDynamoDB struct {
@@ -155,7 +172,28 @@ func (m *mockDynamoDB) DescribeTable(*dynamodb.DescribeTableInput) (*dynamodb.De
 }
 
 func (m *mockDynamoDB) PutItem(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) {
-	m.item = input.Item
+	item := input.Item
+
+	if shardID, ok := item[LEASE_KEY_KEY]; ok {
+		m.item[LEASE_KEY_KEY] = shardID
+	}
+
+	if owner, ok := item[LEASE_OWNER_KEY]; ok {
+		m.item[LEASE_OWNER_KEY] = owner
+	}
+
+	if timeout, ok := item[LEASE_TIMEOUT_KEY]; ok {
+		m.item[LEASE_TIMEOUT_KEY] = timeout
+	}
+
+	if checkpoint, ok := item[CHECKPOINT_SEQUENCE_NUMBER_KEY]; ok {
+		m.item[CHECKPOINT_SEQUENCE_NUMBER_KEY] = checkpoint
+	}
+
+	if parent, ok := item[PARENT_SHARD_ID_KEY]; ok {
+		m.item[PARENT_SHARD_ID_KEY] = parent
+	}
+
 	return nil, nil
 }
 
@@ -165,6 +203,16 @@ func (m *mockDynamoDB) GetItem(input *dynamodb.GetItemInput) (*dynamodb.GetItemO
 	}, nil
 }
 
+func (m *mockDynamoDB) UpdateItem(input *dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error) {
+	exp := input.UpdateExpression
+
+	if aws.StringValue(exp) == "remove "+LEASE_OWNER_KEY {
+		delete(m.item, LEASE_OWNER_KEY)
+	}
+
+	return nil, nil
+}
+
 func (m *mockDynamoDB) CreateTable(input *dynamodb.CreateTableInput) (*dynamodb.CreateTableOutput, error) {
 	return &dynamodb.CreateTableOutput{}, nil
 }
diff --git a/clientlibrary/worker/shard-consumer.go b/clientlibrary/worker/shard-consumer.go
index 7899a67..139c27d 100644
--- a/clientlibrary/worker/shard-consumer.go
+++ b/clientlibrary/worker/shard-consumer.go
@@ -285,6 +285,13 @@ func (sc *ShardConsumer) waitOnParentShard(shard *par.ShardStatus) error {
 func (sc *ShardConsumer) releaseLease(shard *par.ShardStatus) {
 	log.Infof("Release lease for shard %s", shard.ID)
 	shard.SetLeaseOwner("")
+
+	// Release the lease by wiping out the lease owner for the shard.
+	// Note: nothing needs to be done on error here; the shard lease will eventually expire.
+	if err := sc.checkpointer.RemoveLeaseOwner(shard.ID); err != nil {
+		log.Errorf("Failed to release shard lease for shard: %s Error: %+v", shard.ID, err)
+	}
+
 	// reporting lease lose metrics
 	sc.mService.LeaseLost(shard.ID)
 }
diff --git a/support/scripts/test.sh b/support/scripts/test.sh
index 78c0986..ee8226e 100755
--- a/support/scripts/test.sh
+++ b/support/scripts/test.sh
@@ -2,4 +2,4 @@
 . support/scripts/functions.sh
 
 # Run only the unit tests and not integration tests
-go test -race $(local_go_pkgs)
+go test -cover -race $(local_go_pkgs)
diff --git a/test/worker_custom_test.go b/test/worker_custom_test.go
index 6b3beb2..2ac300f 100644
--- a/test/worker_custom_test.go
+++ b/test/worker_custom_test.go
@@ -20,6 +20,7 @@ package test
 
 import (
 	"os"
+	"sync"
 	"testing"
 	"time"
 
@@ -32,6 +33,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	chk "github.com/vmware/vmware-go-kcl/clientlibrary/checkpoint"
 	cfg "github.com/vmware/vmware-go-kcl/clientlibrary/config"
+	par "github.com/vmware/vmware-go-kcl/clientlibrary/partition"
 	"github.com/vmware/vmware-go-kcl/clientlibrary/utils"
 	wk "github.com/vmware/vmware-go-kcl/clientlibrary/worker"
 )
@@ -77,6 +79,20 @@ func TestWorkerInjectCheckpointer(t *testing.T) {
 	// wait a few seconds before shutdown processing
 	time.Sleep(10 * time.Second)
 	worker.Shutdown()
+
+	// verify the checkpointer state after graceful shutdown
+	status := &par.ShardStatus{
+		ID:  shardID,
+		Mux: &sync.Mutex{},
+	}
+	checkpointer.FetchCheckpoint(status)
+
+	// the checkpoint should be preserved
+	assert.NotEmpty(t, status.Checkpoint)
+
+	// only the lease owner has been wiped out
+	assert.Equal(t, "", status.GetLeaseOwner())
+
 }
 
 func TestWorkerInjectKinesis(t *testing.T) {
diff --git a/test/worker_test.go b/test/worker_test.go
index 343f158..d950058 100644
--- a/test/worker_test.go
+++ b/test/worker_test.go
@@ -50,6 +50,8 @@ const (
 const specstr = `{"name":"kube-qQyhk","networking":{"containerNetworkCidr":"10.2.0.0/16"},"orgName":"BVT-Org-cLQch","projectName":"project-tDSJd","serviceLevel":"DEVELOPER","size":{"count":1},"version":"1.8.1-4"}`
 const metricsSystem = "cloudwatch"
 
+var shardID string
+
 func TestWorker(t *testing.T) {
 	kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID).
 		WithInitialPositionInStream(cfg.LATEST).
@@ -235,6 +237,7 @@ type dumpRecordProcessor struct {
 
 func (dd *dumpRecordProcessor) Initialize(input *kc.InitializationInput) {
 	dd.t.Logf("Processing SharId: %v at checkpoint: %v", input.ShardId, aws.StringValue(input.ExtendedSequenceNumber.SequenceNumber))
+	shardID = input.ShardId
 }
 
 func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
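
For illustration, a minimal standalone sketch of the lease-release technique this patch relies on: RemoveLeaseOwner issues a DynamoDB UpdateItem whose "remove" update expression deletes only the AssignedTo (lease owner) attribute of the lease item, leaving the ShardID, Checkpoint, and LeaseTimeout attributes intact, so the next worker to grab the shard resumes from the saved position. The region, table name, and shard ID below are assumed placeholder values, not values taken from this patch.

package main

import (
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
)

func main() {
	// Assumed region and lease-table name; substitute your own.
	sess := session.Must(session.NewSession(&aws.Config{
		Region: aws.String("us-west-2"),
	}))
	svc := dynamodb.New(sess)

	// Remove only the lease owner attribute. The checkpoint and lease
	// timeout stay in the item, so another worker can acquire the shard
	// immediately and resume from the recorded sequence number.
	_, err := svc.UpdateItem(&dynamodb.UpdateItemInput{
		TableName: aws.String("appName"), // assumed lease table
		Key: map[string]*dynamodb.AttributeValue{
			"ShardID": {S: aws.String("shardId-000000000000")}, // assumed shard ID
		},
		UpdateExpression: aws.String("remove AssignedTo"),
	})
	if err != nil {
		log.Fatalf("failed to release shard lease: %v", err)
	}
	fmt.Println("lease owner removed; shard available for reassignment")
}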