diff --git a/src/clientlibrary/interfaces/inputs.go b/src/clientlibrary/interfaces/inputs.go
index 27590c3..8f7590d 100644
--- a/src/clientlibrary/interfaces/inputs.go
+++ b/src/clientlibrary/interfaces/inputs.go
@@ -3,6 +3,7 @@ package interfaces
 import (
 	"time"
 
+	"github.com/aws/aws-sdk-go/aws"
 	ks "github.com/aws/aws-sdk-go/service/kinesis"
 )
 
@@ -60,3 +61,13 @@
 		Checkpointer IRecordProcessorCheckpointer
 	}
 )
+
+var shutdownReasonMap = map[ShutdownReason]*string{
+	REQUESTED: aws.String("REQUESTED"),
+	TERMINATE: aws.String("TERMINATE"),
+	ZOMBIE:    aws.String("ZOMBIE"),
+}
+
+func ShutdownReasonMessage(reason ShutdownReason) *string {
+	return shutdownReasonMap[reason]
+}
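Note: ShutdownReasonMessage is meant to be called from a record processor's Shutdown handler, as the updated worker_test.go at the end of this diff does. A minimal sketch of that usage, with a hypothetical exampleProcessor type ("kc" aliases this interfaces package, matching the test's import alias):

// Hypothetical record processor; only the Shutdown handler is sketched here.
func (p *exampleProcessor) Shutdown(input *kc.ShutdownInput) {
	// ShutdownReasonMessage returns a *string; aws.StringValue dereferences it
	// and yields "" for an unknown reason (nil map lookup).
	reason := aws.StringValue(kc.ShutdownReasonMessage(input.ShutdownReason))
	log.Infof("Record processor shutting down, reason: %s", reason)
}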
diff --git a/src/clientlibrary/worker/checkpointer.go b/src/clientlibrary/worker/checkpointer.go
index 2ca4dda..39584db 100644
--- a/src/clientlibrary/worker/checkpointer.go
+++ b/src/clientlibrary/worker/checkpointer.go
@@ -15,6 +15,11 @@ import (
 )
 
 const (
+	LEASE_KEY_KEY                  = "ShardID"
+	LEASE_OWNER_KEY                = "AssignedTo"
+	LEASE_TIMEOUT_KEY              = "LeaseTimeout"
+	CHECKPOINT_SEQUENCE_NUMBER_KEY = "Checkpoint"
+
 	// ErrLeaseNotAquired is returned when we failed to get a lock on the shard
 	ErrLeaseNotAquired = "Lease is already held by another node"
 	// ErrInvalidDynamoDBSchema is returned when there are one or more fields missing from the table
@@ -74,8 +79,8 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *shardStatus, newAssignTo s
 		return err
 	}
 
-	assignedVar, assignedToOk := currentCheckpoint["AssignedTo"]
-	leaseVar, leaseTimeoutOk := currentCheckpoint["LeaseTimeout"]
+	assignedVar, assignedToOk := currentCheckpoint[LEASE_OWNER_KEY]
+	leaseVar, leaseTimeoutOk := currentCheckpoint[LEASE_TIMEOUT_KEY]
 
 	var conditionalExpression string
 	var expressionAttributeValues map[string]*dynamodb.AttributeValue
@@ -108,19 +113,19 @@
 	}
 
 	marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
-		"ShardID": {
+		LEASE_KEY_KEY: {
 			S: &shard.ID,
 		},
-		"AssignedTo": {
+		LEASE_OWNER_KEY: {
 			S: &newAssignTo,
 		},
-		"LeaseTimeout": {
+		LEASE_TIMEOUT_KEY: {
 			S: &newLeaseTimeoutString,
 		},
 	}
 
 	if shard.Checkpoint != "" {
-		marshalledCheckpoint["Checkpoint"] = &dynamodb.AttributeValue{
+		marshalledCheckpoint[CHECKPOINT_SEQUENCE_NUMBER_KEY] = &dynamodb.AttributeValue{
 			S: &shard.Checkpoint,
 		}
 	}
@@ -147,16 +152,16 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *shardStatus, newAssignTo s
 func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *shardStatus) error {
 	leaseTimeout := shard.LeaseTimeout.UTC().Format(time.RFC3339)
 	marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
-		"ShardID": {
+		LEASE_KEY_KEY: {
 			S: &shard.ID,
 		},
-		"SequenceID": {
+		CHECKPOINT_SEQUENCE_NUMBER_KEY: {
 			S: &shard.Checkpoint,
 		},
-		"AssignedTo": {
+		LEASE_OWNER_KEY: {
 			S: &shard.AssignedTo,
 		},
-		"LeaseTimeout": {
+		LEASE_TIMEOUT_KEY: {
 			S: &leaseTimeout,
 		},
 	}
@@ -170,7 +175,7 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *shardStatus) error
 		return err
 	}
 
-	sequenceID, ok := checkpoint["SequenceID"]
+	sequenceID, ok := checkpoint[CHECKPOINT_SEQUENCE_NUMBER_KEY]
 	if !ok {
 		return ErrSequenceIDNotFound
 	}
@@ -179,7 +184,7 @@
 	defer shard.mux.Unlock()
 	shard.Checkpoint = *sequenceID.S
 
-	if assignedTo, ok := checkpoint["Assignedto"]; ok {
+	if assignedTo, ok := checkpoint[LEASE_OWNER_KEY]; ok {
 		shard.AssignedTo = *assignedTo.S
 	}
 	return nil
@@ -189,13 +194,13 @@ func (checkpointer *DynamoCheckpoint) createTable() error {
 	input := &dynamodb.CreateTableInput{
 		AttributeDefinitions: []*dynamodb.AttributeDefinition{
 			{
-				AttributeName: aws.String("ShardID"),
+				AttributeName: aws.String(LEASE_KEY_KEY),
 				AttributeType: aws.String("S"),
 			},
 		},
 		KeySchema: []*dynamodb.KeySchemaElement{
 			{
-				AttributeName: aws.String("ShardID"),
+				AttributeName: aws.String(LEASE_KEY_KEY),
 				KeyType: aws.String("HASH"),
 			},
 		},
@@ -256,7 +261,7 @@ func (checkpointer *DynamoCheckpoint) getItem(shardID string) (map[string]*dynam
 	item, err = checkpointer.svc.GetItem(&dynamodb.GetItemInput{
 		TableName: aws.String(checkpointer.TableName),
 		Key: map[string]*dynamodb.AttributeValue{
-			"ShardID": {
+			LEASE_KEY_KEY: {
 				S: aws.String(shardID),
 			},
 		},
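Note: the four *_KEY constants centralize the DynamoDB attribute names of the lease table (hash key ShardID, plus AssignedTo, LeaseTimeout, and Checkpoint). As a sketch of how a lease row reads through these names, a hypothetical helper (not part of this change) could decode the map returned by getItem like so:

// decodeLease is illustrative only: it pulls the lease fields out of a row
// returned by DynamoCheckpoint.getItem using the new attribute-name constants.
func decodeLease(row map[string]*dynamodb.AttributeValue) (owner, timeout, checkpoint string) {
	if v, ok := row[LEASE_OWNER_KEY]; ok {
		owner = aws.StringValue(v.S)
	}
	if v, ok := row[LEASE_TIMEOUT_KEY]; ok {
		timeout = aws.StringValue(v.S)
	}
	if v, ok := row[CHECKPOINT_SEQUENCE_NUMBER_KEY]; ok {
		checkpoint = aws.StringValue(v.S)
	}
	return owner, timeout, checkpoint
}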
diff --git a/src/clientlibrary/worker/shard-consumer.go b/src/clientlibrary/worker/shard-consumer.go
index 905b48b..012eff9 100644
--- a/src/clientlibrary/worker/shard-consumer.go
+++ b/src/clientlibrary/worker/shard-consumer.go
@@ -56,6 +56,7 @@ type ShardConsumer struct {
 }
 
 func (sc *ShardConsumer) getShardIterator(shard *shardStatus) (*string, error) {
+	// Fetch the shard's checkpoint from DynamoDB
 	err := sc.checkpointer.FetchCheckpoint(shard)
 	if err != nil && err != ErrSequenceIDNotFound {
 		return nil, err
@@ -64,6 +65,8 @@ func (sc *ShardConsumer) getShardIterator(shard *shardStatus) (*string, error) {
 	// If there isn't any checkpoint for the shard, use the configuration value.
 	if shard.Checkpoint == "" {
 		initPos := sc.kclConfig.InitialPositionInStream
+		log.Debugf("No checkpoint recorded for shard: %v, starting with: %v", shard.ID,
+			aws.StringValue(config.InitalPositionInStreamToShardIteratorType(initPos)))
 		shardIterArgs := &kinesis.GetShardIteratorInput{
 			ShardId:           &shard.ID,
 			ShardIteratorType: config.InitalPositionInStreamToShardIteratorType(initPos),
@@ -76,6 +79,7 @@
 		return iterResp.ShardIterator, nil
 	}
 
+	log.Debugf("Start shard: %v at checkpoint: %v", shard.ID, shard.Checkpoint)
 	shardIterArgs := &kinesis.GetShardIteratorInput{
 		ShardId:           &shard.ID,
 		ShardIteratorType: aws.String("AFTER_SEQUENCE_NUMBER"),
@@ -98,6 +102,13 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error {
 		return err
 	}
 
+	// Notify the record processor of the shard and its starting checkpoint before processing events
+	input := &kcl.InitializationInput{
+		ShardId:                shard.ID,
+		ExtendedSequenceNumber: &kcl.ExtendedSequenceNumber{SequenceNumber: aws.String(shard.Checkpoint)},
+	}
+	sc.recordProcessor.Initialize(input)
+
 	recordCheckpointer := NewRecordProcessorCheckpoint(shard, sc.checkpointer)
 
 	var retriedErrors int
@@ -147,7 +158,7 @@
 
 			recordLength := len(input.Records)
 			recordBytes := int64(0)
-			log.Debugf("Received %d records", recordLength)
+			log.Debugf("Received %d records, MillisBehindLatest: %v", recordLength, input.MillisBehindLatest)
 
 			for _, r := range getResp.Records {
 				recordBytes += int64(len(r.Data))
@@ -164,9 +175,6 @@
 				sc.mService.RecordProcessRecordsTime(shard.ID, float64(processedRecordsTiming))
 			}
 
-			// Idle between each read, the user is responsible for checkpoint the progress
-			time.Sleep(time.Duration(sc.kclConfig.IdleTimeBetweenReadsInMillis) * time.Millisecond)
-
 			sc.mService.IncrRecordsProcessed(shard.ID, recordLength)
 			sc.mService.IncrBytesProcessed(shard.ID, recordBytes)
 			sc.mService.MillisBehindLatest(shard.ID, float64(*getResp.MillisBehindLatest))
@@ -175,6 +183,13 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error {
 			getRecordsTime := time.Since(getRecordsStartTime) / 1000000
 			sc.mService.RecordGetRecordsTime(shard.ID, float64(getRecordsTime))
 
+			// Idle between reads; the user is responsible for checkpointing progress.
+			// The idle time applies only when no records were returned; if records were
+			// returned, the next batch should be retrieved immediately.
+			if recordLength == 0 && aws.Int64Value(getResp.MillisBehindLatest) < int64(sc.kclConfig.IdleTimeBetweenReadsInMillis) {
+				time.Sleep(time.Duration(sc.kclConfig.IdleTimeBetweenReadsInMillis) * time.Millisecond)
+			}
+
 			// The shard has been closed, so no new records can be read from it
 			if getResp.NextShardIterator == nil {
 				log.Infof("Shard %s closed", shard.ID)
diff --git a/src/clientlibrary/worker/worker.go b/src/clientlibrary/worker/worker.go
index 39ed3d1..7ecb0a4 100644
--- a/src/clientlibrary/worker/worker.go
+++ b/src/clientlibrary/worker/worker.go
@@ -268,6 +268,7 @@ func (w *Worker) getShardIDs(startShardID string) error {
 
 	var lastShardID string
 	for _, s := range streamDesc.StreamDescription.Shards {
+		// Found a new shard
 		if _, ok := w.shardStatus[*s.ShardId]; !ok {
 			log.Debugf("Found shard with id %s", *s.ShardId)
 			w.shardStatus[*s.ShardId] = &shardStatus{
diff --git a/src/clientlibrary/worker/worker_test.go b/src/clientlibrary/worker/worker_test.go
index ebcbc3d..88b3dac 100644
--- a/src/clientlibrary/worker/worker_test.go
+++ b/src/clientlibrary/worker/worker_test.go
@@ -5,6 +5,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/aws/aws-sdk-go/aws"
 	log "github.com/sirupsen/logrus"
 
 	cfg "clientlibrary/config"
@@ -28,9 +29,10 @@ func TestWorker(t *testing.T) {
 	defer os.Unsetenv("AWS_SECRET_ACCESS_KEY")
 	kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID).
 		WithInitialPositionInStream(cfg.LATEST).
-		WithMaxRecords(40).
+		WithMaxRecords(10).
 		WithMaxLeasesForWorker(1).
-		WithShardSyncIntervalMillis(5000)
+		WithShardSyncIntervalMillis(5000).
+		WithFailoverTimeMillis(300000)
 
 	log.SetOutput(os.Stdout)
 	log.SetLevel(log.DebugLevel)
@@ -80,7 +82,7 @@ type dumpRecordProcessor struct {
 }
 
 func (dd *dumpRecordProcessor) Initialize(input *kc.InitializationInput) {
-	dd.t.Logf("sharId=%v", input.ShardId)
+	dd.t.Logf("Processing ShardId: %v at checkpoint: %v", input.ShardId, aws.StringValue(input.ExtendedSequenceNumber.SequenceNumber))
 }
 
 func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
@@ -96,13 +98,12 @@ func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
 		assert.Equal(dd.t, specstr, string(v.Data))
 	}
 
-	dd.t.Logf("Checkpoint it and MillisBehindLatest = %v", input.MillisBehindLatest)
 	// checkpoint it after processing this batch
 	lastRecordSequenceNubmer := input.Records[len(input.Records)-1].SequenceNumber
+	dd.t.Logf("Checkpoint progress at: %v, MillisBehindLatest = %v", lastRecordSequenceNubmer, input.MillisBehindLatest)
 	input.Checkpointer.Checkpoint(lastRecordSequenceNubmer)
 }
 
 func (dd *dumpRecordProcessor) Shutdown(input *kc.ShutdownInput) {
-	dd.t.Logf("Shutdown Reason = %v", input.ShutdownReason)
-
+	dd.t.Logf("Shutdown Reason: %v", aws.StringValue(kc.ShutdownReasonMessage(input.ShutdownReason)))
 }
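Note: the shard-consumer change above replaces the unconditional per-read sleep with a conditional one, so the consumer idles only when a read returned nothing and it is already near the tip of the stream. The rule, distilled into a hypothetical predicate (the names mirror the code above, but this helper itself is not part of the change):

// shouldIdle captures the new pacing rule in getRecords: sleep between reads
// only when the last GetRecords call was empty and the consumer is less than
// one idle interval behind the latest record.
func shouldIdle(recordCount int, millisBehindLatest *int64, idleTimeMillis int) bool {
	return recordCount == 0 && aws.Int64Value(millisBehindLatest) < int64(idleTimeMillis)
}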