diff --git a/Makefile b/Makefile index ef02f13..317f57c 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ build: build-common ## - build a debug binary to the current platform (windows, .PHONY: format-check format-check: ## - check files format using gofmt - @ ./_support/scripts/ci.sh fmtcheck + @ ./_support/scripts/ci.sh fmtCheck .PHONY: format-check format: ## - apply golang file format using gofmt @@ -30,7 +30,7 @@ format: ## - apply golang file format using gofmt .PHONY: test test: build-common ## - execute go test command for unit and mocked tests - @ ./_support/scripts/ci.sh unittest + @ ./_support/scripts/ci.sh unitTest .PHONY: integration-test integration-test: ## - execute go test command for integration tests (aws credentials needed) diff --git a/README.md b/README.md index 203c19c..8a52a1f 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,22 @@ # VMWare Go KCL v2 ![technology Go](https://img.shields.io/badge/technology-go-blue.svg) -[![Go Report Card](https://goreportcard.com/badge/github.com/vmware/vmware-go-kcl)](https://goreportcard.com/report/github.com/vmware/vmware-go-kcl) +[![Go Report Card](https://goreportcard.com/badge/github.com/vmware/vmware-go-kcl-v2)](https://goreportcard.com/report/github.com/vmware/vmware-go-kcl-v2) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) -[![vmware-go-kcl-v2](https://github.com/fafg/vmware-go-kcl/actions/workflows/vmware-go-kcl-v2-ci.yml/badge.svg)](https://github.com/fafg/vmware-go-kcl/actions/workflows/vmware-go-kcl-v2-ci.yml) +[![vmware-go-kcl-v2](https://github.com/fafg/vmware-go-kcl-v2/actions/workflows/vmware-go-kcl-v2-ci.yml/badge.svg)](https://github.com/fafg/vmware-go-kcl-v2/actions/workflows/vmware-go-kcl-v2-ci.yml) ## Overview -VMware-Go-KCL is a native open-source Go library for Amazon Kinesis Data Stream (KDS) consumption. It allows developers -to program KDS consumers in lightweight Go language and still take advantage of the features presented by the native +VMware-Go-KCL-V2 is a native open-source Go library for Amazon Kinesis Data Stream (KDS) consumption. It allows developers +to program KDS consumers in lightweight Go language and still take advantage of the features presented by the native KDS Java API libraries. -[vmware-go-kcl](https://github.com/vmware/vmware-go-kcl) is a VMWare originated open-source project for AWS Kinesis -Client Library in Go. Within VMware, we have seen adoption in vSecureState and Carbon Black. In addition, Carbon Black -has contributed to the vmware-go-kcl codebase and heavily used it in the product. Besides, -[vmware-go-kcl](https://github.com/vmware/vmware-go-kcl) has got -[recognition](https://www.linkedin.com/posts/adityakrish_vmware-go-kcl-a-native-open-source-go-programming-activity-6810626798133616640-B6W8/), -and [contributions](https://github.com/vmware/vmware-go-kcl/graphs/contributors) from the industry. +[vmware-go-kcl-v2](https://github.com/vmware/vmware-go-kcl-v2) is a VMWare originated open-source project for AWS Kinesis +Client Library in Go. Within VMware, we have seen adoption in vSecureState and Carbon Black. In addition, Carbon Black +has contributed to the vmware-go-kcl codebase and heavily used it in the product. Besides, +[vmware-go-kcl-v2](https://github.com/vmware/vmware-go-kcl-v2) has got +[recognition](https://www.linkedin.com/posts/adityakrish_vmware-go-kcl-a-native-open-source-go-programming-activity-6810626798133616640-B6W8/), +and [contributions](https://github.com/vmware/vmware-go-kcl-v2/graphs/contributors) from the industry. `vmware-go-kcl-v2` is the v2 version of VMWare KCL for the Go programming language by utilizing [AWS Go SDK V2](https://github.com/aws/aws-sdk-go-v2). @@ -31,21 +31,20 @@ and [contributions](https://github.com/vmware/vmware-go-kcl/graphs/contributors) ### Build & Run 1. Initialize Project -2. Build -`make build` +2. Build + > `make build` 3. Test - -`make test` + > `make test` ## Documentation VMware-Go-KCL matches exactly the same interface and programming model from original Amazon KCL, the best place for getting reference, tutorial is from Amazon itself: -- [Developing Consumers Using the Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html) -- [Troubleshooting](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html) -- [Advanced Topics](https://docs.aws.amazon.com/streams/latest/dev/advanced-consumers.html) +* [Developing Consumers Using the Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html) +* [Troubleshooting](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html) +* [Advanced Topics](https://docs.aws.amazon.com/streams/latest/dev/advanced-consumers.html) ## Contributing diff --git a/_support/scripts/ci.sh b/_support/scripts/ci.sh index 245802f..fb134fb 100755 --- a/_support/scripts/ci.sh +++ b/_support/scripts/ci.sh @@ -101,7 +101,7 @@ function scanast() { rm -f security.log } -function Scan() { +function scan() { gosec -fmt=sarif -out=results.sarif -exclude-dir=internal -exclude-dir=vendor -severity=high ./... } @@ -117,11 +117,11 @@ function usage() { } case "$1" in - fmtcheck) checkfmt ;; + fmtCheck) checkfmt ;; format) goFormat ;; lint) lint ;; lintDocker) lintDocker ;; - unittest) unitTest ;; + unitTest) unitTest ;; scan) scan ;; localScan) localScan ;; *) usage ;; diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go index e5b5250..f64e630 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go @@ -90,7 +90,7 @@ func (checkpointer *DynamoCheckpoint) Init() error { checkpointer.log.Infof("Creating DynamoDB session") if checkpointer.svc == nil { - er := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) { + resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { if service == dynamodb.ServiceID && len(checkpointer.kclConfig.DynamoDBEndpoint) > 0 { return aws.Endpoint{ PartitionID: "aws", @@ -106,7 +106,7 @@ func (checkpointer *DynamoCheckpoint) Init() error { context.TODO(), awsConfig.WithRegion(checkpointer.kclConfig.RegionName), awsConfig.WithCredentialsProvider(checkpointer.kclConfig.DynamoDBCredentials), - awsConfig.WithEndpointResolver(er), + awsConfig.WithEndpointResolverWithOptions(resolver), awsConfig.WithRetryer(func() aws.Retryer { return retry.AddWithMaxBackoffDelay(retry.NewStandard(), retry.DefaultMaxBackoff) }), diff --git a/clientlibrary/config/config.go b/clientlibrary/config/config.go index 76293a3..5b45678 100644 --- a/clientlibrary/config/config.go +++ b/clientlibrary/config/config.go @@ -43,7 +43,6 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/credentials" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics" "github.com/vmware/vmware-go-kcl-v2/logger" @@ -174,10 +173,10 @@ type ( KinesisEndpoint string // KinesisCredentials is used to access Kinesis - KinesisCredentials *credentials.StaticCredentialsProvider + KinesisCredentials aws.CredentialsProvider // DynamoDBCredentials is used to access DynamoDB - DynamoDBCredentials *credentials.StaticCredentialsProvider + DynamoDBCredentials aws.CredentialsProvider // TableName is name of the dynamo db table for managing kinesis stream default to ApplicationName TableName string diff --git a/clientlibrary/config/kcl-config.go b/clientlibrary/config/kcl-config.go index 656b044..135f3fa 100644 --- a/clientlibrary/config/kcl-config.go +++ b/clientlibrary/config/kcl-config.go @@ -40,7 +40,7 @@ import ( "log" "time" - "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/utils" @@ -55,13 +55,13 @@ func NewKinesisClientLibConfig(applicationName, streamName, regionName, workerID // NewKinesisClientLibConfigWithCredential creates a default KinesisClientLibConfiguration based on the required fields and unique credentials. func NewKinesisClientLibConfigWithCredential(applicationName, streamName, regionName, workerID string, - creds *credentials.StaticCredentialsProvider) *KinesisClientLibConfiguration { + creds aws.CredentialsProvider) *KinesisClientLibConfiguration { return NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regionName, workerID, creds, creds) } // NewKinesisClientLibConfigWithCredentials creates a default KinesisClientLibConfiguration based on the required fields and specific credentials for each service. func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regionName, workerID string, - kinesisCreds, dynamodbCreds *credentials.StaticCredentialsProvider) *KinesisClientLibConfiguration { + kinesisCreds, dynamodbCreds aws.CredentialsProvider) *KinesisClientLibConfiguration { checkIsValueNotEmpty("ApplicationName", applicationName) checkIsValueNotEmpty("StreamName", streamName) checkIsValueNotEmpty("RegionName", regionName) diff --git a/clientlibrary/metrics/cloudwatch/cloudwatch.go b/clientlibrary/metrics/cloudwatch/cloudwatch.go index e8bf732..1383bbb 100644 --- a/clientlibrary/metrics/cloudwatch/cloudwatch.go +++ b/clientlibrary/metrics/cloudwatch/cloudwatch.go @@ -35,8 +35,6 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/credentials" - cwatch "github.com/aws/aws-sdk-go-v2/service/cloudwatch" "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" @@ -51,7 +49,7 @@ type MonitoringService struct { streamName string workerID string region string - credentials *credentials.StaticCredentialsProvider + credentials aws.CredentialsProvider logger logger.Logger // control how often to publish to CloudWatch @@ -76,13 +74,13 @@ type cloudWatchMetrics struct { } // NewMonitoringService returns a Monitoring service publishing metrics to CloudWatch. -func NewMonitoringService(region string, creds *credentials.StaticCredentialsProvider) *MonitoringService { +func NewMonitoringService(region string, creds aws.CredentialsProvider) *MonitoringService { return NewMonitoringServiceWithOptions(region, creds, logger.GetDefaultLogger(), DefaultCloudwatchMetricsBufferDuration) } // NewMonitoringServiceWithOptions returns a Monitoring service publishing metrics to // CloudWatch with the provided credentials, buffering duration and logger. -func NewMonitoringServiceWithOptions(region string, creds *credentials.StaticCredentialsProvider, logger logger.Logger, bufferDur time.Duration) *MonitoringService { +func NewMonitoringServiceWithOptions(region string, creds aws.CredentialsProvider, logger logger.Logger, bufferDur time.Duration) *MonitoringService { return &MonitoringService{ region: region, credentials: creds, diff --git a/clientlibrary/worker/common-shard-consumer.go b/clientlibrary/worker/common-shard-consumer.go index d384145..32b91b1 100644 --- a/clientlibrary/worker/common-shard-consumer.go +++ b/clientlibrary/worker/common-shard-consumer.go @@ -27,13 +27,13 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" + deagg "github.com/awslabs/kinesis-aggregation/go/v2/deaggregator" chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config" kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics" par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition" - deagg "github.com/vmware/vmware-go-kcl-v2/internal/deaggregator" ) type shardConsumer interface { diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index 75ba41d..7807edd 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -40,7 +40,6 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws/retry" awsConfig "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/kinesis" chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint" @@ -160,7 +159,7 @@ func (w *Worker) initialize() error { // create session for Kinesis log.Infof("Creating Kinesis client") - resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) { + resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { return aws.Endpoint{ PartitionID: "aws", URL: w.kclConfig.KinesisEndpoint, @@ -171,12 +170,8 @@ func (w *Worker) initialize() error { cfg, err := awsConfig.LoadDefaultConfig( context.TODO(), awsConfig.WithRegion(w.regionName), - awsConfig.WithCredentialsProvider( - credentials.NewStaticCredentialsProvider( - w.kclConfig.KinesisCredentials.Value.AccessKeyID, - w.kclConfig.KinesisCredentials.Value.SecretAccessKey, - w.kclConfig.KinesisCredentials.Value.SessionToken)), - awsConfig.WithEndpointResolver(resolver), + awsConfig.WithCredentialsProvider(w.kclConfig.KinesisCredentials), + awsConfig.WithEndpointResolverWithOptions(resolver), awsConfig.WithRetryer(func() aws.Retryer { return retry.AddWithMaxBackoffDelay(retry.NewStandard(), retry.DefaultMaxBackoff) }), diff --git a/go.mod b/go.mod index dd7f2ef..9a56ba8 100644 --- a/go.mod +++ b/go.mod @@ -3,35 +3,36 @@ module github.com/vmware/vmware-go-kcl-v2 go 1.17 require ( - github.com/aws/aws-sdk-go-v2 v1.11.0 - github.com/aws/aws-sdk-go-v2/config v1.10.0 - github.com/aws/aws-sdk-go-v2/credentials v1.6.0 - github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.10.0 - github.com/aws/aws-sdk-go-v2/service/dynamodb v1.7.0 - github.com/aws/aws-sdk-go-v2/service/kinesis v1.8.0 + github.com/aws/aws-sdk-go-v2 v1.11.2 + github.com/aws/aws-sdk-go-v2/config v1.11.1 + github.com/aws/aws-sdk-go-v2/credentials v1.6.5 + github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0 + github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0 + github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0 + github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 github.com/golang/protobuf v1.5.2 github.com/google/uuid v1.3.0 github.com/prometheus/client_golang v1.11.0 github.com/prometheus/common v0.32.1 - github.com/rs/zerolog v1.26.0 + github.com/rs/zerolog v1.26.1 github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.7.0 - go.uber.org/zap v1.19.1 + go.uber.org/zap v1.20.0 gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) require ( github.com/BurntSushi/toml v0.4.1 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 // indirect - github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.0 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.0 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.0 // indirect - github.com/aws/aws-sdk-go-v2/internal/ini v1.3.0 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.0 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.6.0 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.9.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.7.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.12.0 // indirect github.com/aws/smithy-go v1.9.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect @@ -43,7 +44,7 @@ require ( github.com/prometheus/procfs v0.7.3 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.7.0 // indirect - golang.org/x/sys v0.0.0-20211111213525-f221eed1c01e // indirect + golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect google.golang.org/protobuf v1.27.1 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) diff --git a/go.sum b/go.sum index a170946..f03f679 100644 --- a/go.sum +++ b/go.sum @@ -40,40 +40,45 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aws/aws-sdk-go-v2 v1.11.0 h1:HxyD62DyNhCfiFGUHqJ/xITD6rAjJ7Dm/2nLxLmO4Ag= -github.com/aws/aws-sdk-go-v2 v1.11.0/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ= +github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= +github.com/aws/aws-sdk-go-v2 v1.11.2 h1:SDiCYqxdIYi6HgQfAWRhgdZrdnOuGyLDJVRSWLeHWvs= +github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 h1:yVUAwvJC/0WNPbyl0nA3j1L6CW1CN8wBubCRqtG7JLI= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0/go.mod h1:Xn6sxgRuIDflLRJFj5Ev7UxABIkNbccFPV/p8itDReM= -github.com/aws/aws-sdk-go-v2/config v1.10.0 h1:4i+/7DmCQCAls5Z61giur0LOPZ3PXFwnSIw7hRamzws= -github.com/aws/aws-sdk-go-v2/config v1.10.0/go.mod h1:xuqoV5etD3N3B8Ts9je4ijgAv6mb+6NiOPFMUhwRcjA= -github.com/aws/aws-sdk-go-v2/credentials v1.6.0 h1:L3O6osQTlzLKRmiTphw2QJuD21EFapWCX4IipiRJhAE= -github.com/aws/aws-sdk-go-v2/credentials v1.6.0/go.mod h1:rQkYdQPDXRrvPLeEuCNwSgtwMzBo9eDGWlTNC69Sh/0= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.0 h1:OpZjuUy8Jt3CA1WgJgBC5Bz+uOjE5Ppx4NFTRaooUuA= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.0/go.mod h1:5E1J3/TTYy6z909QNR0QnXGBpfESYGDqd3O0zqONghU= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.0 h1:zY8cNmbBXt3pzjgWgdIbzpQ6qxoCwt+Nx9JbrAf2mbY= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.0/go.mod h1:NO3Q5ZTTQtO2xIg2+xTXYDiT7knSejfeDm7WGDaOo0U= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.0 h1:Z3aR/OXBnkYK9zXkNkfitHX6SmUBzSsx8VMHbH4Lvhw= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.0/go.mod h1:anlUzBoEWglcUxUQwZA7HQOEVEnQALVZsizAapB2hq8= -github.com/aws/aws-sdk-go-v2/internal/ini v1.3.0 h1:c10Z7fWxtJCoyc8rv06jdh9xrKnu7bAJiRaKWvTb2mU= -github.com/aws/aws-sdk-go-v2/internal/ini v1.3.0/go.mod h1:6oXGy4GLpypD3uCh8wcqztigGgmhLToMfjavgh+VySg= -github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.10.0 h1:MNNV0fi3J5Lxxhx8iDlKdRZJrtBv/0FyganA3nBYe8Q= -github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.10.0/go.mod h1:Oiwhs3Fo9amYOGsJggWBPU6bwa/u0xVpEdOS5HlouPg= -github.com/aws/aws-sdk-go-v2/service/dynamodb v1.7.0 h1:S3X6RWl0TfMxNXsIzz8r3Y6YVA1HWGSx6M345Q3mQ+I= -github.com/aws/aws-sdk-go-v2/service/dynamodb v1.7.0/go.mod h1:Hh0zJ3419ET9xQBeR+y0lHIkObJwAKPbzV9nTZ0yrJ0= +github.com/aws/aws-sdk-go-v2/config v1.11.1 h1:KXSjb7ZMLRtjxClFptukTYibiOqJS9NwBO+9WD3UMto= +github.com/aws/aws-sdk-go-v2/config v1.11.1/go.mod h1:VvfkzUhVtntSg1JfGFMSKS0CyiTZd3NqBxK5af4zsME= +github.com/aws/aws-sdk-go-v2/credentials v1.6.5 h1:ZrsO2js2v4T95rsCIWoAb/ck5+U1kwkizGdZHY+ni3s= +github.com/aws/aws-sdk-go-v2/credentials v1.6.5/go.mod h1:HWSOnsnqVMbLcWUmom6AN1cqhcLzLJ62AObW28CbYbU= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2 h1:KiN5TPOLrEjbGCvdTQR4t0U4T87vVwALZ5Bg3jpMqPY= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2/go.mod h1:dF2F6tXEOgmW5X1ZFO/EPtWrcm7XkW07KNcJUGNtt4s= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 h1:XJLnluKuUxQG255zPNe+04izXl7GSyUVafIsgfv9aw4= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2/go.mod h1:SgKKNBIoDC/E1ZCDhhMW3yalWjwuLjMcpLzsM/QQnWo= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 h1:EauRoYZVNPlidZSZJDscjJBQ22JhVF2+tdteatax2Ak= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2/go.mod h1:xT4XX6w5Sa3dhg50JrYyy3e4WPYo/+WjY/BXtqXVunU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2 h1:IQup8Q6lorXeiA/rK72PeToWoWK8h7VAPgHNWdSrtgE= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2/go.mod h1:VITe/MdW6EMXPb0o0txu/fsonXbMHUU2OC2Qp7ivU4o= +github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0 h1:BcSBoss+CeyRS4TgZKAcR6kcZ0Sb2P+DHs8r8aMlTpQ= +github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0/go.mod h1:eAgmZ4hIzTsTOlAA7yvGJz+RywxZo3KWtGt7J+jAUxU= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0 h1:te+nIFwPf5Bi/cZvd9g/+EF0gkJT3c0J/5+NMx0NBZg= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0/go.mod h1:ELltfl9ri0n4sZ/VjPZBgemNMd9mYIpCAuZhc7NP7l4= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 h1:lPLbw4Gn59uoKqvOfSnkJr54XWk5Ak1NK20ZEiSWb3U= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0/go.mod h1:80NaCIH9YU3rzTTs/J/ECATjXuRqzo/wB6ukO6MZ0XY= -github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.0 h1:A2aUh9d38A2ECh76ahOQUdpJFe+Jhjk8qrfV+YbNYGY= -github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.0/go.mod h1:5h2rxfLN22pLTQ1ZoOza87rp2SnN/9UDYdYBQRmIrsE= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0 h1:qGZWS/WgiFY+Zgad2u0gwBHpJxz6Ne401JE7iQI1nKs= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0/go.mod h1:Mq6AEc+oEjCUlBuLiK5YwW4shSOAKCQ3tXN0sQeYoBA= -github.com/aws/aws-sdk-go-v2/service/kinesis v1.8.0 h1:Cz26j4wGD1tJ2w/M8iLhaS81AkAGY3gEYRt0xQWjEIs= -github.com/aws/aws-sdk-go-v2/service/kinesis v1.8.0/go.mod h1:QyNCg1xtWFJVL++i6ZyVcwXZCiKTNeXHH9zZu3NHOdU= -github.com/aws/aws-sdk-go-v2/service/sso v1.6.0 h1:JDgKIUZOmLFu/Rv6zXLrVTWCmzA0jcTdvsT8iFIKrAI= -github.com/aws/aws-sdk-go-v2/service/sso v1.6.0/go.mod h1:Q/l0ON1annSU+mc0JybDy1Gy6dnJxIcWjphO6qJPzvM= -github.com/aws/aws-sdk-go-v2/service/sts v1.9.0 h1:rBLCnL8hQ7Sv1S4XCPYgTMI7Uhg81BkvzIiK+/of2zY= -github.com/aws/aws-sdk-go-v2/service/sts v1.9.0/go.mod h1:jLKCFqS+1T4i7HDqCP9GM4Uk75YW1cS0o82LdxpMyOE= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 h1:ru9+IpkVIuDvIkm9Q0DEjtWHnh6ITDoZo8fH2dIjlqQ= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3/go.mod h1:zOyLMYyg60yyZpOCniAUuibWVqTU4TuLmMa/Wh4P+HA= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2 h1:CKdUNKmuilw/KNmO2Q53Av8u+ZyXMC2M9aX8Z+c/gzg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2/go.mod h1:FgR1tCsn8C6+Hf+N5qkfrE4IXvUL1RgW87sunJ+5J4I= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0 h1:s47dGRX/fBy9s/Zculav/cyqRhkMKsE/5hjg6rWAH6E= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0/go.mod h1:B1x58TfECuYHFX/bga902rUvMqQu9C/v2XiCi2GZZXE= +github.com/aws/aws-sdk-go-v2/service/sso v1.7.0 h1:E4fxAg/UE8a6yiLZYv8/EP0uXKPPRImiMau4ift6S/g= +github.com/aws/aws-sdk-go-v2/service/sso v1.7.0/go.mod h1:KnIpszaIdwI33tmc/W/GGXyn22c1USYxA/2KyvoeDY0= +github.com/aws/aws-sdk-go-v2/service/sts v1.12.0 h1:7g0252k2TF3eA1DtfkTQB/tqI41YvbUPaolwTR0/ITc= +github.com/aws/aws-sdk-go-v2/service/sts v1.12.0/go.mod h1:UV2N5HaPfdbDpkgkz4sRzWCvQswZjdO1FfqCWl0t7RA= +github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/aws/smithy-go v1.9.0 h1:c7FUdEqrQA1/UVKKCNDFQPNKGp4FQg3YW4Ck5SLTG58= github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= +github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 h1:p8Ubi4GEgfRc1xFn/WtGNkVG8RXxGHOsKiwGptufIo8= +github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -226,8 +231,8 @@ github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= -github.com/rs/zerolog v1.26.0 h1:ORM4ibhEZeTeQlCojCK2kPz1ogAY4bGs4tD+SaAdGaE= -github.com/rs/zerolog v1.26.0/go.mod h1:yBiM87lvSqX8h0Ww4sdzNSkVYZ8dL2xjZJG1lAuGZEo= +github.com/rs/zerolog v1.26.1 h1:/ihwxqH+4z8UxyI70wM1z9yCvkWcfz/a3mj48k/Zngc= +github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+tmc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= @@ -253,19 +258,20 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4= -go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= +go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec= go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= -go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI= -go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= +go.uber.org/zap v1.20.0 h1:N4oPlghZwYG55MlU6LXk/Zp00FVNE9X9wrYO8CEs4lc= +go.uber.org/zap v1.20.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -325,6 +331,7 @@ golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= @@ -383,8 +390,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211111213525-f221eed1c01e h1:zeJt6jBtVDK23XK9QXcmG0FvO0elikp0dYZQZOeL1y0= -golang.org/x/sys v0.0.0-20211111213525-f221eed1c01e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/internal/deaggregator/deaggregator.go b/internal/deaggregator/deaggregator.go deleted file mode 100644 index 10e4dfb..0000000 --- a/internal/deaggregator/deaggregator.go +++ /dev/null @@ -1,96 +0,0 @@ -// Package deaggregator -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package deaggregator - -import ( - "crypto/md5" - "fmt" - - "github.com/aws/aws-sdk-go-v2/service/kinesis/types" - "github.com/golang/protobuf/proto" - - rec "github.com/vmware/vmware-go-kcl-v2/internal/records" -) - -// KplMagicHeader Magic File Header for a KPL Aggregated Record -var KplMagicHeader = fmt.Sprintf("%q", []byte("\xf3\x89\x9a\xc2")) - -const ( - KplMagicLen = 4 // Length of magic header for KPL Aggregate Record checking. - DigestSize = 16 // MD5 Message size for protobuf. -) - -// DeaggregateRecords takes an array of Kinesis records and expands any Protobuf -// records within that array, returning an array of all records -func DeaggregateRecords(records []types.Record) ([]types.Record, error) { - var isAggregated bool - allRecords := make([]types.Record, 0) - - for _, record := range records { - isAggregated = true - - var dataMagic string - var decodedDataNoMagic []byte - // Check if record is long enough to have magic file header - if len(record.Data) >= KplMagicLen { - dataMagic = fmt.Sprintf("%q", record.Data[:KplMagicLen]) - decodedDataNoMagic = record.Data[KplMagicLen:] - } else { - isAggregated = false - } - - // Check if record has KPL Aggregate Record Magic Header and data length - // is correct size - if KplMagicHeader != dataMagic || len(decodedDataNoMagic) <= DigestSize { - isAggregated = false - } - - if isAggregated { - messageDigest := fmt.Sprintf("%x", decodedDataNoMagic[len(decodedDataNoMagic)-DigestSize:]) - messageData := decodedDataNoMagic[:len(decodedDataNoMagic)-DigestSize] - - calculatedDigest := fmt.Sprintf("%x", md5.Sum(messageData)) - - // Check protobuf MD5 hash matches MD5 sum of record - if messageDigest != calculatedDigest { - isAggregated = false - } else { - aggRecord := &rec.AggregatedRecord{} - err := proto.Unmarshal(messageData, aggRecord) - - if err != nil { - return nil, err - } - - partitionKeys := aggRecord.PartitionKeyTable - - for _, aggrec := range aggRecord.Records { - newRecord := createUserRecord(partitionKeys, aggrec, record) - allRecords = append(allRecords, newRecord) - } - } - } - - if !isAggregated { - allRecords = append(allRecords, record) - } - } - - return allRecords, nil -} - -// createUserRecord takes in the partitionKeys of the aggregated record, the individual -// deaggregated record, and the original aggregated record builds a kinesis.Record and -// returns it -func createUserRecord(partitionKeys []string, aggRec *rec.Record, record types.Record) types.Record { - partitionKey := partitionKeys[*aggRec.PartitionKeyIndex] - - return types.Record{ - ApproximateArrivalTimestamp: record.ApproximateArrivalTimestamp, - Data: aggRec.Data, - EncryptionType: record.EncryptionType, - PartitionKey: &partitionKey, - SequenceNumber: record.SequenceNumber, - } -} diff --git a/internal/deaggregator/deaggregator_test.go b/internal/deaggregator/deaggregator_test.go deleted file mode 100644 index 3ba2793..0000000 --- a/internal/deaggregator/deaggregator_test.go +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package deaggregator_test - -import ( - "crypto/md5" - "fmt" - "math/rand" - "testing" - "time" - - "github.com/aws/aws-sdk-go-v2/service/kinesis/types" - "github.com/golang/protobuf/proto" - "github.com/stretchr/testify/assert" - - deagg "github.com/vmware/vmware-go-kcl-v2/internal/deaggregator" - rec "github.com/vmware/vmware-go-kcl-v2/internal/records" -) - -// Generate an aggregate record in the correct AWS-specified format -// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md -func generateAggregateRecord(numRecords int) []byte { - - aggr := &rec.AggregatedRecord{} - // Start with the magic header - aggRecord := []byte("\xf3\x89\x9a\xc2") - partKeyTable := make([]string, 0) - - // Create proto record with numRecords length - for i := 0; i < numRecords; i++ { - var partKey uint64 - var hashKey uint64 - partKey = uint64(i) - hashKey = uint64(i) * uint64(10) - r := &rec.Record{ - PartitionKeyIndex: &partKey, - ExplicitHashKeyIndex: &hashKey, - Data: []byte("Some test data string"), - Tags: make([]*rec.Tag, 0), - } - - aggr.Records = append(aggr.Records, r) - partKeyVal := "test" + fmt.Sprint(i) - partKeyTable = append(partKeyTable, partKeyVal) - } - - aggr.PartitionKeyTable = partKeyTable - // Marshal to protobuf record, create md5 sum from proto record - // and append both to aggRecord with magic header - data, _ := proto.Marshal(aggr) - md5Hash := md5.Sum(data) - aggRecord = append(aggRecord, data...) - aggRecord = append(aggRecord, md5Hash[:]...) - return aggRecord -} - -// Generate a generic kinesis.Record using whatever []byte -// is passed in as the data (can be normal []byte or proto record) -func generateKinesisRecord(data []byte) types.Record { - currentTime := time.Now() - encryptionType := types.EncryptionTypeNone - partitionKey := "1234" - sequenceNumber := "21269319989900637946712965403778482371" - return types.Record{ - ApproximateArrivalTimestamp: ¤tTime, - Data: data, - EncryptionType: encryptionType, - PartitionKey: &partitionKey, - SequenceNumber: &sequenceNumber, - } -} - -// This tests to make sure that the data is at least larger than the length -// of the magic header to do some array slicing with index out of bounds -func TestSmallLengthReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) { - var err error - var kr types.Record - - krs := make([]types.Record, 0, 1) - - smallByte := []byte("No") - kr = generateKinesisRecord(smallByte) - krs = append(krs, kr) - dars, err := deagg.DeaggregateRecords(krs) - if err != nil { - panic(err) - } - - // Small byte test, since this is not a deaggregated record, should return 1 - // record in the array. - assert.Equal(t, 1, len(dars), "Small Byte test should return length of 1.") -} - -// This function tests to make sure that the data starts with the correct magic header -// according to KPL aggregate documentation. -func TestNonMatchingMagicHeaderReturnsSingleRecord(t *testing.T) { - var err error - var kr types.Record - - krs := make([]types.Record, 0, 1) - - min := 1 - max := 10 - n := rand.Intn(max-min) + min - aggData := generateAggregateRecord(n) - mismatchAggData := aggData[1:] - kr = generateKinesisRecord(mismatchAggData) - - krs = append(krs, kr) - - dars, err := deagg.DeaggregateRecords(krs) - if err != nil { - panic(err) - } - - // A byte record with a magic header that does not match 0xF3 0x89 0x9A 0xC2 - // should return a single record. - assert.Equal(t, 1, len(dars), "Mismatch magic header test should return length of 1.") -} - -// This function tests that the DeaggregateRecords function returns the correct number of -// deaggregated records from a single aggregated record. -func TestVariableLengthRecordsReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) { - var err error - var kr types.Record - - krs := make([]types.Record, 0, 1) - - min := 1 - max := 10 - n := rand.Intn(max-min) + min - aggData := generateAggregateRecord(n) - kr = generateKinesisRecord(aggData) - krs = append(krs, kr) - - dars, err := deagg.DeaggregateRecords(krs) - if err != nil { - panic(err) - } - - // Variable Length Aggregate Record test has aggregaterd records and should return - // n length. - assertMsg := fmt.Sprintf("Variable Length Aggregate Record should return length %v.", len(dars)) - assert.Equal(t, n, len(dars), assertMsg) -} - -// This function tests the length of the message after magic file header. If length is less than -// the digest size (16 bytes), it is not an aggregated record. -func TestRecordAfterMagicHeaderWithLengthLessThanDigestSizeReturnsSingleRecord(t *testing.T) { - var err error - var kr types.Record - - krs := make([]types.Record, 0, 1) - - min := 1 - max := 10 - n := rand.Intn(max-min) + min - aggData := generateAggregateRecord(n) - // Change size of proto message to 15 - reducedAggData := aggData[:19] - kr = generateKinesisRecord(reducedAggData) - - krs = append(krs, kr) - - dars, err := deagg.DeaggregateRecords(krs) - if err != nil { - panic(err) - } - - // A byte record with length less than 16 after the magic header should return - // a single record from DeaggregateRecords - assert.Equal(t, 1, len(dars), "Digest size test should return length of 1.") -} - -// This function tests the MD5 Sum at the end of the record by comparing MD5 sum -// at end of proto record with MD5 Sum of Proto message. If they do not match, -// it is not an aggregated record. -func TestRecordWithMismatchMd5SumReturnsSingleRecord(t *testing.T) { - var err error - var kr types.Record - - krs := make([]types.Record, 0, 1) - - min := 1 - max := 10 - n := rand.Intn(max-min) + min - aggData := generateAggregateRecord(n) - // Remove last byte from array to mismatch the MD5 sums - mismatchAggData := aggData[:len(aggData)-1] - kr = generateKinesisRecord(mismatchAggData) - - krs = append(krs, kr) - - dars, err := deagg.DeaggregateRecords(krs) - if err != nil { - panic(err) - } - - // A byte record with an MD5 sum that does not match with the md5.Sum(record) - // will be marked as a non-aggregate record and return a single record - assert.Equal(t, 1, len(dars), "Mismatch md5 sum test should return length of 1.") -} diff --git a/internal/records/records.pb.go b/internal/records/records.pb.go deleted file mode 100644 index 89abba5..0000000 --- a/internal/records/records.pb.go +++ /dev/null @@ -1,215 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: records.proto - -package records - -import ( - fmt "fmt" - math "math" - - proto "github.com/golang/protobuf/proto" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package - -type AggregatedRecord struct { - PartitionKeyTable []string `protobuf:"bytes,1,rep,name=partition_key_table,json=partitionKeyTable" json:"partition_key_table,omitempty"` - ExplicitHashKeyTable []string `protobuf:"bytes,2,rep,name=explicit_hash_key_table,json=explicitHashKeyTable" json:"explicit_hash_key_table,omitempty"` - Records []*Record `protobuf:"bytes,3,rep,name=records" json:"records,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *AggregatedRecord) Reset() { *m = AggregatedRecord{} } -func (m *AggregatedRecord) String() string { return proto.CompactTextString(m) } -func (*AggregatedRecord) ProtoMessage() {} -func (*AggregatedRecord) Descriptor() ([]byte, []int) { - return fileDescriptor_6ae0159314830e16, []int{0} -} - -func (m *AggregatedRecord) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_AggregatedRecord.Unmarshal(m, b) -} -func (m *AggregatedRecord) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_AggregatedRecord.Marshal(b, m, deterministic) -} -func (m *AggregatedRecord) XXX_Merge(src proto.Message) { - xxx_messageInfo_AggregatedRecord.Merge(m, src) -} -func (m *AggregatedRecord) XXX_Size() int { - return xxx_messageInfo_AggregatedRecord.Size(m) -} -func (m *AggregatedRecord) XXX_DiscardUnknown() { - xxx_messageInfo_AggregatedRecord.DiscardUnknown(m) -} - -var xxx_messageInfo_AggregatedRecord proto.InternalMessageInfo - -func (m *AggregatedRecord) GetPartitionKeyTable() []string { - if m != nil { - return m.PartitionKeyTable - } - return nil -} - -func (m *AggregatedRecord) GetExplicitHashKeyTable() []string { - if m != nil { - return m.ExplicitHashKeyTable - } - return nil -} - -func (m *AggregatedRecord) GetRecords() []*Record { - if m != nil { - return m.Records - } - return nil -} - -type Tag struct { - Key *string `protobuf:"bytes,1,req,name=key" json:"key,omitempty"` - Value *string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Tag) Reset() { *m = Tag{} } -func (m *Tag) String() string { return proto.CompactTextString(m) } -func (*Tag) ProtoMessage() {} -func (*Tag) Descriptor() ([]byte, []int) { - return fileDescriptor_6ae0159314830e16, []int{1} -} - -func (m *Tag) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Tag.Unmarshal(m, b) -} -func (m *Tag) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Tag.Marshal(b, m, deterministic) -} -func (m *Tag) XXX_Merge(src proto.Message) { - xxx_messageInfo_Tag.Merge(m, src) -} -func (m *Tag) XXX_Size() int { - return xxx_messageInfo_Tag.Size(m) -} -func (m *Tag) XXX_DiscardUnknown() { - xxx_messageInfo_Tag.DiscardUnknown(m) -} - -var xxx_messageInfo_Tag proto.InternalMessageInfo - -func (m *Tag) GetKey() string { - if m != nil && m.Key != nil { - return *m.Key - } - return "" -} - -func (m *Tag) GetValue() string { - if m != nil && m.Value != nil { - return *m.Value - } - return "" -} - -type Record struct { - PartitionKeyIndex *uint64 `protobuf:"varint,1,req,name=partition_key_index,json=partitionKeyIndex" json:"partition_key_index,omitempty"` - ExplicitHashKeyIndex *uint64 `protobuf:"varint,2,opt,name=explicit_hash_key_index,json=explicitHashKeyIndex" json:"explicit_hash_key_index,omitempty"` - Data []byte `protobuf:"bytes,3,req,name=data" json:"data,omitempty"` - Tags []*Tag `protobuf:"bytes,4,rep,name=tags" json:"tags,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Record) Reset() { *m = Record{} } -func (m *Record) String() string { return proto.CompactTextString(m) } -func (*Record) ProtoMessage() {} -func (*Record) Descriptor() ([]byte, []int) { - return fileDescriptor_6ae0159314830e16, []int{2} -} - -func (m *Record) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Record.Unmarshal(m, b) -} -func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Record.Marshal(b, m, deterministic) -} -func (m *Record) XXX_Merge(src proto.Message) { - xxx_messageInfo_Record.Merge(m, src) -} -func (m *Record) XXX_Size() int { - return xxx_messageInfo_Record.Size(m) -} -func (m *Record) XXX_DiscardUnknown() { - xxx_messageInfo_Record.DiscardUnknown(m) -} - -var xxx_messageInfo_Record proto.InternalMessageInfo - -func (m *Record) GetPartitionKeyIndex() uint64 { - if m != nil && m.PartitionKeyIndex != nil { - return *m.PartitionKeyIndex - } - return 0 -} - -func (m *Record) GetExplicitHashKeyIndex() uint64 { - if m != nil && m.ExplicitHashKeyIndex != nil { - return *m.ExplicitHashKeyIndex - } - return 0 -} - -func (m *Record) GetData() []byte { - if m != nil { - return m.Data - } - return nil -} - -func (m *Record) GetTags() []*Tag { - if m != nil { - return m.Tags - } - return nil -} - -func init() { - proto.RegisterType((*AggregatedRecord)(nil), "AggregatedRecord") - proto.RegisterType((*Tag)(nil), "Tag") - proto.RegisterType((*Record)(nil), "Record") -} - -func init() { proto.RegisterFile("records.proto", fileDescriptor_6ae0159314830e16) } - -var fileDescriptor_6ae0159314830e16 = []byte{ - // 245 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x8f, 0xc1, 0x4a, 0xc4, 0x30, - 0x10, 0x86, 0xc9, 0x26, 0xba, 0x74, 0x54, 0x58, 0xe3, 0x82, 0x39, 0xd6, 0x9e, 0x72, 0xb1, 0x07, - 0xc1, 0x07, 0xf0, 0xa6, 0x78, 0x0b, 0xbd, 0x97, 0x71, 0x3b, 0xa4, 0x61, 0xcb, 0xb6, 0xa4, 0x51, - 0xb6, 0xef, 0xa2, 0xef, 0x2a, 0x49, 0xdd, 0x45, 0x51, 0x6f, 0x93, 0xf9, 0xf9, 0x32, 0xff, 0x07, - 0x17, 0x9e, 0x36, 0xbd, 0x6f, 0xc6, 0x72, 0xf0, 0x7d, 0xe8, 0x8b, 0x77, 0x06, 0xab, 0x07, 0x6b, - 0x3d, 0x59, 0x0c, 0xd4, 0x98, 0x94, 0xc9, 0x12, 0xae, 0x06, 0xf4, 0xc1, 0x05, 0xd7, 0xef, 0xea, - 0x2d, 0x4d, 0x75, 0xc0, 0x97, 0x8e, 0x14, 0xcb, 0xb9, 0xce, 0xcc, 0xe5, 0x31, 0x7a, 0xa6, 0xa9, - 0x8a, 0x81, 0xbc, 0x87, 0x6b, 0xda, 0x0f, 0x9d, 0xdb, 0xb8, 0x50, 0xb7, 0x38, 0xb6, 0xdf, 0x98, - 0x45, 0x62, 0xd6, 0x87, 0xf8, 0x11, 0xc7, 0xf6, 0x88, 0xdd, 0xc0, 0xf2, 0xab, 0x8c, 0xe2, 0x39, - 0xd7, 0x67, 0x77, 0xcb, 0x72, 0x2e, 0x60, 0x0e, 0xfb, 0xe2, 0x16, 0x78, 0x85, 0x56, 0xae, 0x80, - 0x6f, 0x69, 0x52, 0x2c, 0x5f, 0xe8, 0xcc, 0xc4, 0x51, 0xae, 0xe1, 0xe4, 0x0d, 0xbb, 0xd7, 0x78, - 0x80, 0xe9, 0xcc, 0xcc, 0x8f, 0xe2, 0x83, 0xc1, 0xe9, 0x7f, 0x0e, 0x6e, 0xd7, 0xd0, 0x3e, 0x7d, - 0x21, 0x7e, 0x3a, 0x3c, 0xc5, 0xe0, 0x6f, 0x87, 0x99, 0x89, 0x27, 0xc4, 0x2f, 0x87, 0x19, 0x93, - 0x20, 0x1a, 0x0c, 0xa8, 0x78, 0xbe, 0xd0, 0xe7, 0x26, 0xcd, 0x52, 0x81, 0x08, 0x68, 0x47, 0x25, - 0x92, 0x94, 0x28, 0x2b, 0xb4, 0x26, 0x6d, 0x3e, 0x03, 0x00, 0x00, 0xff, 0xff, 0x87, 0x3e, 0x63, - 0x69, 0x7d, 0x01, 0x00, 0x00, -} diff --git a/test/record_publisher_test.go b/test/record_publisher_test.go index e6c0419..cfd142b 100644 --- a/test/record_publisher_test.go +++ b/test/record_publisher_test.go @@ -29,24 +29,23 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws/retry" awsConfig "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" + rec "github.com/awslabs/kinesis-aggregation/go/v2/records" "github.com/golang/protobuf/proto" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/utils" - rec "github.com/vmware/vmware-go-kcl-v2/internal/records" ) const specstr = `{"name":"kube-qQyhk","networking":{"containerNetworkCidr":"10.2.0.0/16"},"orgName":"BVT-Org-cLQch","projectName":"project-tDSJd","serviceLevel":"DEVELOPER","size":{"count":1},"version":"1.8.1-4"}` // NewKinesisClient to create a Kinesis Client. -func NewKinesisClient(t *testing.T, regionName, endpoint string, creds *credentials.StaticCredentialsProvider) *kinesis.Client { +func NewKinesisClient(t *testing.T, regionName, endpoint string, creds aws.CredentialsProvider) *kinesis.Client { // create session for Kinesis t.Logf("Creating Kinesis client") - resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) { + resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { return aws.Endpoint{ PartitionID: "aws", URL: endpoint, @@ -57,12 +56,8 @@ func NewKinesisClient(t *testing.T, regionName, endpoint string, creds *credenti cfg, err := awsConfig.LoadDefaultConfig( context.TODO(), awsConfig.WithRegion(regionName), - awsConfig.WithCredentialsProvider( - credentials.NewStaticCredentialsProvider( - creds.Value.AccessKeyID, - creds.Value.SecretAccessKey, - creds.Value.SessionToken)), - awsConfig.WithEndpointResolver(resolver), + awsConfig.WithCredentialsProvider(creds), + awsConfig.WithEndpointResolverWithOptions(resolver), awsConfig.WithRetryer(func() aws.Retryer { return retry.AddWithMaxBackoffDelay(retry.NewStandard(), retry.DefaultMaxBackoff) }), @@ -77,8 +72,8 @@ func NewKinesisClient(t *testing.T, regionName, endpoint string, creds *credenti } // NewDynamoDBClient to create a Kinesis Client. -func NewDynamoDBClient(t *testing.T, regionName, endpoint string, creds *credentials.StaticCredentialsProvider) *dynamodb.Client { - resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) { +func NewDynamoDBClient(t *testing.T, regionName, endpoint string, creds aws.CredentialsProvider) *dynamodb.Client { + resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { return aws.Endpoint{ PartitionID: "aws", URL: endpoint, @@ -89,12 +84,8 @@ func NewDynamoDBClient(t *testing.T, regionName, endpoint string, creds *credent cfg, err := awsConfig.LoadDefaultConfig( context.TODO(), awsConfig.WithRegion(regionName), - awsConfig.WithCredentialsProvider( - credentials.NewStaticCredentialsProvider( - creds.Value.AccessKeyID, - creds.Value.SecretAccessKey, - creds.Value.SessionToken)), - awsConfig.WithEndpointResolver(resolver), + awsConfig.WithCredentialsProvider(creds), + awsConfig.WithEndpointResolverWithOptions(resolver), awsConfig.WithRetryer(func() aws.Retryer { return retry.AddWithMaxBackoffDelay(retry.NewStandard(), retry.DefaultMaxBackoff) }),