Support Kinesis aggregation format (#84)

Add support for the Kinesis aggregation format to consume records
published by the KPL.

Note: the current implementation needs to checkpoint the whole batch
of de-aggregated records instead of just a portion of them.

Add cache entry and exit times to ProcessRecordsInput.

Signed-off-by: Tao Jiang <taoj@vmware.com>
Tao Jiang 2020-12-23 09:47:47 -06:00
parent 6ff3cd1b15
commit 1044485392
14 changed files with 166 additions and 247 deletions
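Why the whole batch: records de-aggregated from a single KPL aggregate all carry the sequence number of the Kinesis record that contained them, so checkpointing part-way through a batch could skip the rest of an aggregate after failover. Below is a minimal record processor sketch under that constraint — illustrative only, assuming the IRecordProcessor interfaces from clientlibrary/interfaces; batchProcessor is a made-up name, not part of this commit.

package example

import (
	"fmt"

	kc "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces"
)

// batchProcessor is illustrative only and not part of this commit.
type batchProcessor struct{}

func (p *batchProcessor) Initialize(input *kc.InitializationInput) {}

func (p *batchProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
	if len(input.Records) == 0 {
		return
	}
	for _, r := range input.Records {
		fmt.Printf("seq=%s bytes=%d\n", *r.SequenceNumber, len(r.Data))
	}
	// Checkpoint once, after the whole de-aggregated batch, at the last
	// record's sequence number; de-aggregated records share their parent's.
	last := input.Records[len(input.Records)-1].SequenceNumber
	if err := input.Checkpointer.Checkpoint(last); err != nil {
		fmt.Printf("checkpoint failed: %v\n", err)
	}
}

func (p *batchProcessor) Shutdown(input *kc.ShutdownInput) {}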


@@ -33,14 +33,14 @@ import (
)
const (
LEASE_KEY_KEY = "ShardID"
LEASE_OWNER_KEY = "AssignedTo"
LEASE_TIMEOUT_KEY = "LeaseTimeout"
CHECKPOINT_SEQUENCE_NUMBER_KEY = "Checkpoint"
PARENT_SHARD_ID_KEY = "ParentShardId"
LeaseKeyKey = "ShardID"
LeaseOwnerKey = "AssignedTo"
LeaseTimeoutKey = "LeaseTimeout"
SequenceNumberKey = "Checkpoint"
ParentShardIdKey = "ParentShardId"
// We've completely processed all records in this shard.
SHARD_END = "SHARD_END"
ShardEnd = "SHARD_END"
// ErrLeaseNotAquired is returned when we failed to get a lock on the shard
ErrLeaseNotAquired = "Lease is already held by another node"


@@ -125,8 +125,8 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
return err
}
assignedVar, assignedToOk := currentCheckpoint[LEASE_OWNER_KEY]
leaseVar, leaseTimeoutOk := currentCheckpoint[LEASE_TIMEOUT_KEY]
assignedVar, assignedToOk := currentCheckpoint[LeaseOwnerKey]
leaseVar, leaseTimeoutOk := currentCheckpoint[LeaseTimeoutKey]
var conditionalExpression string
var expressionAttributeValues map[string]*dynamodb.AttributeValue
@@ -161,23 +161,23 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
}
marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
LEASE_KEY_KEY: {
LeaseKeyKey: {
S: aws.String(shard.ID),
},
LEASE_OWNER_KEY: {
LeaseOwnerKey: {
S: aws.String(newAssignTo),
},
LEASE_TIMEOUT_KEY: {
LeaseTimeoutKey: {
S: aws.String(newLeaseTimeoutString),
},
}
if len(shard.ParentShardId) > 0 {
marshalledCheckpoint[PARENT_SHARD_ID_KEY] = &dynamodb.AttributeValue{S: aws.String(shard.ParentShardId)}
marshalledCheckpoint[ParentShardIdKey] = &dynamodb.AttributeValue{S: aws.String(shard.ParentShardId)}
}
if shard.Checkpoint != "" {
marshalledCheckpoint[CHECKPOINT_SEQUENCE_NUMBER_KEY] = &dynamodb.AttributeValue{
marshalledCheckpoint[SequenceNumberKey] = &dynamodb.AttributeValue{
S: aws.String(shard.Checkpoint),
}
}
@@ -204,22 +204,22 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *par.ShardStatus) error {
leaseTimeout := shard.LeaseTimeout.UTC().Format(time.RFC3339)
marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
LEASE_KEY_KEY: {
LeaseKeyKey: {
S: aws.String(shard.ID),
},
CHECKPOINT_SEQUENCE_NUMBER_KEY: {
SequenceNumberKey: {
S: aws.String(shard.Checkpoint),
},
LEASE_OWNER_KEY: {
LeaseOwnerKey: {
S: aws.String(shard.AssignedTo),
},
LEASE_TIMEOUT_KEY: {
LeaseTimeoutKey: {
S: aws.String(leaseTimeout),
},
}
if len(shard.ParentShardId) > 0 {
marshalledCheckpoint[PARENT_SHARD_ID_KEY] = &dynamodb.AttributeValue{S: &shard.ParentShardId}
marshalledCheckpoint[ParentShardIdKey] = &dynamodb.AttributeValue{S: &shard.ParentShardId}
}
return checkpointer.saveItem(marshalledCheckpoint)
@@ -232,7 +232,7 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er
return err
}
sequenceID, ok := checkpoint[CHECKPOINT_SEQUENCE_NUMBER_KEY]
sequenceID, ok := checkpoint[SequenceNumberKey]
if !ok {
return ErrSequenceIDNotFound
}
@@ -241,7 +241,7 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er
defer shard.Mux.Unlock()
shard.Checkpoint = aws.StringValue(sequenceID.S)
if assignedTo, ok := checkpoint[LEASE_OWNER_KEY]; ok {
if assignedTo, ok := checkpoint[LeaseOwnerKey]; ok {
shard.AssignedTo = aws.StringValue(assignedTo.S)
}
return nil
@@ -265,11 +265,11 @@ func (checkpointer *DynamoCheckpoint) RemoveLeaseOwner(shardID string) error {
input := &dynamodb.UpdateItemInput{
TableName: aws.String(checkpointer.TableName),
Key: map[string]*dynamodb.AttributeValue{
LEASE_KEY_KEY: {
LeaseKeyKey: {
S: aws.String(shardID),
},
},
UpdateExpression: aws.String("remove " + LEASE_OWNER_KEY),
UpdateExpression: aws.String("remove " + LeaseOwnerKey),
}
_, err := checkpointer.svc.UpdateItem(input)
@@ -281,13 +281,13 @@ func (checkpointer *DynamoCheckpoint) createTable() error {
input := &dynamodb.CreateTableInput{
AttributeDefinitions: []*dynamodb.AttributeDefinition{
{
AttributeName: aws.String(LEASE_KEY_KEY),
AttributeName: aws.String(LeaseKeyKey),
AttributeType: aws.String("S"),
},
},
KeySchema: []*dynamodb.KeySchemaElement{
{
AttributeName: aws.String(LEASE_KEY_KEY),
AttributeName: aws.String(LeaseKeyKey),
KeyType: aws.String("HASH"),
},
},
@@ -334,7 +334,7 @@ func (checkpointer *DynamoCheckpoint) getItem(shardID string) (map[string]*dynam
item, err := checkpointer.svc.GetItem(&dynamodb.GetItemInput{
TableName: aws.String(checkpointer.TableName),
Key: map[string]*dynamodb.AttributeValue{
LEASE_KEY_KEY: {
LeaseKeyKey: {
S: aws.String(shardID),
},
},
@@ -346,7 +346,7 @@ func (checkpointer *DynamoCheckpoint) removeItem(shardID string) error {
_, err := checkpointer.svc.DeleteItem(&dynamodb.DeleteItemInput{
TableName: aws.String(checkpointer.TableName),
Key: map[string]*dynamodb.AttributeValue{
LEASE_KEY_KEY: {
LeaseKeyKey: {
S: aws.String(shardID),
},
},


@@ -131,7 +131,7 @@ func TestGetLeaseAquired(t *testing.T) {
t.Errorf("Lease not aquired after timeout %s", err)
}
id, ok := svc.item[CHECKPOINT_SEQUENCE_NUMBER_KEY]
id, ok := svc.item[SequenceNumberKey]
if !ok {
t.Error("Expected checkpoint to be set by GetLease")
} else if *id.S != "deadbeef" {
@@ -172,24 +172,24 @@ func (m *mockDynamoDB) DescribeTable(*dynamodb.DescribeTableInput) (*dynamodb.De
func (m *mockDynamoDB) PutItem(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) {
item := input.Item
if shardID, ok := item[LEASE_KEY_KEY]; ok {
m.item[LEASE_KEY_KEY] = shardID
if shardID, ok := item[LeaseKeyKey]; ok {
m.item[LeaseKeyKey] = shardID
}
if owner, ok := item[LEASE_OWNER_KEY]; ok {
m.item[LEASE_OWNER_KEY] = owner
if owner, ok := item[LeaseOwnerKey]; ok {
m.item[LeaseOwnerKey] = owner
}
if timeout, ok := item[LEASE_TIMEOUT_KEY]; ok {
m.item[LEASE_TIMEOUT_KEY] = timeout
if timeout, ok := item[LeaseTimeoutKey]; ok {
m.item[LeaseTimeoutKey] = timeout
}
if checkpoint, ok := item[CHECKPOINT_SEQUENCE_NUMBER_KEY]; ok {
m.item[CHECKPOINT_SEQUENCE_NUMBER_KEY] = checkpoint
if checkpoint, ok := item[SequenceNumberKey]; ok {
m.item[SequenceNumberKey] = checkpoint
}
if parent, ok := item[PARENT_SHARD_ID_KEY]; ok {
m.item[PARENT_SHARD_ID_KEY] = parent
if parent, ok := item[ParentShardIdKey]; ok {
m.item[ParentShardIdKey] = parent
}
return nil, nil
@@ -204,8 +204,8 @@ func (m *mockDynamoDB) GetItem(input *dynamodb.GetItemInput) (*dynamodb.GetItemO
func (m *mockDynamoDB) UpdateItem(input *dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error) {
exp := input.UpdateExpression
if aws.StringValue(exp) == "remove "+LEASE_OWNER_KEY {
delete(m.item, LEASE_OWNER_KEY)
if aws.StringValue(exp) == "remove "+LeaseOwnerKey {
delete(m.item, LeaseOwnerKey)
}
return nil, nil


@@ -1,165 +0,0 @@
/*
* Copyright (c) 2018 VMware, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
* associated documentation files (the "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is furnished to do
* so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial
* portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
* NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package common
import (
"fmt"
"net/http"
)
// ErrorCode is unified definition of numerical error codes
type ErrorCode int32
// pre-defined error codes
const (
// System Wide 41000 - 42000
KinesisClientLibError ErrorCode = 41000
// KinesisClientLibrary Retryable Errors 41001 - 41100
KinesisClientLibRetryableError ErrorCode = 41001
KinesisClientLibIOError ErrorCode = 41002
BlockedOnParentShardError ErrorCode = 41003
KinesisClientLibDependencyError ErrorCode = 41004
ThrottlingError ErrorCode = 41005
// KinesisClientLibrary NonRetryable Errors 41100 - 41200
KinesisClientLibNonRetryableException ErrorCode = 41100
InvalidStateError ErrorCode = 41101
ShutdownError ErrorCode = 41102
// Kinesis Lease Errors 41200 - 41300
LeasingError ErrorCode = 41200
LeasingInvalidStateError ErrorCode = 41201
LeasingDependencyError ErrorCode = 41202
LeasingProvisionedThroughputError ErrorCode = 41203
// Misc Errors 41300 - 41400
// NotImplemented
KinesisClientLibNotImplemented ErrorCode = 41301
// Error indicates passing illegal or inappropriate argument
IllegalArgumentError ErrorCode = 41302
)
var errorMap = map[ErrorCode]ClientLibraryError{
KinesisClientLibError: {ErrorCode: KinesisClientLibError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Top level error of Kinesis Client Library"},
// Retryable
KinesisClientLibRetryableError: {ErrorCode: KinesisClientLibRetryableError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Retryable exceptions (e.g. transient errors). The request/operation is expected to succeed upon (back off and) retry."},
KinesisClientLibIOError: {ErrorCode: KinesisClientLibIOError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Error in reading/writing information (e.g. shard information from Kinesis may not be current/complete)."},
BlockedOnParentShardError: {ErrorCode: BlockedOnParentShardError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Cannot start processing data for a shard because the data from the parent shard has not been completely processed (yet)."},
KinesisClientLibDependencyError: {ErrorCode: KinesisClientLibDependencyError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Cannot talk to its dependencies (e.g. fetching data from Kinesis, DynamoDB table reads/writes)."},
ThrottlingError: {ErrorCode: ThrottlingError, Retryable: true, Status: http.StatusTooManyRequests, Msg: "Requests are throttled by a service (e.g. DynamoDB when storing a checkpoint)."},
// Non-Retryable
KinesisClientLibNonRetryableException: {ErrorCode: KinesisClientLibNonRetryableException, Retryable: false, Status: http.StatusServiceUnavailable, Msg: "Non-retryable exceptions. Simply retrying the same request/operation is not expected to succeed."},
InvalidStateError: {ErrorCode: InvalidStateError, Retryable: false, Status: http.StatusServiceUnavailable, Msg: "Kinesis Library has issues with internal state (e.g. DynamoDB table is not found)."},
ShutdownError: {ErrorCode: ShutdownError, Retryable: false, Status: http.StatusServiceUnavailable, Msg: "The RecordProcessor instance has been shutdown (e.g. and attempts a checkpiont)."},
// Leasing
LeasingError: {ErrorCode: LeasingError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Top-level error type for the leasing code."},
LeasingInvalidStateError: {ErrorCode: LeasingInvalidStateError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Error in a lease operation has failed because DynamoDB is an invalid state"},
LeasingDependencyError: {ErrorCode: LeasingDependencyError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Error in a lease operation has failed because a dependency of the leasing system has failed."},
LeasingProvisionedThroughputError: {ErrorCode: LeasingProvisionedThroughputError, Retryable: false, Status: http.StatusServiceUnavailable, Msg: "Error in a lease operation has failed due to lack of provisioned throughput for a DynamoDB table."},
// IllegalArgumentError
IllegalArgumentError: {ErrorCode: IllegalArgumentError, Retryable: false, Status: http.StatusBadRequest, Msg: "Error indicates that a method has been passed an illegal or inappropriate argument."},
// Not Implemented
KinesisClientLibNotImplemented: {ErrorCode: KinesisClientLibNotImplemented, Retryable: false, Status: http.StatusNotImplemented, Msg: "Not Implemented"},
}
// Message returns the message of the error code
func (c ErrorCode) Message() string {
return errorMap[c].Msg
}
// MakeErr makes an error with default message
func (c ErrorCode) MakeErr() *ClientLibraryError {
e := errorMap[c]
return &e
}
// MakeError makes an error with message and data
func (c ErrorCode) MakeError(detail string) error {
e := errorMap[c]
return e.WithDetail(detail)
}
// ClientLibraryError is unified error
type ClientLibraryError struct {
// ErrorCode is the numerical error code.
ErrorCode `json:"code"`
// Retryable is a bool flag to indicate the whether the error is retryable or not.
Retryable bool `json:"tryable"`
// Status is the HTTP status code.
Status int `json:"status"`
// Msg provides a terse description of the error. Its value is defined in errorMap.
Msg string `json:"msg"`
// Detail provides a detailed description of the error. Its value is set using WithDetail.
Detail string `json:"detail"`
}
// Error implements error
func (e *ClientLibraryError) Error() string {
var prefix string
if e.Retryable {
prefix = "Retryable"
} else {
prefix = "NonRetryable"
}
msg := fmt.Sprintf("%v Error [%d]: %s", prefix, int32(e.ErrorCode), e.Msg)
if e.Detail != "" {
msg = fmt.Sprintf("%s, detail: %s", msg, e.Detail)
}
return msg
}
// WithMsg overwrites the default error message
func (e *ClientLibraryError) WithMsg(format string, v ...interface{}) *ClientLibraryError {
e.Msg = fmt.Sprintf(format, v...)
return e
}
// WithDetail adds a detailed message to error
func (e *ClientLibraryError) WithDetail(format string, v ...interface{}) *ClientLibraryError {
if len(e.Detail) == 0 {
e.Detail = fmt.Sprintf(format, v...)
} else {
e.Detail += ", " + fmt.Sprintf(format, v...)
}
return e
}
// WithCause adds CauseBy to error
func (e *ClientLibraryError) WithCause(err error) *ClientLibraryError {
if err != nil {
// Store error message in Detail, so the info can be preserved
// when CascadeError is marshaled to json.
if len(e.Detail) == 0 {
e.Detail = err.Error()
} else {
e.Detail += ", cause: " + err.Error()
}
}
return e
}


@@ -121,15 +121,6 @@ const (
// The amount of milliseconds to wait before graceful shutdown forcefully terminates.
DEFAULT_SHUTDOWN_GRACE_MILLIS = 5000
// The size of the thread pool to create for the lease renewer to use.
DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20
// The sleep time between two listShards calls from the proxy when throttled.
DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS = 1500
// The number of times the Proxy will retry listShards call when throttled.
DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS = 50
)
type (


@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018 VMware, Inc.
* Copyright (c) 2020 VMware, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
* associated documentation files (the "Software"), to deal in the Software without restriction, including
@@ -48,6 +48,7 @@ const (
* instead depend on a different interface for backward compatibility.
*/
REQUESTED ShutdownReason = iota + 1
/**
* Terminate processing for this RecordProcessor (resharding use case).
* Indicates that the shard is closed and all records from the shard have been delivered to the application.
@@ -55,6 +56,7 @@ const (
* from this shard and processing of child shards can be started.
*/
TERMINATE
/**
* Processing will be moved to a different record processor (fail over, load balancing use cases).
* Applications SHOULD NOT checkpoint their progress (as another record processor may have already started
@@ -76,22 +78,36 @@ type (
ShutdownReason int
InitializationInput struct {
ShardId string
ExtendedSequenceNumber *ExtendedSequenceNumber
PendingCheckpointSequenceNumber *ExtendedSequenceNumber
// The shardId that the record processor is being initialized for.
ShardId string
// The last extended sequence number that was successfully checkpointed by the previous record processor.
ExtendedSequenceNumber *ExtendedSequenceNumber
}
ProcessRecordsInput struct {
CacheEntryTime *time.Time
CacheExitTime *time.Time
Records []*ks.Record
Checkpointer IRecordProcessorCheckpointer
// The time that this batch of records was received by the KCL.
CacheEntryTime *time.Time
// The time that this batch of records was prepared to be provided to the RecordProcessor.
CacheExitTime *time.Time
// The records received from Kinesis. These records may have been de-aggregated if they were published by the KPL.
Records []*ks.Record
// A checkpointer that the RecordProcessor can use to checkpoint its progress.
Checkpointer IRecordProcessorCheckpointer
// How far behind this batch of records was when received from Kinesis.
MillisBehindLatest int64
}
ShutdownInput struct {
// ShutdownReason shows why the RecordProcessor is going to be shut down.
ShutdownReason ShutdownReason
Checkpointer IRecordProcessorCheckpointer
// Checkpointer is used to record the current progress.
Checkpointer IRecordProcessorCheckpointer
}
)
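The two cache timestamps make the consumer-side dwell time observable: CacheEntryTime marks when the KCL received the batch from Kinesis, and CacheExitTime marks when the batch was handed to the RecordProcessor. A sketch of how a processor might use them — logBatchLatency is a hypothetical helper name, not part of this API.

package example

import (
	"fmt"

	kc "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces"
)

// logBatchLatency is a hypothetical helper: the difference between the two
// cache timestamps is the time the batch spent inside the KCL before delivery.
func logBatchLatency(input *kc.ProcessRecordsInput) {
	if input.CacheEntryTime == nil || input.CacheExitTime == nil {
		return
	}
	dwell := input.CacheExitTime.Sub(*input.CacheEntryTime)
	fmt.Printf("%d records, cached for %v, %d ms behind latest\n",
		len(input.Records), dwell, input.MillisBehindLatest)
}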


@@ -69,7 +69,7 @@ func (rc *RecordProcessorCheckpointer) Checkpoint(sequenceNumber *string) error
// checkpoint the last sequence of a closed shard
if sequenceNumber == nil {
rc.shard.Checkpoint = chk.SHARD_END
rc.shard.Checkpoint = chk.ShardEnd
} else {
rc.shard.Checkpoint = aws.StringValue(sequenceNumber)
}


@@ -36,6 +36,7 @@ import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
deagg "github.com/awslabs/kinesis-aggregation/go/deaggregator"
chk "github.com/vmware/vmware-go-kcl/clientlibrary/checkpoint"
"github.com/vmware/vmware-go-kcl/clientlibrary/config"
@@ -47,19 +48,7 @@ import (
const (
// This is the initial state of a shard consumer. This causes the consumer to remain blocked until all
// parent shards have been completed.
WAITING_ON_PARENT_SHARDS ShardConsumerState = iota + 1
// This state is responsible for initializing the record processor with the shard information.
INITIALIZING
//
PROCESSING
SHUTDOWN_REQUESTED
SHUTTING_DOWN
SHUTDOWN_COMPLETE
WaitingOnParentShards ShardConsumerState = iota + 1
// ErrCodeKMSThrottlingException is defined in the API Reference https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.GetRecords
// But it's not a constant?
@@ -215,9 +204,21 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error {
// reset the retry count after success
retriedErrors = 0
log.Debugf("Received %d original records.", len(getResp.Records))
// De-aggregate the records if they were published by the KPL.
dars := make([]*kinesis.Record, 0)
dars, err = deagg.DeaggregateRecords(getResp.Records)
if err != nil {
// The error is caused by a bad KPL publisher; skip the bad records
// instead of being stuck here.
log.Errorf("Error in de-aggregating KPL records: %+v", err)
}
// IRecordProcessorCheckpointer
input := &kcl.ProcessRecordsInput{
Records: getResp.Records,
Records: dars,
MillisBehindLatest: aws.Int64Value(getResp.MillisBehindLatest),
Checkpointer: recordCheckpointer,
}
@@ -226,7 +227,7 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error {
recordBytes := int64(0)
log.Debugf("Received %d records, MillisBehindLatest: %v", recordLength, input.MillisBehindLatest)
for _, r := range getResp.Records {
for _, r := range dars {
recordBytes += int64(len(r.Data))
}
@@ -234,6 +235,8 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error {
processRecordsStartTime := time.Now()
// Deliver the events to the record processor
input.CacheEntryTime = &getRecordsStartTime
input.CacheExitTime = &processRecordsStartTime
sc.recordProcessor.ProcessRecords(input)
// Convert from nanoseconds to milliseconds
@@ -288,7 +291,7 @@ func (sc *ShardConsumer) waitOnParentShard(shard *par.ShardStatus) error {
}
// Parent shard is finished.
if pshard.Checkpoint == chk.SHARD_END {
if pshard.Checkpoint == chk.ShardEnd {
return nil
}


@@ -215,7 +215,7 @@ func (w *Worker) newShardConsumer(shard *par.ShardStatus) *ShardConsumer {
consumerID: w.workerID,
stop: w.stop,
mService: w.mService,
state: WAITING_ON_PARENT_SHARDS,
state: WaitingOnParentShards,
}
}
@@ -246,7 +246,7 @@ func (w *Worker) eventLoop() {
// Count the number of leases held by this worker excluding the processed shard
counter := 0
for _, shard := range w.shardStatus {
if shard.GetLeaseOwner() == w.workerID && shard.Checkpoint != chk.SHARD_END {
if shard.GetLeaseOwner() == w.workerID && shard.Checkpoint != chk.ShardEnd {
counter++
}
}
@@ -270,7 +270,7 @@ func (w *Worker) eventLoop() {
}
// The shard is closed and we have processed all records
if shard.Checkpoint == chk.SHARD_END {
if shard.Checkpoint == chk.ShardEnd {
continue
}

go.mod

@@ -3,6 +3,8 @@ module github.com/vmware/vmware-go-kcl
require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/aws/aws-sdk-go v1.34.8
github.com/awslabs/kinesis-aggregation/go v0.0.0-20201211133042-142dfe1d7a6d
github.com/golang/protobuf v1.3.1
github.com/google/uuid v1.1.1
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/prometheus/client_golang v0.9.3

go.sum

@@ -3,8 +3,11 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/aws/aws-sdk-go v1.19.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.34.8 h1:GDfVeXG8XQDbpOeAj7415F8qCQZwvY/k/fj+HBqUnBA=
github.com/aws/aws-sdk-go v1.34.8/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/awslabs/kinesis-aggregation/go v0.0.0-20201211133042-142dfe1d7a6d h1:kGtsYh3+yYsCafn/pp/j/SMbc2bOiWJBxxkzCnAQWF4=
github.com/awslabs/kinesis-aggregation/go v0.0.0-20201211133042-142dfe1d7a6d/go.mod h1:SghidfnxvX7ribW6nHI7T+IBbc9puZ9kk5Tx/88h8P4=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
@@ -18,12 +21,14 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
@@ -63,6 +68,7 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=


@@ -67,10 +67,12 @@ func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
dd.count++
}
// checkpoint it after processing this batch
lastRecordSequenceNubmer := input.Records[len(input.Records)-1].SequenceNumber
dd.t.Logf("Checkpoint progress at: %v, MillisBehindLatest = %v", lastRecordSequenceNubmer, input.MillisBehindLatest)
input.Checkpointer.Checkpoint(lastRecordSequenceNubmer)
// checkpoint it after processing this batch.
// Especially, for processing de-aggregated KPL records, checkpointing has to happen at the end of batch
// because de-aggregated records share the same sequence number.
lastRecordSequenceNumber := input.Records[len(input.Records)-1].SequenceNumber
dd.t.Logf("Checkpoint progress at: %v, MillisBehindLatest = %v", lastRecordSequenceNumber, input.MillisBehindLatest)
input.Checkpointer.Checkpoint(lastRecordSequenceNumber)
}
func (dd *dumpRecordProcessor) Shutdown(input *kc.ShutdownInput) {


@@ -19,12 +19,17 @@
package test
import (
"crypto/md5"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
rec "github.com/awslabs/kinesis-aggregation/go/records"
"github.com/golang/protobuf/proto"
"github.com/vmware/vmware-go-kcl/clientlibrary/utils"
"testing"
)
@@ -60,6 +65,13 @@ func publishSomeData(t *testing.T, kc kinesisiface.KinesisAPI) {
publishRecords(t, kc)
}
t.Log("Done putting data into stream using PutRecords API.")
// Put some data into stream using KPL Aggregate Record format
t.Log("Putting data into stream using KPL Aggregate Record ...")
for i := 0; i < 10; i++ {
publishAggregateRecord(t, kc)
}
t.Log("Done putting data into stream using KPL Aggregate Record.")
}
// publishRecord to put a record into Kinesis stream using PutRecord API.
@@ -97,3 +109,55 @@ func publishRecords(t *testing.T, kc kinesisiface.KinesisAPI) {
t.Errorf("Error in PutRecords. %+v", err)
}
}
// publishAggregateRecord puts a KPL aggregate record into the Kinesis stream using the PutRecord API.
func publishAggregateRecord(t *testing.T, kc kinesisiface.KinesisAPI) {
data := generateAggregateRecord(5, specstr)
// Use random string as partition key to ensure even distribution across shards
_, err := kc.PutRecord(&kinesis.PutRecordInput{
Data: data,
StreamName: aws.String(streamName),
PartitionKey: aws.String(utils.RandStringBytesMaskImpr(10)),
})
if err != nil {
t.Errorf("Error in PutRecord. %+v", err)
}
}
// generateAggregateRecord generates an aggregate record in the correct AWS-specified format used by KPL.
// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
// copy from: https://github.com/awslabs/kinesis-aggregation/blob/master/go/deaggregator/deaggregator_test.go
func generateAggregateRecord(numRecords int, content string) []byte {
aggr := &rec.AggregatedRecord{}
// Start with the magic header
aggRecord := []byte("\xf3\x89\x9a\xc2")
partKeyTable := make([]string, 0)
// Create proto record with numRecords length
for i := 0; i < numRecords; i++ {
var partKey uint64
var hashKey uint64
partKey = uint64(i)
hashKey = uint64(i) * uint64(10)
r := &rec.Record{
PartitionKeyIndex: &partKey,
ExplicitHashKeyIndex: &hashKey,
Data: []byte(content),
Tags: make([]*rec.Tag, 0),
}
aggr.Records = append(aggr.Records, r)
partKeyVal := fmt.Sprint(i)
partKeyTable = append(partKeyTable, partKeyVal)
}
aggr.PartitionKeyTable = partKeyTable
// Marshal to protobuf record, create md5 sum from proto record
// and append both to aggRecord with magic header
data, _ := proto.Marshal(aggr)
md5Hash := md5.Sum(data)
aggRecord = append(aggRecord, data...)
aggRecord = append(aggRecord, md5Hash[:]...)
return aggRecord
}
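To sanity-check the generated blob, it can be fed back through the same de-aggregator the shard consumer now uses. A sketch, assuming the deaggregator import (deagg "github.com/awslabs/kinesis-aggregation/go/deaggregator") is added to this file; the wrapper's partition key and sequence number below are placeholder values.

// deaggregateRoundTrip is a sketch, not part of the commit: it wraps the
// aggregate blob the way a consumer would receive it from Kinesis and
// de-aggregates it back into the individual user records.
func deaggregateRoundTrip(t *testing.T, data []byte) {
	aggregated := []*kinesis.Record{{
		Data:           data,
		PartitionKey:   aws.String("pk-0"),
		SequenceNumber: aws.String("49500000000000000000000000000000000000000000000000000000"),
	}}
	dars, err := deagg.DeaggregateRecords(aggregated)
	if err != nil {
		t.Fatalf("Error in de-aggregating record. %+v", err)
	}
	// Each user record comes back carrying its parent's sequence number,
	// which is why the record processor checkpoints once per batch.
	if len(dars) > 0 {
		t.Logf("De-aggregated %d records, seq: %v", len(dars), aws.StringValue(dars[0].SequenceNumber))
	}
}

Called as deaggregateRoundTrip(t, generateAggregateRecord(5, specstr)), this should yield five records carrying the content string.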


@@ -69,7 +69,7 @@ func TestWorker(t *testing.T) {
kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID).
WithInitialPositionInStream(cfg.LATEST).
WithMaxRecords(10).
WithMaxRecords(8).
WithMaxLeasesForWorker(1).
WithShardSyncIntervalMillis(5000).
WithFailoverTimeMillis(300000).