Release shard lease after shutdown (#31)

* Release shard lease after shutdown

Currently, only the locally cached shard info is removed when a worker loses
the lease. The info inside the checkpointer (DynamoDB) is not removed, so the
lease stays held until it expires, and it can take too long for the shard to
become available for another worker to grab. This change releases the lease
in the checkpointer immediately.

Users need to ensure appropriate checkpointing before returning from the
Shutdown callback.
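
For illustration, a record processor can write its final checkpoint inside Shutdown, before the worker clears the lease owner. A minimal sketch against the library's interfaces package; the processor type, its lastSeq field, and the kc.TERMINATE comparison reflect common usage and are assumptions here, not part of this change:

package main

import (
    "log"

    kc "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces"
)

type recordProcessor struct {
    lastSeq *string // sequence number of the last processed record (assumed field)
}

func (p *recordProcessor) Shutdown(input *kc.ShutdownInput) {
    // Checkpoint before returning: once this callback returns, the worker
    // wipes the lease owner and another worker may claim the shard.
    if input.ShutdownReason == kc.TERMINATE {
        if err := input.Checkpointer.Checkpoint(p.lastSeq); err != nil {
            log.Printf("failed to checkpoint on shutdown: %v", err)
        }
    }
}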

Test:
  updated the unit and integration tests to verify that only the shard owner
is wiped out and the checkpoint information is left intact.

Signed-off-by: Tao Jiang <taoj@vmware.com>

* Add code coverage reporting

Add code coverage reporting for unit tests.

Signed-off-by: Tao Jiang <taoj@vmware.com>
Tao Jiang 2019-07-09 21:24:11 -05:00
parent ac8d341cb1
commit 46fea317de
7 changed files with 126 additions and 21 deletions


@@ -48,11 +48,23 @@ const (
// Checkpointer handles checkpointing when a record has been processed
type Checkpointer interface {
    // Init initialises the Checkpoint
    Init() error
    // GetLease attempts to gain a lock on the given shard
    GetLease(*par.ShardStatus, string) error
    // CheckpointSequence writes a checkpoint at the designated sequence ID
    CheckpointSequence(*par.ShardStatus) error
    // FetchCheckpoint retrieves the checkpoint for the given shard
    FetchCheckpoint(*par.ShardStatus) error
    // RemoveLeaseInfo to remove lease info for shard entry because the shard no longer exists
    RemoveLeaseInfo(string) error
+
+   // RemoveLeaseOwner to remove lease owner for the shard entry to make the shard available for reassignment
+   RemoveLeaseOwner(string) error
}

// ErrSequenceIDNotFound is returned by FetchCheckpoint when no SequenceID is found


@@ -89,7 +89,7 @@ func (checkpointer *DynamoCheckpoint) Init() error {
    s, err := session.NewSession(&aws.Config{
        Region:      aws.String(checkpointer.kclConfig.RegionName),
-       Endpoint:    &checkpointer.kclConfig.DynamoDBEndpoint,
+       Endpoint:    aws.String(checkpointer.kclConfig.DynamoDBEndpoint),
        Credentials: checkpointer.kclConfig.DynamoDBCredentials,
        Retryer:     client.DefaultRetryer{NumMaxRetries: checkpointer.Retries},
    })
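
A recurring change in this diff is swapping bare field addresses (&x) for the SDK's aws.String helper. aws.String returns a pointer to a copy of its argument, so the request no longer aliases a live struct field that could be mutated later; aws.StringValue (used further down) is its nil-safe inverse. A standalone illustration, not code from this repository:

package main

import (
    "fmt"

    "github.com/aws/aws-sdk-go/aws"
)

func main() {
    endpoint := "http://localhost:8000"
    p := aws.String(endpoint) // pointer to a copy of the current value
    endpoint = "mutated"      // changing the original leaves *p untouched
    fmt.Println(*p)           // prints: http://localhost:8000

    var missing *string
    fmt.Printf("%q\n", aws.StringValue(missing)) // prints: "" (nil-safe dereference)
}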
@@ -133,43 +133,45 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
        if err != nil {
            return err
        }
        if !time.Now().UTC().After(currentLeaseTimeout) && assignedTo != newAssignTo {
            return errors.New(ErrLeaseNotAquired)
        }
        log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo)
        conditionalExpression = "ShardID = :id AND AssignedTo = :assigned_to AND LeaseTimeout = :lease_timeout"
        expressionAttributeValues = map[string]*dynamodb.AttributeValue{
            ":id": {
-               S: &shard.ID,
+               S: aws.String(shard.ID),
            },
            ":assigned_to": {
-               S: &assignedTo,
+               S: aws.String(assignedTo),
            },
            ":lease_timeout": {
-               S: &leaseTimeout,
+               S: aws.String(leaseTimeout),
            },
        }
    }
    marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
        LEASE_KEY_KEY: {
-           S: &shard.ID,
+           S: aws.String(shard.ID),
        },
        LEASE_OWNER_KEY: {
-           S: &newAssignTo,
+           S: aws.String(newAssignTo),
        },
        LEASE_TIMEOUT_KEY: {
-           S: &newLeaseTimeoutString,
+           S: aws.String(newLeaseTimeoutString),
        },
    }
    if len(shard.ParentShardId) > 0 {
-       marshalledCheckpoint[PARENT_SHARD_ID_KEY] = &dynamodb.AttributeValue{S: &shard.ParentShardId}
+       marshalledCheckpoint[PARENT_SHARD_ID_KEY] = &dynamodb.AttributeValue{S: aws.String(shard.ParentShardId)}
    }
    if shard.Checkpoint != "" {
        marshalledCheckpoint[CHECKPOINT_SEQUENCE_NUMBER_KEY] = &dynamodb.AttributeValue{
-           S: &shard.Checkpoint,
+           S: aws.String(shard.Checkpoint),
        }
    }
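
For context on the hunk above: the lease grab relies on a DynamoDB conditional write, so if two workers race, only the one whose view of (AssignedTo, LeaseTimeout) still matches the stored item wins; the loser gets a conditional-check failure and backs off with ErrLeaseNotAquired. A sketch of that pattern; the condition string mirrors the hunk above, the function and parameter names are illustrative:

package sketch

import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/awserr"
    "github.com/aws/aws-sdk-go/service/dynamodb"
    "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
)

// tryAcquireLease sketches the conditional-write pattern: the put succeeds
// only if the stored owner and timeout still match what this worker read,
// so exactly one of two racing workers wins the lease.
func tryAcquireLease(svc dynamodbiface.DynamoDBAPI, table, condition string,
    item, exprValues map[string]*dynamodb.AttributeValue) (bool, error) {

    _, err := svc.PutItem(&dynamodb.PutItemInput{
        TableName:                 aws.String(table),
        Item:                      item,
        ConditionExpression:       aws.String(condition),
        ExpressionAttributeValues: exprValues,
    })
    if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == dynamodb.ErrCodeConditionalCheckFailedException {
        return false, nil // another worker holds a valid lease
    }
    return err == nil, err
}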
@@ -196,16 +198,16 @@ func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *par.ShardStatus)
    leaseTimeout := shard.LeaseTimeout.UTC().Format(time.RFC3339)
    marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
        LEASE_KEY_KEY: {
-           S: &shard.ID,
+           S: aws.String(shard.ID),
        },
        CHECKPOINT_SEQUENCE_NUMBER_KEY: {
-           S: &shard.Checkpoint,
+           S: aws.String(shard.Checkpoint),
        },
        LEASE_OWNER_KEY: {
-           S: &shard.AssignedTo,
+           S: aws.String(shard.AssignedTo),
        },
        LEASE_TIMEOUT_KEY: {
-           S: &leaseTimeout,
+           S: aws.String(leaseTimeout),
        },
    }
@@ -230,10 +232,10 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er
    log.Debugf("Retrieved Shard Iterator %s", *sequenceID.S)
    shard.Mux.Lock()
    defer shard.Mux.Unlock()
-   shard.Checkpoint = *sequenceID.S
+   shard.Checkpoint = aws.StringValue(sequenceID.S)
    if assignedTo, ok := checkpoint[LEASE_OWNER_KEY]; ok {
-       shard.AssignedTo = *assignedTo.S
+       shard.AssignedTo = aws.StringValue(assignedTo.S)
    }
    return nil
}
@@ -251,6 +253,23 @@ func (checkpointer *DynamoCheckpoint) RemoveLeaseInfo(shardID string) error {
    return err
}

+// RemoveLeaseOwner to remove lease owner for the shard entry
+func (checkpointer *DynamoCheckpoint) RemoveLeaseOwner(shardID string) error {
+   input := &dynamodb.UpdateItemInput{
+       TableName: aws.String(checkpointer.TableName),
+       Key: map[string]*dynamodb.AttributeValue{
+           LEASE_KEY_KEY: {
+               S: aws.String(shardID),
+           },
+       },
+       UpdateExpression: aws.String("remove " + LEASE_OWNER_KEY),
+   }
+
+   _, err := checkpointer.svc.UpdateItem(input)
+
+   return err
+}
+
func (checkpointer *DynamoCheckpoint) createTable() error {
    input := &dynamodb.CreateTableInput{
        AttributeDefinitions: []*dynamodb.AttributeDefinition{
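
The key point of RemoveLeaseOwner is DynamoDB's remove update action: it unsets only the named attribute and leaves the rest of the item, including the checkpoint, in place. A sketch contrasting it with RemoveLeaseInfo's whole-entry removal; the helper names, the "ShardID"/"AssignedTo" attribute names (taken from the test fixtures below), and the assumption that RemoveLeaseInfo is DeleteItem-based are illustrative:

package sketch

import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/dynamodb"
)

// leaseKey builds the hash key of a lease row ("ShardID" per the tests below).
func leaseKey(shardID string) map[string]*dynamodb.AttributeValue {
    return map[string]*dynamodb.AttributeValue{
        "ShardID": {S: aws.String(shardID)},
    }
}

// releaseOwnerOnly mirrors RemoveLeaseOwner: unset one attribute and keep the
// Checkpoint attribute, so the next owner resumes where this one stopped.
func releaseOwnerOnly(table, shardID string) *dynamodb.UpdateItemInput {
    return &dynamodb.UpdateItemInput{
        TableName:        aws.String(table),
        Key:              leaseKey(shardID),
        UpdateExpression: aws.String("remove AssignedTo"),
    }
}

// dropWholeEntry mirrors what RemoveLeaseInfo is for (a shard that no longer
// exists): the entire item, checkpoint included, is deleted.
func dropWholeEntry(table, shardID string) *dynamodb.DeleteItemInput {
    return &dynamodb.DeleteItemInput{
        TableName: aws.String(table),
        Key:       leaseKey(shardID),
    }
}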


@@ -29,6 +29,7 @@ package checkpoint
import (
    "errors"
+   "github.com/stretchr/testify/assert"
    "sync"
    "testing"
    "time"
@@ -43,7 +44,7 @@ import (
)

func TestDoesTableExist(t *testing.T) {
-   svc := &mockDynamoDB{tableExist: true}
+   svc := &mockDynamoDB{tableExist: true, item: map[string]*dynamodb.AttributeValue{}}
    checkpoint := &DynamoCheckpoint{
        TableName: "TableName",
        svc:       svc,
@@ -60,7 +61,7 @@ func TestDoesTableExist(t *testing.T) {
}

func TestGetLeaseNotAquired(t *testing.T) {
-   svc := &mockDynamoDB{tableExist: true}
+   svc := &mockDynamoDB{tableExist: true, item: map[string]*dynamodb.AttributeValue{}}
    kclConfig := cfg.NewKinesisClientLibConfig("appName", "test", "us-west-2", "abc").
        WithInitialPositionInStream(cfg.LATEST).
        WithMaxRecords(10).
@@ -91,7 +92,7 @@ func TestGetLeaseNotAquired(t *testing.T) {
}

func TestGetLeaseAquired(t *testing.T) {
-   svc := &mockDynamoDB{tableExist: true}
+   svc := &mockDynamoDB{tableExist: true, item: map[string]*dynamodb.AttributeValue{}}
    kclConfig := cfg.NewKinesisClientLibConfig("appName", "test", "us-west-2", "abc").
        WithInitialPositionInStream(cfg.LATEST).
        WithMaxRecords(10).
@@ -102,7 +103,6 @@ func TestGetLeaseAquired(t *testing.T) {
        WithMetricsMaxQueueSize(20)
    checkpoint := NewDynamoCheckpoint(kclConfig).WithDynamoDB(svc)
    checkpoint.Init()
-   checkpoint.svc = svc
    marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
        "ShardID": {
            S: aws.String("0001"),
@@ -139,6 +139,23 @@ func TestGetLeaseAquired(t *testing.T) {
    } else if *id.S != "deadbeef" {
        t.Errorf("Expected checkpoint to be deadbeef. Got '%s'", *id.S)
    }
+
+   // release owner info
+   err = checkpoint.RemoveLeaseOwner(shard.ID)
+   assert.Nil(t, err)
+
+   status := &par.ShardStatus{
+       ID:  shard.ID,
+       Mux: &sync.Mutex{},
+   }
+   checkpoint.FetchCheckpoint(status)
+
+   // the checkpoint and parent shard id should be unchanged
+   assert.Equal(t, shard.Checkpoint, status.Checkpoint)
+   assert.Equal(t, shard.ParentShardId, status.ParentShardId)
+
+   // Only the lease owner has been wiped out
+   assert.Equal(t, "", status.GetLeaseOwner())
}
type mockDynamoDB struct {
@@ -155,7 +172,28 @@ func (m *mockDynamoDB) DescribeTable(*dynamodb.DescribeTableInput) (*dynamodb.De
}

func (m *mockDynamoDB) PutItem(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) {
-   m.item = input.Item
+   item := input.Item
+
+   if shardID, ok := item[LEASE_KEY_KEY]; ok {
+       m.item[LEASE_KEY_KEY] = shardID
+   }
+
+   if owner, ok := item[LEASE_OWNER_KEY]; ok {
+       m.item[LEASE_OWNER_KEY] = owner
+   }
+
+   if timeout, ok := item[LEASE_TIMEOUT_KEY]; ok {
+       m.item[LEASE_TIMEOUT_KEY] = timeout
+   }
+
+   if checkpoint, ok := item[CHECKPOINT_SEQUENCE_NUMBER_KEY]; ok {
+       m.item[CHECKPOINT_SEQUENCE_NUMBER_KEY] = checkpoint
+   }
+
+   if parent, ok := item[PARENT_SHARD_ID_KEY]; ok {
+       m.item[PARENT_SHARD_ID_KEY] = parent
+   }
+
    return nil, nil
}
@@ -165,6 +203,16 @@ func (m *mockDynamoDB) GetItem(input *dynamodb.GetItemInput) (*dynamodb.GetItemO
    }, nil
}

+func (m *mockDynamoDB) UpdateItem(input *dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error) {
+   exp := input.UpdateExpression
+   if aws.StringValue(exp) == "remove "+LEASE_OWNER_KEY {
+       delete(m.item, LEASE_OWNER_KEY)
+   }
+
+   return nil, nil
+}
+
func (m *mockDynamoDB) CreateTable(input *dynamodb.CreateTableInput) (*dynamodb.CreateTableOutput, error) {
    return &dynamodb.CreateTableOutput{}, nil
}


@@ -285,6 +285,13 @@ func (sc *ShardConsumer) waitOnParentShard(shard *par.ShardStatus) error {
func (sc *ShardConsumer) releaseLease(shard *par.ShardStatus) {
    log.Infof("Release lease for shard %s", shard.ID)
    shard.SetLeaseOwner("")
+
+   // Release the lease by wiping out the lease owner for the shard.
+   // Note: we don't need to do anything on error here; the shard lease will eventually expire.
+   if err := sc.checkpointer.RemoveLeaseOwner(shard.ID); err != nil {
+       log.Errorf("Failed to release lease for shard: %s Error: %+v", shard.ID, err)
+   }
+
    // reporting lease lost metrics
    sc.mService.LeaseLost(shard.ID)
}
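
This is also where the commit-message caveat bites: releaseLease wipes the owner as soon as the consumer is done with the shard, so the record processor's Shutdown callback is the last safe place to checkpoint. A rough sketch of the ordering; the surrounding method and field names are assumed, not taken from this diff:

// Sketch of the shutdown ordering on the consumer side (assumed shape):
func (sc *ShardConsumer) shutdownShard(shard *par.ShardStatus, input *kc.ShutdownInput) {
    // 1. The user callback runs first; final checkpointing belongs here.
    sc.recordProcessor.Shutdown(input)

    // 2. Only afterwards is the owner wiped, so another worker can claim
    //    the shard immediately instead of waiting for lease expiry.
    sc.releaseLease(shard)
}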


@@ -2,4 +2,4 @@
. support/scripts/functions.sh

# Run only the unit tests and not integration tests
-go test -race $(local_go_pkgs)
+go test -cover -race $(local_go_pkgs)


@@ -20,6 +20,7 @@ package test
import (
    "os"
+   "sync"
    "testing"
    "time"
@@ -32,6 +33,7 @@ import (
    "github.com/stretchr/testify/assert"
    chk "github.com/vmware/vmware-go-kcl/clientlibrary/checkpoint"
    cfg "github.com/vmware/vmware-go-kcl/clientlibrary/config"
+   par "github.com/vmware/vmware-go-kcl/clientlibrary/partition"
    "github.com/vmware/vmware-go-kcl/clientlibrary/utils"
    wk "github.com/vmware/vmware-go-kcl/clientlibrary/worker"
)
@@ -77,6 +79,20 @@ func TestWorkerInjectCheckpointer(t *testing.T) {
    // wait a few seconds before shutdown processing
    time.Sleep(10 * time.Second)
    worker.Shutdown()
+
+   // verify the checkpointer after graceful shutdown
+   status := &par.ShardStatus{
+       ID:  shardID,
+       Mux: &sync.Mutex{},
+   }
+   checkpointer.FetchCheckpoint(status)
+
+   // the checkpoint should be intact
+   assert.NotEmpty(t, status.Checkpoint)
+
+   // Only the lease owner has been wiped out
+   assert.Equal(t, "", status.GetLeaseOwner())
}
func TestWorkerInjectKinesis(t *testing.T) {


@@ -50,6 +50,8 @@ const (
const specstr = `{"name":"kube-qQyhk","networking":{"containerNetworkCidr":"10.2.0.0/16"},"orgName":"BVT-Org-cLQch","projectName":"project-tDSJd","serviceLevel":"DEVELOPER","size":{"count":1},"version":"1.8.1-4"}`
const metricsSystem = "cloudwatch"

+var shardID string
+
func TestWorker(t *testing.T) {
    kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID).
        WithInitialPositionInStream(cfg.LATEST).
@@ -235,6 +237,7 @@ type dumpRecordProcessor struct {
func (dd *dumpRecordProcessor) Initialize(input *kc.InitializationInput) {
    dd.t.Logf("Processing ShardId: %v at checkpoint: %v", input.ShardId, aws.StringValue(input.ExtendedSequenceNumber.SequenceNumber))
+   shardID = input.ShardId
}

func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {