From c05bfb7ac8834eb6e9e57d80cab58d9afa2eb57c Mon Sep 17 00:00:00 2001
From: Tao Jiang
Date: Wed, 18 Apr 2018 15:50:15 -0700
Subject: [PATCH] KCL: Fixing checkpoint operation

This change fixes a bug where the checkpoint could not be found after a
process restart. It also adds a missing call to the record processor to
notify it of the shard info and checkpoint when the application first
starts.

Test: Run hmake test and verify the log.

Change-Id: I4bdf21ac10c5ee988a0860c140991f7d05975541
---
 src/clientlibrary/interfaces/inputs.go     | 11 +++++++
 src/clientlibrary/worker/checkpointer.go   | 35 ++++++++++++----------
 src/clientlibrary/worker/shard-consumer.go | 23 +++++++++++---
 src/clientlibrary/worker/worker.go         |  1 +
 src/clientlibrary/worker/worker_test.go    | 13 ++++----
 5 files changed, 58 insertions(+), 25 deletions(-)

diff --git a/src/clientlibrary/interfaces/inputs.go b/src/clientlibrary/interfaces/inputs.go
index 27590c3..8f7590d 100644
--- a/src/clientlibrary/interfaces/inputs.go
+++ b/src/clientlibrary/interfaces/inputs.go
@@ -3,6 +3,7 @@ package interfaces
 import (
 	"time"
 
+	"github.com/aws/aws-sdk-go/aws"
 	ks "github.com/aws/aws-sdk-go/service/kinesis"
 )
 
@@ -60,3 +61,13 @@ type (
 		Checkpointer IRecordProcessorCheckpointer
 	}
 )
+
+var shutdownReasonMap = map[ShutdownReason]*string{
+	REQUESTED: aws.String("REQUESTED"),
+	TERMINATE: aws.String("TERMINATE"),
+	ZOMBIE:    aws.String("ZOMBIE"),
+}
+
+func ShutdownReasonMessage(reason ShutdownReason) *string {
+	return shutdownReasonMap[reason]
+}
diff --git a/src/clientlibrary/worker/checkpointer.go b/src/clientlibrary/worker/checkpointer.go
index 2ca4dda..39584db 100644
--- a/src/clientlibrary/worker/checkpointer.go
+++ b/src/clientlibrary/worker/checkpointer.go
@@ -15,6 +15,11 @@ import (
 )
 
 const (
+	LEASE_KEY_KEY                  = "ShardID"
+	LEASE_OWNER_KEY                = "AssignedTo"
+	LEASE_TIMEOUT_KEY              = "LeaseTimeout"
+	CHECKPOINT_SEQUENCE_NUMBER_KEY = "Checkpoint"
+
 	// ErrLeaseNotAquired is returned when we failed to get a lock on the shard
 	ErrLeaseNotAquired = "Lease is already held by another node"
 	// ErrInvalidDynamoDBSchema is returned when there are one or more fields missing from the table
@@ -74,8 +79,8 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *shardStatus, newAssignTo s
 		return err
 	}
 
-	assignedVar, assignedToOk := currentCheckpoint["AssignedTo"]
-	leaseVar, leaseTimeoutOk := currentCheckpoint["LeaseTimeout"]
+	assignedVar, assignedToOk := currentCheckpoint[LEASE_OWNER_KEY]
+	leaseVar, leaseTimeoutOk := currentCheckpoint[LEASE_TIMEOUT_KEY]
 
 	var conditionalExpression string
 	var expressionAttributeValues map[string]*dynamodb.AttributeValue
@@ -108,19 +113,19 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *shardStatus, newAssignTo s
 	}
 
 	marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
-		"ShardID": {
+		LEASE_KEY_KEY: {
 			S: &shard.ID,
 		},
-		"AssignedTo": {
+		LEASE_OWNER_KEY: {
 			S: &newAssignTo,
 		},
-		"LeaseTimeout": {
+		LEASE_TIMEOUT_KEY: {
 			S: &newLeaseTimeoutString,
 		},
 	}
 
 	if shard.Checkpoint != "" {
-		marshalledCheckpoint["Checkpoint"] = &dynamodb.AttributeValue{
+		marshalledCheckpoint[CHECKPOINT_SEQUENCE_NUMBER_KEY] = &dynamodb.AttributeValue{
 			S: &shard.Checkpoint,
 		}
 	}
@@ -147,16 +152,16 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *shardStatus, newAssignTo s
 func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *shardStatus) error {
 	leaseTimeout := shard.LeaseTimeout.UTC().Format(time.RFC3339)
 	marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
-		"ShardID": {
+		LEASE_KEY_KEY: {
 			S: &shard.ID,
 		},
-		"SequenceID": {
+		CHECKPOINT_SEQUENCE_NUMBER_KEY: {
 			S: &shard.Checkpoint,
 		},
-		"AssignedTo": {
+		LEASE_OWNER_KEY: {
 			S: &shard.AssignedTo,
 		},
-		"LeaseTimeout": {
+		LEASE_TIMEOUT_KEY: {
 			S: &leaseTimeout,
 		},
 	}
@@ -170,7 +175,7 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *shardStatus) error
 		return err
 	}
 
-	sequenceID, ok := checkpoint["SequenceID"]
+	sequenceID, ok := checkpoint[CHECKPOINT_SEQUENCE_NUMBER_KEY]
 	if !ok {
 		return ErrSequenceIDNotFound
 	}
@@ -179,7 +184,7 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *shardStatus) error
 	defer shard.mux.Unlock()
 	shard.Checkpoint = *sequenceID.S
 
-	if assignedTo, ok := checkpoint["Assignedto"]; ok {
+	if assignedTo, ok := checkpoint[LEASE_OWNER_KEY]; ok {
 		shard.AssignedTo = *assignedTo.S
 	}
 	return nil
@@ -189,13 +194,13 @@ func (checkpointer *DynamoCheckpoint) createTable() error {
 	input := &dynamodb.CreateTableInput{
 		AttributeDefinitions: []*dynamodb.AttributeDefinition{
 			{
-				AttributeName: aws.String("ShardID"),
+				AttributeName: aws.String(LEASE_KEY_KEY),
 				AttributeType: aws.String("S"),
 			},
 		},
 		KeySchema: []*dynamodb.KeySchemaElement{
 			{
-				AttributeName: aws.String("ShardID"),
+				AttributeName: aws.String(LEASE_KEY_KEY),
 				KeyType: aws.String("HASH"),
 			},
 		},
@@ -256,7 +261,7 @@ func (checkpointer *DynamoCheckpoint) getItem(shardID string) (map[string]*dynam
 	item, err = checkpointer.svc.GetItem(&dynamodb.GetItemInput{
 		TableName: aws.String(checkpointer.TableName),
 		Key: map[string]*dynamodb.AttributeValue{
-			"ShardID": {
+			LEASE_KEY_KEY: {
 				S: aws.String(shardID),
 			},
 		},
diff --git a/src/clientlibrary/worker/shard-consumer.go b/src/clientlibrary/worker/shard-consumer.go
index 905b48b..012eff9 100644
--- a/src/clientlibrary/worker/shard-consumer.go
+++ b/src/clientlibrary/worker/shard-consumer.go
@@ -56,6 +56,7 @@ type ShardConsumer struct {
 }
 
 func (sc *ShardConsumer) getShardIterator(shard *shardStatus) (*string, error) {
+	// Get the checkpoint of the shard from DynamoDB
 	err := sc.checkpointer.FetchCheckpoint(shard)
 	if err != nil && err != ErrSequenceIDNotFound {
 		return nil, err
@@ -64,6 +65,8 @@ func (sc *ShardConsumer) getShardIterator(shard *shardStatus) (*string, error) {
 	// If there isn't any checkpoint for the shard, use the configuration value.
 	if shard.Checkpoint == "" {
 		initPos := sc.kclConfig.InitialPositionInStream
+		log.Debugf("No checkpoint recorded for shard: %v, starting with: %v", shard.ID,
+			aws.StringValue(config.InitalPositionInStreamToShardIteratorType(initPos)))
 		shardIterArgs := &kinesis.GetShardIteratorInput{
 			ShardId:           &shard.ID,
 			ShardIteratorType: config.InitalPositionInStreamToShardIteratorType(initPos),
@@ -76,6 +79,7 @@ func (sc *ShardConsumer) getShardIterator(shard *shardStatus) (*string, error) {
 		return iterResp.ShardIterator, nil
 	}
 
+	log.Debugf("Start shard: %v at checkpoint: %v", shard.ID, shard.Checkpoint)
 	shardIterArgs := &kinesis.GetShardIteratorInput{
 		ShardId:           &shard.ID,
 		ShardIteratorType: aws.String("AFTER_SEQUENCE_NUMBER"),
@@ -98,6 +102,13 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error {
 		return err
 	}
 
+	// Before processing events, notify the record processor of the shard info and starting checkpoint
+	input := &kcl.InitializationInput{
+		ShardId:                shard.ID,
+		ExtendedSequenceNumber: &kcl.ExtendedSequenceNumber{SequenceNumber: aws.String(shard.Checkpoint)},
+	}
+	sc.recordProcessor.Initialize(input)
+
 	recordCheckpointer := NewRecordProcessorCheckpoint(shard, sc.checkpointer)
 
 	var retriedErrors int
@@ -147,7 +158,7 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error {
 
 		recordLength := len(input.Records)
 		recordBytes := int64(0)
-		log.Debugf("Received %d records", recordLength)
+		log.Debugf("Received %d records, MillisBehindLatest: %v", recordLength, input.MillisBehindLatest)
 
 		for _, r := range getResp.Records {
 			recordBytes += int64(len(r.Data))
@@ -164,9 +175,6 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error {
 			sc.mService.RecordProcessRecordsTime(shard.ID, float64(processedRecordsTiming))
 		}
 
-		// Idle between each read, the user is responsible for checkpoint the progress
-		time.Sleep(time.Duration(sc.kclConfig.IdleTimeBetweenReadsInMillis) * time.Millisecond)
-
 		sc.mService.IncrRecordsProcessed(shard.ID, recordLength)
 		sc.mService.IncrBytesProcessed(shard.ID, recordBytes)
 		sc.mService.MillisBehindLatest(shard.ID, float64(*getResp.MillisBehindLatest))
@@ -175,6 +183,13 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error {
 		getRecordsTime := time.Since(getRecordsStartTime) / 1000000
 		sc.mService.RecordGetRecordsTime(shard.ID, float64(getRecordsTime))
 
+		// Idle between each read; the user is responsible for checkpointing the progress.
+		// This idle time is only applied when no records are returned; if records are returned,
+		// the loop should immediately retrieve the next set of records.
+		if recordLength == 0 && aws.Int64Value(getResp.MillisBehindLatest) < int64(sc.kclConfig.IdleTimeBetweenReadsInMillis) {
+			time.Sleep(time.Duration(sc.kclConfig.IdleTimeBetweenReadsInMillis) * time.Millisecond)
+		}
+
 		// The shard has been closed, so no new records can be read from it
 		if getResp.NextShardIterator == nil {
 			log.Infof("Shard %s closed", shard.ID)
diff --git a/src/clientlibrary/worker/worker.go b/src/clientlibrary/worker/worker.go
index 39ed3d1..7ecb0a4 100644
--- a/src/clientlibrary/worker/worker.go
+++ b/src/clientlibrary/worker/worker.go
@@ -268,6 +268,7 @@ func (w *Worker) getShardIDs(startShardID string) error {
 
 	var lastShardID string
 	for _, s := range streamDesc.StreamDescription.Shards {
+		// found a new shard
 		if _, ok := w.shardStatus[*s.ShardId]; !ok {
 			log.Debugf("Found shard with id %s", *s.ShardId)
 			w.shardStatus[*s.ShardId] = &shardStatus{
diff --git a/src/clientlibrary/worker/worker_test.go b/src/clientlibrary/worker/worker_test.go
index ebcbc3d..88b3dac 100644
--- a/src/clientlibrary/worker/worker_test.go
+++ b/src/clientlibrary/worker/worker_test.go
@@ -5,6 +5,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/aws/aws-sdk-go/aws"
 	log "github.com/sirupsen/logrus"
 
 	cfg "clientlibrary/config"
@@ -28,9 +29,10 @@ func TestWorker(t *testing.T) {
 	defer os.Unsetenv("AWS_SECRET_ACCESS_KEY")
 	kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID).
 		WithInitialPositionInStream(cfg.LATEST).
-		WithMaxRecords(40).
+		WithMaxRecords(10).
 		WithMaxLeasesForWorker(1).
-		WithShardSyncIntervalMillis(5000)
+		WithShardSyncIntervalMillis(5000).
+		WithFailoverTimeMillis(300000)
 
 	log.SetOutput(os.Stdout)
 	log.SetLevel(log.DebugLevel)
@@ -80,7 +82,7 @@ type dumpRecordProcessor struct {
 }
 
 func (dd *dumpRecordProcessor) Initialize(input *kc.InitializationInput) {
-	dd.t.Logf("sharId=%v", input.ShardId)
+	dd.t.Logf("Processing ShardId: %v at checkpoint: %v", input.ShardId, aws.StringValue(input.ExtendedSequenceNumber.SequenceNumber))
 }
 
 func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
@@ -96,13 +98,12 @@ func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
 		assert.Equal(dd.t, specstr, string(v.Data))
 	}
 
-	dd.t.Logf("Checkpoint it and MillisBehindLatest = %v", input.MillisBehindLatest)
 	// checkpoint it after processing this batch
 	lastRecordSequenceNubmer := input.Records[len(input.Records)-1].SequenceNumber
+	dd.t.Logf("Checkpoint progress at: %v, MillisBehindLatest = %v", lastRecordSequenceNubmer, input.MillisBehindLatest)
 	input.Checkpointer.Checkpoint(lastRecordSequenceNubmer)
 }
 
 func (dd *dumpRecordProcessor) Shutdown(input *kc.ShutdownInput) {
-	dd.t.Logf("Shutdown Reason = %v", input.ShutdownReason)
-
+	dd.t.Logf("Shutdown Reason: %v", aws.StringValue(kc.ShutdownReasonMessage(input.ShutdownReason)))
}
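
Note (editorial, not part of the patch): with this change, Initialize now receives the shard ID and the checkpoint recovered from DynamoDB, so an application's record processor can log or act on its resume point. Below is a minimal usage sketch modeled on dumpRecordProcessor in worker_test.go. The package name "consumer" and the type name checkpointingProcessor are hypothetical; the import path clientlibrary/interfaces mirrors the GOPATH-style layout used by the tests above.

package consumer

import (
	"github.com/aws/aws-sdk-go/aws"
	log "github.com/sirupsen/logrus"

	kc "clientlibrary/interfaces"
)

// checkpointingProcessor is a hypothetical record processor implementing the
// Initialize/ProcessRecords/Shutdown methods the worker invokes.
type checkpointingProcessor struct{}

// Initialize is called once per shard before any records are delivered.
// With this patch, ExtendedSequenceNumber carries the checkpoint recovered
// from DynamoDB (empty on a fresh start).
func (p *checkpointingProcessor) Initialize(input *kc.InitializationInput) {
	log.Infof("Initializing shard %v at checkpoint %v",
		input.ShardId, aws.StringValue(input.ExtendedSequenceNumber.SequenceNumber))
}

// ProcessRecords handles a batch and checkpoints after the last record, so a
// restarted worker resumes AFTER_SEQUENCE_NUMBER of the last processed record.
func (p *checkpointingProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
	if len(input.Records) == 0 {
		return
	}
	for _, r := range input.Records {
		log.Debugf("Record: %s", string(r.Data))
	}
	lastSeq := input.Records[len(input.Records)-1].SequenceNumber
	input.Checkpointer.Checkpoint(lastSeq)
}

// Shutdown logs the human-readable reason via the new helper in inputs.go.
func (p *checkpointingProcessor) Shutdown(input *kc.ShutdownInput) {
	log.Infof("Shutdown reason: %v",
		aws.StringValue(kc.ShutdownReasonMessage(input.ShutdownReason)))
}

Checkpointing once per batch, as the test processor does, keeps DynamoDB writes proportional to batches rather than to individual records.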