From 1044485392e97f28691ea72ec7c5ac41e600cc41 Mon Sep 17 00:00:00 2001 From: Tao Jiang Date: Wed, 23 Dec 2020 09:47:47 -0600 Subject: [PATCH] Support Kinesis aggregation format (#84) Add support for Kinesis aggregation format to consume record published by KPL. Note: current implementation need to checkpoint the whole batch of the de-aggregated records instead of just portion of them. Add cache entry and exit time. Signed-off-by: Tao Jiang --- clientlibrary/checkpoint/checkpointer.go | 12 +- .../checkpoint/dynamodb-checkpointer.go | 40 ++--- .../checkpoint/dynamodb-checkpointer_test.go | 26 +-- clientlibrary/common/errors.go | 165 ------------------ clientlibrary/config/config.go | 9 - clientlibrary/interfaces/inputs.go | 34 +++- .../worker/record-processor-checkpointer.go | 2 +- clientlibrary/worker/shard-consumer.go | 35 ++-- clientlibrary/worker/worker.go | 6 +- go.mod | 2 + go.sum | 6 + test/record_processor_test.go | 10 +- test/record_publisher_test.go | 64 +++++++ test/worker_test.go | 2 +- 14 files changed, 166 insertions(+), 247 deletions(-) delete mode 100644 clientlibrary/common/errors.go diff --git a/clientlibrary/checkpoint/checkpointer.go b/clientlibrary/checkpoint/checkpointer.go index b3af0b7..4ff6fd2 100644 --- a/clientlibrary/checkpoint/checkpointer.go +++ b/clientlibrary/checkpoint/checkpointer.go @@ -33,14 +33,14 @@ import ( ) const ( - LEASE_KEY_KEY = "ShardID" - LEASE_OWNER_KEY = "AssignedTo" - LEASE_TIMEOUT_KEY = "LeaseTimeout" - CHECKPOINT_SEQUENCE_NUMBER_KEY = "Checkpoint" - PARENT_SHARD_ID_KEY = "ParentShardId" + LeaseKeyKey = "ShardID" + LeaseOwnerKey = "AssignedTo" + LeaseTimeoutKey = "LeaseTimeout" + SequenceNumberKey = "Checkpoint" + ParentShardIdKey = "ParentShardId" // We've completely processed all records in this shard. - SHARD_END = "SHARD_END" + ShardEnd = "SHARD_END" // ErrLeaseNotAquired is returned when we failed to get a lock on the shard ErrLeaseNotAquired = "Lease is already held by another node" diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go index 694522b..1145454 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go @@ -125,8 +125,8 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign return err } - assignedVar, assignedToOk := currentCheckpoint[LEASE_OWNER_KEY] - leaseVar, leaseTimeoutOk := currentCheckpoint[LEASE_TIMEOUT_KEY] + assignedVar, assignedToOk := currentCheckpoint[LeaseOwnerKey] + leaseVar, leaseTimeoutOk := currentCheckpoint[LeaseTimeoutKey] var conditionalExpression string var expressionAttributeValues map[string]*dynamodb.AttributeValue @@ -161,23 +161,23 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign } marshalledCheckpoint := map[string]*dynamodb.AttributeValue{ - LEASE_KEY_KEY: { + LeaseKeyKey: { S: aws.String(shard.ID), }, - LEASE_OWNER_KEY: { + LeaseOwnerKey: { S: aws.String(newAssignTo), }, - LEASE_TIMEOUT_KEY: { + LeaseTimeoutKey: { S: aws.String(newLeaseTimeoutString), }, } if len(shard.ParentShardId) > 0 { - marshalledCheckpoint[PARENT_SHARD_ID_KEY] = &dynamodb.AttributeValue{S: aws.String(shard.ParentShardId)} + marshalledCheckpoint[ParentShardIdKey] = &dynamodb.AttributeValue{S: aws.String(shard.ParentShardId)} } if shard.Checkpoint != "" { - marshalledCheckpoint[CHECKPOINT_SEQUENCE_NUMBER_KEY] = &dynamodb.AttributeValue{ + marshalledCheckpoint[SequenceNumberKey] = &dynamodb.AttributeValue{ S: aws.String(shard.Checkpoint), } } @@ -204,22 +204,22 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *par.ShardStatus) error { leaseTimeout := shard.LeaseTimeout.UTC().Format(time.RFC3339) marshalledCheckpoint := map[string]*dynamodb.AttributeValue{ - LEASE_KEY_KEY: { + LeaseKeyKey: { S: aws.String(shard.ID), }, - CHECKPOINT_SEQUENCE_NUMBER_KEY: { + SequenceNumberKey: { S: aws.String(shard.Checkpoint), }, - LEASE_OWNER_KEY: { + LeaseOwnerKey: { S: aws.String(shard.AssignedTo), }, - LEASE_TIMEOUT_KEY: { + LeaseTimeoutKey: { S: aws.String(leaseTimeout), }, } if len(shard.ParentShardId) > 0 { - marshalledCheckpoint[PARENT_SHARD_ID_KEY] = &dynamodb.AttributeValue{S: &shard.ParentShardId} + marshalledCheckpoint[ParentShardIdKey] = &dynamodb.AttributeValue{S: &shard.ParentShardId} } return checkpointer.saveItem(marshalledCheckpoint) @@ -232,7 +232,7 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er return err } - sequenceID, ok := checkpoint[CHECKPOINT_SEQUENCE_NUMBER_KEY] + sequenceID, ok := checkpoint[SequenceNumberKey] if !ok { return ErrSequenceIDNotFound } @@ -241,7 +241,7 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er defer shard.Mux.Unlock() shard.Checkpoint = aws.StringValue(sequenceID.S) - if assignedTo, ok := checkpoint[LEASE_OWNER_KEY]; ok { + if assignedTo, ok := checkpoint[LeaseOwnerKey]; ok { shard.AssignedTo = aws.StringValue(assignedTo.S) } return nil @@ -265,11 +265,11 @@ func (checkpointer *DynamoCheckpoint) RemoveLeaseOwner(shardID string) error { input := &dynamodb.UpdateItemInput{ TableName: aws.String(checkpointer.TableName), Key: map[string]*dynamodb.AttributeValue{ - LEASE_KEY_KEY: { + LeaseKeyKey: { S: aws.String(shardID), }, }, - UpdateExpression: aws.String("remove " + LEASE_OWNER_KEY), + UpdateExpression: aws.String("remove " + LeaseOwnerKey), } _, err := checkpointer.svc.UpdateItem(input) @@ -281,13 +281,13 @@ func (checkpointer *DynamoCheckpoint) createTable() error { input := &dynamodb.CreateTableInput{ AttributeDefinitions: []*dynamodb.AttributeDefinition{ { - AttributeName: aws.String(LEASE_KEY_KEY), + AttributeName: aws.String(LeaseKeyKey), AttributeType: aws.String("S"), }, }, KeySchema: []*dynamodb.KeySchemaElement{ { - AttributeName: aws.String(LEASE_KEY_KEY), + AttributeName: aws.String(LeaseKeyKey), KeyType: aws.String("HASH"), }, }, @@ -334,7 +334,7 @@ func (checkpointer *DynamoCheckpoint) getItem(shardID string) (map[string]*dynam item, err := checkpointer.svc.GetItem(&dynamodb.GetItemInput{ TableName: aws.String(checkpointer.TableName), Key: map[string]*dynamodb.AttributeValue{ - LEASE_KEY_KEY: { + LeaseKeyKey: { S: aws.String(shardID), }, }, @@ -346,7 +346,7 @@ func (checkpointer *DynamoCheckpoint) removeItem(shardID string) error { _, err := checkpointer.svc.DeleteItem(&dynamodb.DeleteItemInput{ TableName: aws.String(checkpointer.TableName), Key: map[string]*dynamodb.AttributeValue{ - LEASE_KEY_KEY: { + LeaseKeyKey: { S: aws.String(shardID), }, }, diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer_test.go b/clientlibrary/checkpoint/dynamodb-checkpointer_test.go index 6eaead5..ec3894b 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer_test.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer_test.go @@ -131,7 +131,7 @@ func TestGetLeaseAquired(t *testing.T) { t.Errorf("Lease not aquired after timeout %s", err) } - id, ok := svc.item[CHECKPOINT_SEQUENCE_NUMBER_KEY] + id, ok := svc.item[SequenceNumberKey] if !ok { t.Error("Expected checkpoint to be set by GetLease") } else if *id.S != "deadbeef" { @@ -172,24 +172,24 @@ func (m *mockDynamoDB) DescribeTable(*dynamodb.DescribeTableInput) (*dynamodb.De func (m *mockDynamoDB) PutItem(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) { item := input.Item - if shardID, ok := item[LEASE_KEY_KEY]; ok { - m.item[LEASE_KEY_KEY] = shardID + if shardID, ok := item[LeaseKeyKey]; ok { + m.item[LeaseKeyKey] = shardID } - if owner, ok := item[LEASE_OWNER_KEY]; ok { - m.item[LEASE_OWNER_KEY] = owner + if owner, ok := item[LeaseOwnerKey]; ok { + m.item[LeaseOwnerKey] = owner } - if timeout, ok := item[LEASE_TIMEOUT_KEY]; ok { - m.item[LEASE_TIMEOUT_KEY] = timeout + if timeout, ok := item[LeaseTimeoutKey]; ok { + m.item[LeaseTimeoutKey] = timeout } - if checkpoint, ok := item[CHECKPOINT_SEQUENCE_NUMBER_KEY]; ok { - m.item[CHECKPOINT_SEQUENCE_NUMBER_KEY] = checkpoint + if checkpoint, ok := item[SequenceNumberKey]; ok { + m.item[SequenceNumberKey] = checkpoint } - if parent, ok := item[PARENT_SHARD_ID_KEY]; ok { - m.item[PARENT_SHARD_ID_KEY] = parent + if parent, ok := item[ParentShardIdKey]; ok { + m.item[ParentShardIdKey] = parent } return nil, nil @@ -204,8 +204,8 @@ func (m *mockDynamoDB) GetItem(input *dynamodb.GetItemInput) (*dynamodb.GetItemO func (m *mockDynamoDB) UpdateItem(input *dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error) { exp := input.UpdateExpression - if aws.StringValue(exp) == "remove "+LEASE_OWNER_KEY { - delete(m.item, LEASE_OWNER_KEY) + if aws.StringValue(exp) == "remove "+LeaseOwnerKey { + delete(m.item, LeaseOwnerKey) } return nil, nil diff --git a/clientlibrary/common/errors.go b/clientlibrary/common/errors.go deleted file mode 100644 index da32eef..0000000 --- a/clientlibrary/common/errors.go +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Copyright (c) 2018 VMware, Inc. - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and - * associated documentation files (the "Software"), to deal in the Software without restriction, including - * without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is furnished to do - * so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all copies or substantial - * portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT - * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. - * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, - * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE - * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ -package common - -import ( - "fmt" - "net/http" -) - -// ErrorCode is unified definition of numerical error codes -type ErrorCode int32 - -// pre-defined error codes -const ( - // System Wide 41000 - 42000 - KinesisClientLibError ErrorCode = 41000 - - // KinesisClientLibrary Retryable Errors 41001 - 41100 - KinesisClientLibRetryableError ErrorCode = 41001 - - KinesisClientLibIOError ErrorCode = 41002 - BlockedOnParentShardError ErrorCode = 41003 - KinesisClientLibDependencyError ErrorCode = 41004 - ThrottlingError ErrorCode = 41005 - - // KinesisClientLibrary NonRetryable Errors 41100 - 41200 - KinesisClientLibNonRetryableException ErrorCode = 41100 - - InvalidStateError ErrorCode = 41101 - ShutdownError ErrorCode = 41102 - - // Kinesis Lease Errors 41200 - 41300 - LeasingError ErrorCode = 41200 - - LeasingInvalidStateError ErrorCode = 41201 - LeasingDependencyError ErrorCode = 41202 - LeasingProvisionedThroughputError ErrorCode = 41203 - - // Misc Errors 41300 - 41400 - // NotImplemented - KinesisClientLibNotImplemented ErrorCode = 41301 - - // Error indicates passing illegal or inappropriate argument - IllegalArgumentError ErrorCode = 41302 -) - -var errorMap = map[ErrorCode]ClientLibraryError{ - KinesisClientLibError: {ErrorCode: KinesisClientLibError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Top level error of Kinesis Client Library"}, - - // Retryable - KinesisClientLibRetryableError: {ErrorCode: KinesisClientLibRetryableError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Retryable exceptions (e.g. transient errors). The request/operation is expected to succeed upon (back off and) retry."}, - KinesisClientLibIOError: {ErrorCode: KinesisClientLibIOError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Error in reading/writing information (e.g. shard information from Kinesis may not be current/complete)."}, - BlockedOnParentShardError: {ErrorCode: BlockedOnParentShardError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Cannot start processing data for a shard because the data from the parent shard has not been completely processed (yet)."}, - KinesisClientLibDependencyError: {ErrorCode: KinesisClientLibDependencyError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Cannot talk to its dependencies (e.g. fetching data from Kinesis, DynamoDB table reads/writes)."}, - ThrottlingError: {ErrorCode: ThrottlingError, Retryable: true, Status: http.StatusTooManyRequests, Msg: "Requests are throttled by a service (e.g. DynamoDB when storing a checkpoint)."}, - - // Non-Retryable - KinesisClientLibNonRetryableException: {ErrorCode: KinesisClientLibNonRetryableException, Retryable: false, Status: http.StatusServiceUnavailable, Msg: "Non-retryable exceptions. Simply retrying the same request/operation is not expected to succeed."}, - InvalidStateError: {ErrorCode: InvalidStateError, Retryable: false, Status: http.StatusServiceUnavailable, Msg: "Kinesis Library has issues with internal state (e.g. DynamoDB table is not found)."}, - ShutdownError: {ErrorCode: ShutdownError, Retryable: false, Status: http.StatusServiceUnavailable, Msg: "The RecordProcessor instance has been shutdown (e.g. and attempts a checkpiont)."}, - - // Leasing - LeasingError: {ErrorCode: LeasingError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Top-level error type for the leasing code."}, - LeasingInvalidStateError: {ErrorCode: LeasingInvalidStateError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Error in a lease operation has failed because DynamoDB is an invalid state"}, - LeasingDependencyError: {ErrorCode: LeasingDependencyError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Error in a lease operation has failed because a dependency of the leasing system has failed."}, - LeasingProvisionedThroughputError: {ErrorCode: LeasingProvisionedThroughputError, Retryable: false, Status: http.StatusServiceUnavailable, Msg: "Error in a lease operation has failed due to lack of provisioned throughput for a DynamoDB table."}, - - // IllegalArgumentError - IllegalArgumentError: {ErrorCode: IllegalArgumentError, Retryable: false, Status: http.StatusBadRequest, Msg: "Error indicates that a method has been passed an illegal or inappropriate argument."}, - - // Not Implemented - KinesisClientLibNotImplemented: {ErrorCode: KinesisClientLibNotImplemented, Retryable: false, Status: http.StatusNotImplemented, Msg: "Not Implemented"}, -} - -// Message returns the message of the error code -func (c ErrorCode) Message() string { - return errorMap[c].Msg -} - -// MakeErr makes an error with default message -func (c ErrorCode) MakeErr() *ClientLibraryError { - e := errorMap[c] - return &e -} - -// MakeError makes an error with message and data -func (c ErrorCode) MakeError(detail string) error { - e := errorMap[c] - return e.WithDetail(detail) -} - -// ClientLibraryError is unified error -type ClientLibraryError struct { - // ErrorCode is the numerical error code. - ErrorCode `json:"code"` - // Retryable is a bool flag to indicate the whether the error is retryable or not. - Retryable bool `json:"tryable"` - // Status is the HTTP status code. - Status int `json:"status"` - // Msg provides a terse description of the error. Its value is defined in errorMap. - Msg string `json:"msg"` - // Detail provides a detailed description of the error. Its value is set using WithDetail. - Detail string `json:"detail"` -} - -// Error implements error -func (e *ClientLibraryError) Error() string { - var prefix string - if e.Retryable { - prefix = "Retryable" - } else { - prefix = "NonRetryable" - } - msg := fmt.Sprintf("%v Error [%d]: %s", prefix, int32(e.ErrorCode), e.Msg) - if e.Detail != "" { - msg = fmt.Sprintf("%s, detail: %s", msg, e.Detail) - } - return msg -} - -// WithMsg overwrites the default error message -func (e *ClientLibraryError) WithMsg(format string, v ...interface{}) *ClientLibraryError { - e.Msg = fmt.Sprintf(format, v...) - return e -} - -// WithDetail adds a detailed message to error -func (e *ClientLibraryError) WithDetail(format string, v ...interface{}) *ClientLibraryError { - if len(e.Detail) == 0 { - e.Detail = fmt.Sprintf(format, v...) - } else { - e.Detail += ", " + fmt.Sprintf(format, v...) - } - return e -} - -// WithCause adds CauseBy to error -func (e *ClientLibraryError) WithCause(err error) *ClientLibraryError { - if err != nil { - // Store error message in Detail, so the info can be preserved - // when CascadeError is marshaled to json. - if len(e.Detail) == 0 { - e.Detail = err.Error() - } else { - e.Detail += ", cause: " + err.Error() - } - } - return e -} diff --git a/clientlibrary/config/config.go b/clientlibrary/config/config.go index c517645..91d73b8 100644 --- a/clientlibrary/config/config.go +++ b/clientlibrary/config/config.go @@ -121,15 +121,6 @@ const ( // The amount of milliseconds to wait before graceful shutdown forcefully terminates. DEFAULT_SHUTDOWN_GRACE_MILLIS = 5000 - - // The size of the thread pool to create for the lease renewer to use. - DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20 - - // The sleep time between two listShards calls from the proxy when throttled. - DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS = 1500 - - // The number of times the Proxy will retry listShards call when throttled. - DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS = 50 ) type ( diff --git a/clientlibrary/interfaces/inputs.go b/clientlibrary/interfaces/inputs.go index 84bf03f..eb12387 100644 --- a/clientlibrary/interfaces/inputs.go +++ b/clientlibrary/interfaces/inputs.go @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018 VMware, Inc. + * Copyright (c) 2020 VMware, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and * associated documentation files (the "Software"), to deal in the Software without restriction, including @@ -48,6 +48,7 @@ const ( * instead depend on a different interface for backward compatibility. */ REQUESTED ShutdownReason = iota + 1 + /** * Terminate processing for this RecordProcessor (resharding use case). * Indicates that the shard is closed and all records from the shard have been delivered to the application. @@ -55,6 +56,7 @@ const ( * from this shard and processing of child shards can be started. */ TERMINATE + /** * Processing will be moved to a different record processor (fail over, load balancing use cases). * Applications SHOULD NOT checkpoint their progress (as another record processor may have already started @@ -76,22 +78,36 @@ type ( ShutdownReason int InitializationInput struct { - ShardId string - ExtendedSequenceNumber *ExtendedSequenceNumber - PendingCheckpointSequenceNumber *ExtendedSequenceNumber + // The shardId that the record processor is being initialized for. + ShardId string + + // The last extended sequence number that was successfully checkpointed by the previous record processor. + ExtendedSequenceNumber *ExtendedSequenceNumber } ProcessRecordsInput struct { - CacheEntryTime *time.Time - CacheExitTime *time.Time - Records []*ks.Record - Checkpointer IRecordProcessorCheckpointer + // The time that this batch of records was received by the KCL. + CacheEntryTime *time.Time + + // The time that this batch of records was prepared to be provided to the RecordProcessor. + CacheExitTime *time.Time + + // The records received from Kinesis. These records may have been de-aggregated if they were published by the KPL. + Records []*ks.Record + + // A checkpointer that the RecordProcessor can use to checkpoint its progress. + Checkpointer IRecordProcessorCheckpointer + + // How far behind this batch of records was when received from Kinesis. MillisBehindLatest int64 } ShutdownInput struct { + // ShutdownReason shows why RecordProcessor is going to be shutdown. ShutdownReason ShutdownReason - Checkpointer IRecordProcessorCheckpointer + + // Checkpointer is used to record the current progress. + Checkpointer IRecordProcessorCheckpointer } ) diff --git a/clientlibrary/worker/record-processor-checkpointer.go b/clientlibrary/worker/record-processor-checkpointer.go index e1034b6..b6baee6 100644 --- a/clientlibrary/worker/record-processor-checkpointer.go +++ b/clientlibrary/worker/record-processor-checkpointer.go @@ -69,7 +69,7 @@ func (rc *RecordProcessorCheckpointer) Checkpoint(sequenceNumber *string) error // checkpoint the last sequence of a closed shard if sequenceNumber == nil { - rc.shard.Checkpoint = chk.SHARD_END + rc.shard.Checkpoint = chk.ShardEnd } else { rc.shard.Checkpoint = aws.StringValue(sequenceNumber) } diff --git a/clientlibrary/worker/shard-consumer.go b/clientlibrary/worker/shard-consumer.go index 476ab01..4cb963f 100644 --- a/clientlibrary/worker/shard-consumer.go +++ b/clientlibrary/worker/shard-consumer.go @@ -36,6 +36,7 @@ import ( "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" + deagg "github.com/awslabs/kinesis-aggregation/go/deaggregator" chk "github.com/vmware/vmware-go-kcl/clientlibrary/checkpoint" "github.com/vmware/vmware-go-kcl/clientlibrary/config" @@ -47,19 +48,7 @@ import ( const ( // This is the initial state of a shard consumer. This causes the consumer to remain blocked until the all // parent shards have been completed. - WAITING_ON_PARENT_SHARDS ShardConsumerState = iota + 1 - - // This state is responsible for initializing the record processor with the shard information. - INITIALIZING - - // - PROCESSING - - SHUTDOWN_REQUESTED - - SHUTTING_DOWN - - SHUTDOWN_COMPLETE + WaitingOnParentShards ShardConsumerState = iota + 1 // ErrCodeKMSThrottlingException is defined in the API Reference https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.GetRecords // But it's not a constant? @@ -215,9 +204,21 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error { // reset the retry count after success retriedErrors = 0 + log.Debugf("Received %d original records.", len(getResp.Records)) + + // De-aggregate the records if they were published by the KPL. + dars := make([]*kinesis.Record, 0) + dars, err = deagg.DeaggregateRecords(getResp.Records) + + if err != nil { + // The error is caused by bad KPL publisher and just skip the bad records + // instead of being stuck here. + log.Errorf("Error in de-aggregating KPL records: %+v", err) + } + // IRecordProcessorCheckpointer input := &kcl.ProcessRecordsInput{ - Records: getResp.Records, + Records: dars, MillisBehindLatest: aws.Int64Value(getResp.MillisBehindLatest), Checkpointer: recordCheckpointer, } @@ -226,7 +227,7 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error { recordBytes := int64(0) log.Debugf("Received %d records, MillisBehindLatest: %v", recordLength, input.MillisBehindLatest) - for _, r := range getResp.Records { + for _, r := range dars { recordBytes += int64(len(r.Data)) } @@ -234,6 +235,8 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error { processRecordsStartTime := time.Now() // Delivery the events to the record processor + input.CacheEntryTime = &getRecordsStartTime + input.CacheExitTime = &processRecordsStartTime sc.recordProcessor.ProcessRecords(input) // Convert from nanoseconds to milliseconds @@ -288,7 +291,7 @@ func (sc *ShardConsumer) waitOnParentShard(shard *par.ShardStatus) error { } // Parent shard is finished. - if pshard.Checkpoint == chk.SHARD_END { + if pshard.Checkpoint == chk.ShardEnd { return nil } diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index 32273c0..4b42847 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -215,7 +215,7 @@ func (w *Worker) newShardConsumer(shard *par.ShardStatus) *ShardConsumer { consumerID: w.workerID, stop: w.stop, mService: w.mService, - state: WAITING_ON_PARENT_SHARDS, + state: WaitingOnParentShards, } } @@ -246,7 +246,7 @@ func (w *Worker) eventLoop() { // Count the number of leases hold by this worker excluding the processed shard counter := 0 for _, shard := range w.shardStatus { - if shard.GetLeaseOwner() == w.workerID && shard.Checkpoint != chk.SHARD_END { + if shard.GetLeaseOwner() == w.workerID && shard.Checkpoint != chk.ShardEnd { counter++ } } @@ -270,7 +270,7 @@ func (w *Worker) eventLoop() { } // The shard is closed and we have processed all records - if shard.Checkpoint == chk.SHARD_END { + if shard.Checkpoint == chk.ShardEnd { continue } diff --git a/go.mod b/go.mod index 84de9ab..c8fa96f 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,8 @@ module github.com/vmware/vmware-go-kcl require ( github.com/BurntSushi/toml v0.3.1 // indirect github.com/aws/aws-sdk-go v1.34.8 + github.com/awslabs/kinesis-aggregation/go v0.0.0-20201211133042-142dfe1d7a6d + github.com/golang/protobuf v1.3.1 github.com/google/uuid v1.1.1 github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/prometheus/client_golang v0.9.3 diff --git a/go.sum b/go.sum index d6c3fa2..60c4332 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,11 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/aws/aws-sdk-go v1.19.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.34.8 h1:GDfVeXG8XQDbpOeAj7415F8qCQZwvY/k/fj+HBqUnBA= github.com/aws/aws-sdk-go v1.34.8/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= +github.com/awslabs/kinesis-aggregation/go v0.0.0-20201211133042-142dfe1d7a6d h1:kGtsYh3+yYsCafn/pp/j/SMbc2bOiWJBxxkzCnAQWF4= +github.com/awslabs/kinesis-aggregation/go v0.0.0-20201211133042-142dfe1d7a6d/go.mod h1:SghidfnxvX7ribW6nHI7T+IBbc9puZ9kk5Tx/88h8P4= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -18,12 +21,14 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= @@ -63,6 +68,7 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= diff --git a/test/record_processor_test.go b/test/record_processor_test.go index c20b901..f392efb 100644 --- a/test/record_processor_test.go +++ b/test/record_processor_test.go @@ -67,10 +67,12 @@ func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) { dd.count++ } - // checkpoint it after processing this batch - lastRecordSequenceNubmer := input.Records[len(input.Records)-1].SequenceNumber - dd.t.Logf("Checkpoint progress at: %v, MillisBehindLatest = %v", lastRecordSequenceNubmer, input.MillisBehindLatest) - input.Checkpointer.Checkpoint(lastRecordSequenceNubmer) + // checkpoint it after processing this batch. + // Especially, for processing de-aggregated KPL records, checkpointing has to happen at the end of batch + // because de-aggregated records share the same sequence number. + lastRecordSequenceNumber := input.Records[len(input.Records)-1].SequenceNumber + dd.t.Logf("Checkpoint progress at: %v, MillisBehindLatest = %v", lastRecordSequenceNumber, input.MillisBehindLatest) + input.Checkpointer.Checkpoint(lastRecordSequenceNumber) } func (dd *dumpRecordProcessor) Shutdown(input *kc.ShutdownInput) { diff --git a/test/record_publisher_test.go b/test/record_publisher_test.go index cb61f83..f948fc1 100644 --- a/test/record_publisher_test.go +++ b/test/record_publisher_test.go @@ -19,12 +19,17 @@ package test import ( + "crypto/md5" + "fmt" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" + rec "github.com/awslabs/kinesis-aggregation/go/records" + "github.com/golang/protobuf/proto" "github.com/vmware/vmware-go-kcl/clientlibrary/utils" + "testing" ) @@ -60,6 +65,13 @@ func publishSomeData(t *testing.T, kc kinesisiface.KinesisAPI) { publishRecords(t, kc) } t.Log("Done putting data into stream using PutRecords API.") + + // Put some data into stream using KPL Aggregate Record format + t.Log("Putting data into stream using KPL Aggregate Record ...") + for i := 0; i < 10; i++ { + publishAggregateRecord(t, kc) + } + t.Log("Done putting data into stream using KPL Aggregate Record.") } // publishRecord to put a record into Kinesis stream using PutRecord API. @@ -97,3 +109,55 @@ func publishRecords(t *testing.T, kc kinesisiface.KinesisAPI) { t.Errorf("Error in PutRecords. %+v", err) } } + +// publishRecord to put a record into Kinesis stream using PutRecord API. +func publishAggregateRecord(t *testing.T, kc kinesisiface.KinesisAPI) { + data := generateAggregateRecord(5, specstr) + // Use random string as partition key to ensure even distribution across shards + _, err := kc.PutRecord(&kinesis.PutRecordInput{ + Data: data, + StreamName: aws.String(streamName), + PartitionKey: aws.String(utils.RandStringBytesMaskImpr(10)), + }) + + if err != nil { + t.Errorf("Error in PutRecord. %+v", err) + } +} + +// generateAggregateRecord generates an aggregate record in the correct AWS-specified format used by KPL. +// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md +// copy from: https://github.com/awslabs/kinesis-aggregation/blob/master/go/deaggregator/deaggregator_test.go +func generateAggregateRecord(numRecords int, content string) []byte { + aggr := &rec.AggregatedRecord{} + // Start with the magic header + aggRecord := []byte("\xf3\x89\x9a\xc2") + partKeyTable := make([]string, 0) + + // Create proto record with numRecords length + for i := 0; i < numRecords; i++ { + var partKey uint64 + var hashKey uint64 + partKey = uint64(i) + hashKey = uint64(i) * uint64(10) + r := &rec.Record{ + PartitionKeyIndex: &partKey, + ExplicitHashKeyIndex: &hashKey, + Data: []byte(content), + Tags: make([]*rec.Tag, 0), + } + + aggr.Records = append(aggr.Records, r) + partKeyVal := fmt.Sprint(i) + partKeyTable = append(partKeyTable, partKeyVal) + } + + aggr.PartitionKeyTable = partKeyTable + // Marshal to protobuf record, create md5 sum from proto record + // and append both to aggRecord with magic header + data, _ := proto.Marshal(aggr) + md5Hash := md5.Sum(data) + aggRecord = append(aggRecord, data...) + aggRecord = append(aggRecord, md5Hash[:]...) + return aggRecord +} diff --git a/test/worker_test.go b/test/worker_test.go index bbe5e51..e79f115 100644 --- a/test/worker_test.go +++ b/test/worker_test.go @@ -69,7 +69,7 @@ func TestWorker(t *testing.T) { kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID). WithInitialPositionInStream(cfg.LATEST). - WithMaxRecords(10). + WithMaxRecords(8). WithMaxLeasesForWorker(1). WithShardSyncIntervalMillis(5000). WithFailoverTimeMillis(300000).