Feature/fabiano commits (#2)
* use cammel case Signed-off-by: Fabiano Graças <fabiano.gracas@faro.com> * fix credential usage Signed-off-by: Fabiano Graças <fabiano.gracas@faro.com> * apply lint against MD file Signed-off-by: Fabiano Graças <fabiano.gracas@faro.com> * upgrade versions Signed-off-by: Fabiano Graças <fabiano.gracas@faro.com> * remove calls to deprecated functions. Signed-off-by: Fabiano Graças <fabiano.gracas@faro.com> * fix cammel case usage Signed-off-by: Fabiano Graças <fabiano.gracas@faro.com> * since deaggregation package was upgraded to sdk v2 makes sense to use it. https://github.com/awslabs/kinesis-aggregation/pull/143#issuecomment-953308464 Signed-off-by: Fabiano Graças <fabiano.gracas@faro.com> * fix format Signed-off-by: Fabiano Graças <fabiano.gracas@faro.com> Co-authored-by: Fabiano Graças <fabiano.gracas@faro.com>
This commit is contained in:
parent
c02b7a85d4
commit
c19ae1d605
15 changed files with 105 additions and 628 deletions
4
Makefile
4
Makefile
|
|
@ -22,7 +22,7 @@ build: build-common ## - build a debug binary to the current platform (windows,
|
|||
|
||||
.PHONY: format-check
|
||||
format-check: ## - check files format using gofmt
|
||||
@ ./_support/scripts/ci.sh fmtcheck
|
||||
@ ./_support/scripts/ci.sh fmtCheck
|
||||
|
||||
.PHONY: format-check
|
||||
format: ## - apply golang file format using gofmt
|
||||
|
|
@ -30,7 +30,7 @@ format: ## - apply golang file format using gofmt
|
|||
|
||||
.PHONY: test
|
||||
test: build-common ## - execute go test command for unit and mocked tests
|
||||
@ ./_support/scripts/ci.sh unittest
|
||||
@ ./_support/scripts/ci.sh unitTest
|
||||
|
||||
.PHONY: integration-test
|
||||
integration-test: ## - execute go test command for integration tests (aws credentials needed)
|
||||
|
|
|
|||
33
README.md
33
README.md
|
|
@ -1,22 +1,22 @@
|
|||
# VMWare Go KCL v2
|
||||
|
||||

|
||||
[](https://goreportcard.com/report/github.com/vmware/vmware-go-kcl)
|
||||
[](https://goreportcard.com/report/github.com/vmware/vmware-go-kcl-v2)
|
||||
[](https://opensource.org/licenses/MIT)
|
||||
[](https://github.com/fafg/vmware-go-kcl/actions/workflows/vmware-go-kcl-v2-ci.yml)
|
||||
[](https://github.com/fafg/vmware-go-kcl-v2/actions/workflows/vmware-go-kcl-v2-ci.yml)
|
||||
|
||||
## Overview
|
||||
|
||||
VMware-Go-KCL is a native open-source Go library for Amazon Kinesis Data Stream (KDS) consumption. It allows developers
|
||||
to program KDS consumers in lightweight Go language and still take advantage of the features presented by the native
|
||||
VMware-Go-KCL-V2 is a native open-source Go library for Amazon Kinesis Data Stream (KDS) consumption. It allows developers
|
||||
to program KDS consumers in lightweight Go language and still take advantage of the features presented by the native
|
||||
KDS Java API libraries.
|
||||
|
||||
[vmware-go-kcl](https://github.com/vmware/vmware-go-kcl) is a VMWare originated open-source project for AWS Kinesis
|
||||
Client Library in Go. Within VMware, we have seen adoption in vSecureState and Carbon Black. In addition, Carbon Black
|
||||
has contributed to the vmware-go-kcl codebase and heavily used it in the product. Besides,
|
||||
[vmware-go-kcl](https://github.com/vmware/vmware-go-kcl) has got
|
||||
[recognition](https://www.linkedin.com/posts/adityakrish_vmware-go-kcl-a-native-open-source-go-programming-activity-6810626798133616640-B6W8/),
|
||||
and [contributions](https://github.com/vmware/vmware-go-kcl/graphs/contributors) from the industry.
|
||||
[vmware-go-kcl-v2](https://github.com/vmware/vmware-go-kcl-v2) is a VMWare originated open-source project for AWS Kinesis
|
||||
Client Library in Go. Within VMware, we have seen adoption in vSecureState and Carbon Black. In addition, Carbon Black
|
||||
has contributed to the vmware-go-kcl codebase and heavily used it in the product. Besides,
|
||||
[vmware-go-kcl-v2](https://github.com/vmware/vmware-go-kcl-v2) has got
|
||||
[recognition](https://www.linkedin.com/posts/adityakrish_vmware-go-kcl-a-native-open-source-go-programming-activity-6810626798133616640-B6W8/),
|
||||
and [contributions](https://github.com/vmware/vmware-go-kcl-v2/graphs/contributors) from the industry.
|
||||
|
||||
`vmware-go-kcl-v2` is the v2 version of VMWare KCL for the Go programming language by utilizing [AWS Go SDK V2](https://github.com/aws/aws-sdk-go-v2).
|
||||
|
||||
|
|
@ -31,21 +31,20 @@ and [contributions](https://github.com/vmware/vmware-go-kcl/graphs/contributors)
|
|||
### Build & Run
|
||||
|
||||
1. Initialize Project
|
||||
2. Build
|
||||
|
||||
`make build`
|
||||
2. Build
|
||||
> `make build`
|
||||
|
||||
3. Test
|
||||
|
||||
`make test`
|
||||
> `make test`
|
||||
|
||||
## Documentation
|
||||
|
||||
VMware-Go-KCL matches exactly the same interface and programming model from original Amazon KCL, the best place for getting reference, tutorial is from Amazon itself:
|
||||
|
||||
- [Developing Consumers Using the Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html)
|
||||
- [Troubleshooting](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html)
|
||||
- [Advanced Topics](https://docs.aws.amazon.com/streams/latest/dev/advanced-consumers.html)
|
||||
* [Developing Consumers Using the Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html)
|
||||
* [Troubleshooting](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html)
|
||||
* [Advanced Topics](https://docs.aws.amazon.com/streams/latest/dev/advanced-consumers.html)
|
||||
|
||||
## Contributing
|
||||
|
||||
|
|
|
|||
|
|
@ -101,7 +101,7 @@ function scanast() {
|
|||
rm -f security.log
|
||||
}
|
||||
|
||||
function Scan() {
|
||||
function scan() {
|
||||
gosec -fmt=sarif -out=results.sarif -exclude-dir=internal -exclude-dir=vendor -severity=high ./...
|
||||
}
|
||||
|
||||
|
|
@ -117,11 +117,11 @@ function usage() {
|
|||
}
|
||||
|
||||
case "$1" in
|
||||
fmtcheck) checkfmt ;;
|
||||
fmtCheck) checkfmt ;;
|
||||
format) goFormat ;;
|
||||
lint) lint ;;
|
||||
lintDocker) lintDocker ;;
|
||||
unittest) unitTest ;;
|
||||
unitTest) unitTest ;;
|
||||
scan) scan ;;
|
||||
localScan) localScan ;;
|
||||
*) usage ;;
|
||||
|
|
|
|||
|
|
@ -90,7 +90,7 @@ func (checkpointer *DynamoCheckpoint) Init() error {
|
|||
checkpointer.log.Infof("Creating DynamoDB session")
|
||||
|
||||
if checkpointer.svc == nil {
|
||||
er := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
|
||||
resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
|
||||
if service == dynamodb.ServiceID && len(checkpointer.kclConfig.DynamoDBEndpoint) > 0 {
|
||||
return aws.Endpoint{
|
||||
PartitionID: "aws",
|
||||
|
|
@ -106,7 +106,7 @@ func (checkpointer *DynamoCheckpoint) Init() error {
|
|||
context.TODO(),
|
||||
awsConfig.WithRegion(checkpointer.kclConfig.RegionName),
|
||||
awsConfig.WithCredentialsProvider(checkpointer.kclConfig.DynamoDBCredentials),
|
||||
awsConfig.WithEndpointResolver(er),
|
||||
awsConfig.WithEndpointResolverWithOptions(resolver),
|
||||
awsConfig.WithRetryer(func() aws.Retryer {
|
||||
return retry.AddWithMaxBackoffDelay(retry.NewStandard(), retry.DefaultMaxBackoff)
|
||||
}),
|
||||
|
|
|
|||
|
|
@ -43,7 +43,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
|
||||
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics"
|
||||
"github.com/vmware/vmware-go-kcl-v2/logger"
|
||||
|
|
@ -174,10 +173,10 @@ type (
|
|||
KinesisEndpoint string
|
||||
|
||||
// KinesisCredentials is used to access Kinesis
|
||||
KinesisCredentials *credentials.StaticCredentialsProvider
|
||||
KinesisCredentials aws.CredentialsProvider
|
||||
|
||||
// DynamoDBCredentials is used to access DynamoDB
|
||||
DynamoDBCredentials *credentials.StaticCredentialsProvider
|
||||
DynamoDBCredentials aws.CredentialsProvider
|
||||
|
||||
// TableName is name of the dynamo db table for managing kinesis stream default to ApplicationName
|
||||
TableName string
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ import (
|
|||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
|
||||
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics"
|
||||
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/utils"
|
||||
|
|
@ -55,13 +55,13 @@ func NewKinesisClientLibConfig(applicationName, streamName, regionName, workerID
|
|||
|
||||
// NewKinesisClientLibConfigWithCredential creates a default KinesisClientLibConfiguration based on the required fields and unique credentials.
|
||||
func NewKinesisClientLibConfigWithCredential(applicationName, streamName, regionName, workerID string,
|
||||
creds *credentials.StaticCredentialsProvider) *KinesisClientLibConfiguration {
|
||||
creds aws.CredentialsProvider) *KinesisClientLibConfiguration {
|
||||
return NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regionName, workerID, creds, creds)
|
||||
}
|
||||
|
||||
// NewKinesisClientLibConfigWithCredentials creates a default KinesisClientLibConfiguration based on the required fields and specific credentials for each service.
|
||||
func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regionName, workerID string,
|
||||
kinesisCreds, dynamodbCreds *credentials.StaticCredentialsProvider) *KinesisClientLibConfiguration {
|
||||
kinesisCreds, dynamodbCreds aws.CredentialsProvider) *KinesisClientLibConfiguration {
|
||||
checkIsValueNotEmpty("ApplicationName", applicationName)
|
||||
checkIsValueNotEmpty("StreamName", streamName)
|
||||
checkIsValueNotEmpty("RegionName", regionName)
|
||||
|
|
|
|||
|
|
@ -35,8 +35,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
|
||||
cwatch "github.com/aws/aws-sdk-go-v2/service/cloudwatch"
|
||||
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
|
||||
|
||||
|
|
@ -51,7 +49,7 @@ type MonitoringService struct {
|
|||
streamName string
|
||||
workerID string
|
||||
region string
|
||||
credentials *credentials.StaticCredentialsProvider
|
||||
credentials aws.CredentialsProvider
|
||||
logger logger.Logger
|
||||
|
||||
// control how often to publish to CloudWatch
|
||||
|
|
@ -76,13 +74,13 @@ type cloudWatchMetrics struct {
|
|||
}
|
||||
|
||||
// NewMonitoringService returns a Monitoring service publishing metrics to CloudWatch.
|
||||
func NewMonitoringService(region string, creds *credentials.StaticCredentialsProvider) *MonitoringService {
|
||||
func NewMonitoringService(region string, creds aws.CredentialsProvider) *MonitoringService {
|
||||
return NewMonitoringServiceWithOptions(region, creds, logger.GetDefaultLogger(), DefaultCloudwatchMetricsBufferDuration)
|
||||
}
|
||||
|
||||
// NewMonitoringServiceWithOptions returns a Monitoring service publishing metrics to
|
||||
// CloudWatch with the provided credentials, buffering duration and logger.
|
||||
func NewMonitoringServiceWithOptions(region string, creds *credentials.StaticCredentialsProvider, logger logger.Logger, bufferDur time.Duration) *MonitoringService {
|
||||
func NewMonitoringServiceWithOptions(region string, creds aws.CredentialsProvider, logger logger.Logger, bufferDur time.Duration) *MonitoringService {
|
||||
return &MonitoringService{
|
||||
region: region,
|
||||
credentials: creds,
|
||||
|
|
|
|||
|
|
@ -27,13 +27,13 @@ import (
|
|||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/service/kinesis"
|
||||
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
||||
deagg "github.com/awslabs/kinesis-aggregation/go/v2/deaggregator"
|
||||
|
||||
chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint"
|
||||
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
|
||||
kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
|
||||
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics"
|
||||
par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition"
|
||||
deagg "github.com/vmware/vmware-go-kcl-v2/internal/deaggregator"
|
||||
)
|
||||
|
||||
type shardConsumer interface {
|
||||
|
|
|
|||
|
|
@ -40,7 +40,6 @@ import (
|
|||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/aws/retry"
|
||||
awsConfig "github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"github.com/aws/aws-sdk-go-v2/service/kinesis"
|
||||
|
||||
chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint"
|
||||
|
|
@ -160,7 +159,7 @@ func (w *Worker) initialize() error {
|
|||
// create session for Kinesis
|
||||
log.Infof("Creating Kinesis client")
|
||||
|
||||
resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
|
||||
resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
|
||||
return aws.Endpoint{
|
||||
PartitionID: "aws",
|
||||
URL: w.kclConfig.KinesisEndpoint,
|
||||
|
|
@ -171,12 +170,8 @@ func (w *Worker) initialize() error {
|
|||
cfg, err := awsConfig.LoadDefaultConfig(
|
||||
context.TODO(),
|
||||
awsConfig.WithRegion(w.regionName),
|
||||
awsConfig.WithCredentialsProvider(
|
||||
credentials.NewStaticCredentialsProvider(
|
||||
w.kclConfig.KinesisCredentials.Value.AccessKeyID,
|
||||
w.kclConfig.KinesisCredentials.Value.SecretAccessKey,
|
||||
w.kclConfig.KinesisCredentials.Value.SessionToken)),
|
||||
awsConfig.WithEndpointResolver(resolver),
|
||||
awsConfig.WithCredentialsProvider(w.kclConfig.KinesisCredentials),
|
||||
awsConfig.WithEndpointResolverWithOptions(resolver),
|
||||
awsConfig.WithRetryer(func() aws.Retryer {
|
||||
return retry.AddWithMaxBackoffDelay(retry.NewStandard(), retry.DefaultMaxBackoff)
|
||||
}),
|
||||
|
|
|
|||
35
go.mod
35
go.mod
|
|
@ -3,35 +3,36 @@ module github.com/vmware/vmware-go-kcl-v2
|
|||
go 1.17
|
||||
|
||||
require (
|
||||
github.com/aws/aws-sdk-go-v2 v1.11.0
|
||||
github.com/aws/aws-sdk-go-v2/config v1.10.0
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.6.0
|
||||
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.10.0
|
||||
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.7.0
|
||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.8.0
|
||||
github.com/aws/aws-sdk-go-v2 v1.11.2
|
||||
github.com/aws/aws-sdk-go-v2/config v1.11.1
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.6.5
|
||||
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0
|
||||
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0
|
||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0
|
||||
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407
|
||||
github.com/golang/protobuf v1.5.2
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/prometheus/client_golang v1.11.0
|
||||
github.com/prometheus/common v0.32.1
|
||||
github.com/rs/zerolog v1.26.0
|
||||
github.com/rs/zerolog v1.26.1
|
||||
github.com/sirupsen/logrus v1.8.1
|
||||
github.com/stretchr/testify v1.7.0
|
||||
go.uber.org/zap v1.19.1
|
||||
go.uber.org/zap v1.20.0
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/BurntSushi/toml v0.4.1 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.6.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.9.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.7.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.12.0 // indirect
|
||||
github.com/aws/smithy-go v1.9.0 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
|
|
@ -43,7 +44,7 @@ require (
|
|||
github.com/prometheus/procfs v0.7.3 // indirect
|
||||
go.uber.org/atomic v1.9.0 // indirect
|
||||
go.uber.org/multierr v1.7.0 // indirect
|
||||
golang.org/x/sys v0.0.0-20211111213525-f221eed1c01e // indirect
|
||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
|
||||
google.golang.org/protobuf v1.27.1 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
|
||||
)
|
||||
|
|
|
|||
79
go.sum
79
go.sum
|
|
@ -40,40 +40,45 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
|
|||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
|
||||
github.com/aws/aws-sdk-go-v2 v1.11.0 h1:HxyD62DyNhCfiFGUHqJ/xITD6rAjJ7Dm/2nLxLmO4Ag=
|
||||
github.com/aws/aws-sdk-go-v2 v1.11.0/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ=
|
||||
github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
|
||||
github.com/aws/aws-sdk-go-v2 v1.11.2 h1:SDiCYqxdIYi6HgQfAWRhgdZrdnOuGyLDJVRSWLeHWvs=
|
||||
github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ=
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 h1:yVUAwvJC/0WNPbyl0nA3j1L6CW1CN8wBubCRqtG7JLI=
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0/go.mod h1:Xn6sxgRuIDflLRJFj5Ev7UxABIkNbccFPV/p8itDReM=
|
||||
github.com/aws/aws-sdk-go-v2/config v1.10.0 h1:4i+/7DmCQCAls5Z61giur0LOPZ3PXFwnSIw7hRamzws=
|
||||
github.com/aws/aws-sdk-go-v2/config v1.10.0/go.mod h1:xuqoV5etD3N3B8Ts9je4ijgAv6mb+6NiOPFMUhwRcjA=
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.6.0 h1:L3O6osQTlzLKRmiTphw2QJuD21EFapWCX4IipiRJhAE=
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.6.0/go.mod h1:rQkYdQPDXRrvPLeEuCNwSgtwMzBo9eDGWlTNC69Sh/0=
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.0 h1:OpZjuUy8Jt3CA1WgJgBC5Bz+uOjE5Ppx4NFTRaooUuA=
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.0/go.mod h1:5E1J3/TTYy6z909QNR0QnXGBpfESYGDqd3O0zqONghU=
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.0 h1:zY8cNmbBXt3pzjgWgdIbzpQ6qxoCwt+Nx9JbrAf2mbY=
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.0/go.mod h1:NO3Q5ZTTQtO2xIg2+xTXYDiT7knSejfeDm7WGDaOo0U=
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.0 h1:Z3aR/OXBnkYK9zXkNkfitHX6SmUBzSsx8VMHbH4Lvhw=
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.0/go.mod h1:anlUzBoEWglcUxUQwZA7HQOEVEnQALVZsizAapB2hq8=
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.0 h1:c10Z7fWxtJCoyc8rv06jdh9xrKnu7bAJiRaKWvTb2mU=
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.0/go.mod h1:6oXGy4GLpypD3uCh8wcqztigGgmhLToMfjavgh+VySg=
|
||||
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.10.0 h1:MNNV0fi3J5Lxxhx8iDlKdRZJrtBv/0FyganA3nBYe8Q=
|
||||
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.10.0/go.mod h1:Oiwhs3Fo9amYOGsJggWBPU6bwa/u0xVpEdOS5HlouPg=
|
||||
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.7.0 h1:S3X6RWl0TfMxNXsIzz8r3Y6YVA1HWGSx6M345Q3mQ+I=
|
||||
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.7.0/go.mod h1:Hh0zJ3419ET9xQBeR+y0lHIkObJwAKPbzV9nTZ0yrJ0=
|
||||
github.com/aws/aws-sdk-go-v2/config v1.11.1 h1:KXSjb7ZMLRtjxClFptukTYibiOqJS9NwBO+9WD3UMto=
|
||||
github.com/aws/aws-sdk-go-v2/config v1.11.1/go.mod h1:VvfkzUhVtntSg1JfGFMSKS0CyiTZd3NqBxK5af4zsME=
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.6.5 h1:ZrsO2js2v4T95rsCIWoAb/ck5+U1kwkizGdZHY+ni3s=
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.6.5/go.mod h1:HWSOnsnqVMbLcWUmom6AN1cqhcLzLJ62AObW28CbYbU=
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2 h1:KiN5TPOLrEjbGCvdTQR4t0U4T87vVwALZ5Bg3jpMqPY=
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2/go.mod h1:dF2F6tXEOgmW5X1ZFO/EPtWrcm7XkW07KNcJUGNtt4s=
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 h1:XJLnluKuUxQG255zPNe+04izXl7GSyUVafIsgfv9aw4=
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2/go.mod h1:SgKKNBIoDC/E1ZCDhhMW3yalWjwuLjMcpLzsM/QQnWo=
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 h1:EauRoYZVNPlidZSZJDscjJBQ22JhVF2+tdteatax2Ak=
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2/go.mod h1:xT4XX6w5Sa3dhg50JrYyy3e4WPYo/+WjY/BXtqXVunU=
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2 h1:IQup8Q6lorXeiA/rK72PeToWoWK8h7VAPgHNWdSrtgE=
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2/go.mod h1:VITe/MdW6EMXPb0o0txu/fsonXbMHUU2OC2Qp7ivU4o=
|
||||
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0 h1:BcSBoss+CeyRS4TgZKAcR6kcZ0Sb2P+DHs8r8aMlTpQ=
|
||||
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0/go.mod h1:eAgmZ4hIzTsTOlAA7yvGJz+RywxZo3KWtGt7J+jAUxU=
|
||||
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0 h1:te+nIFwPf5Bi/cZvd9g/+EF0gkJT3c0J/5+NMx0NBZg=
|
||||
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0/go.mod h1:ELltfl9ri0n4sZ/VjPZBgemNMd9mYIpCAuZhc7NP7l4=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 h1:lPLbw4Gn59uoKqvOfSnkJr54XWk5Ak1NK20ZEiSWb3U=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0/go.mod h1:80NaCIH9YU3rzTTs/J/ECATjXuRqzo/wB6ukO6MZ0XY=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.0 h1:A2aUh9d38A2ECh76ahOQUdpJFe+Jhjk8qrfV+YbNYGY=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.0/go.mod h1:5h2rxfLN22pLTQ1ZoOza87rp2SnN/9UDYdYBQRmIrsE=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0 h1:qGZWS/WgiFY+Zgad2u0gwBHpJxz6Ne401JE7iQI1nKs=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0/go.mod h1:Mq6AEc+oEjCUlBuLiK5YwW4shSOAKCQ3tXN0sQeYoBA=
|
||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.8.0 h1:Cz26j4wGD1tJ2w/M8iLhaS81AkAGY3gEYRt0xQWjEIs=
|
||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.8.0/go.mod h1:QyNCg1xtWFJVL++i6ZyVcwXZCiKTNeXHH9zZu3NHOdU=
|
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.6.0 h1:JDgKIUZOmLFu/Rv6zXLrVTWCmzA0jcTdvsT8iFIKrAI=
|
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.6.0/go.mod h1:Q/l0ON1annSU+mc0JybDy1Gy6dnJxIcWjphO6qJPzvM=
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.9.0 h1:rBLCnL8hQ7Sv1S4XCPYgTMI7Uhg81BkvzIiK+/of2zY=
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.9.0/go.mod h1:jLKCFqS+1T4i7HDqCP9GM4Uk75YW1cS0o82LdxpMyOE=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 h1:ru9+IpkVIuDvIkm9Q0DEjtWHnh6ITDoZo8fH2dIjlqQ=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3/go.mod h1:zOyLMYyg60yyZpOCniAUuibWVqTU4TuLmMa/Wh4P+HA=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2 h1:CKdUNKmuilw/KNmO2Q53Av8u+ZyXMC2M9aX8Z+c/gzg=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2/go.mod h1:FgR1tCsn8C6+Hf+N5qkfrE4IXvUL1RgW87sunJ+5J4I=
|
||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI=
|
||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0 h1:s47dGRX/fBy9s/Zculav/cyqRhkMKsE/5hjg6rWAH6E=
|
||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0/go.mod h1:B1x58TfECuYHFX/bga902rUvMqQu9C/v2XiCi2GZZXE=
|
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.7.0 h1:E4fxAg/UE8a6yiLZYv8/EP0uXKPPRImiMau4ift6S/g=
|
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.7.0/go.mod h1:KnIpszaIdwI33tmc/W/GGXyn22c1USYxA/2KyvoeDY0=
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.12.0 h1:7g0252k2TF3eA1DtfkTQB/tqI41YvbUPaolwTR0/ITc=
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.12.0/go.mod h1:UV2N5HaPfdbDpkgkz4sRzWCvQswZjdO1FfqCWl0t7RA=
|
||||
github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
|
||||
github.com/aws/smithy-go v1.9.0 h1:c7FUdEqrQA1/UVKKCNDFQPNKGp4FQg3YW4Ck5SLTG58=
|
||||
github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
|
||||
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 h1:p8Ubi4GEgfRc1xFn/WtGNkVG8RXxGHOsKiwGptufIo8=
|
||||
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc=
|
||||
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
|
||||
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
|
||||
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
|
||||
|
|
@ -226,8 +231,8 @@ github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0
|
|||
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
|
||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
|
||||
github.com/rs/zerolog v1.26.0 h1:ORM4ibhEZeTeQlCojCK2kPz1ogAY4bGs4tD+SaAdGaE=
|
||||
github.com/rs/zerolog v1.26.0/go.mod h1:yBiM87lvSqX8h0Ww4sdzNSkVYZ8dL2xjZJG1lAuGZEo=
|
||||
github.com/rs/zerolog v1.26.1 h1:/ihwxqH+4z8UxyI70wM1z9yCvkWcfz/a3mj48k/Zngc=
|
||||
github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+tmc=
|
||||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
|
||||
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
|
||||
|
|
@ -253,19 +258,20 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
|
|||
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
|
||||
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4=
|
||||
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
||||
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
|
||||
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
||||
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
|
||||
go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec=
|
||||
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
|
||||
go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI=
|
||||
go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
|
||||
go.uber.org/zap v1.20.0 h1:N4oPlghZwYG55MlU6LXk/Zp00FVNE9X9wrYO8CEs4lc=
|
||||
go.uber.org/zap v1.20.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
|
||||
|
|
@ -325,6 +331,7 @@ golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/
|
|||
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
|
||||
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
|
||||
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
|
||||
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
|
|
@ -383,8 +390,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
|
|||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20211111213525-f221eed1c01e h1:zeJt6jBtVDK23XK9QXcmG0FvO0elikp0dYZQZOeL1y0=
|
||||
golang.org/x/sys v0.0.0-20211111213525-f221eed1c01e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
|
||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
|
|
|
|||
|
|
@ -1,96 +0,0 @@
|
|||
// Package deaggregator
|
||||
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
package deaggregator
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
||||
rec "github.com/vmware/vmware-go-kcl-v2/internal/records"
|
||||
)
|
||||
|
||||
// KplMagicHeader Magic File Header for a KPL Aggregated Record
|
||||
var KplMagicHeader = fmt.Sprintf("%q", []byte("\xf3\x89\x9a\xc2"))
|
||||
|
||||
const (
|
||||
KplMagicLen = 4 // Length of magic header for KPL Aggregate Record checking.
|
||||
DigestSize = 16 // MD5 Message size for protobuf.
|
||||
)
|
||||
|
||||
// DeaggregateRecords takes an array of Kinesis records and expands any Protobuf
|
||||
// records within that array, returning an array of all records
|
||||
func DeaggregateRecords(records []types.Record) ([]types.Record, error) {
|
||||
var isAggregated bool
|
||||
allRecords := make([]types.Record, 0)
|
||||
|
||||
for _, record := range records {
|
||||
isAggregated = true
|
||||
|
||||
var dataMagic string
|
||||
var decodedDataNoMagic []byte
|
||||
// Check if record is long enough to have magic file header
|
||||
if len(record.Data) >= KplMagicLen {
|
||||
dataMagic = fmt.Sprintf("%q", record.Data[:KplMagicLen])
|
||||
decodedDataNoMagic = record.Data[KplMagicLen:]
|
||||
} else {
|
||||
isAggregated = false
|
||||
}
|
||||
|
||||
// Check if record has KPL Aggregate Record Magic Header and data length
|
||||
// is correct size
|
||||
if KplMagicHeader != dataMagic || len(decodedDataNoMagic) <= DigestSize {
|
||||
isAggregated = false
|
||||
}
|
||||
|
||||
if isAggregated {
|
||||
messageDigest := fmt.Sprintf("%x", decodedDataNoMagic[len(decodedDataNoMagic)-DigestSize:])
|
||||
messageData := decodedDataNoMagic[:len(decodedDataNoMagic)-DigestSize]
|
||||
|
||||
calculatedDigest := fmt.Sprintf("%x", md5.Sum(messageData))
|
||||
|
||||
// Check protobuf MD5 hash matches MD5 sum of record
|
||||
if messageDigest != calculatedDigest {
|
||||
isAggregated = false
|
||||
} else {
|
||||
aggRecord := &rec.AggregatedRecord{}
|
||||
err := proto.Unmarshal(messageData, aggRecord)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
partitionKeys := aggRecord.PartitionKeyTable
|
||||
|
||||
for _, aggrec := range aggRecord.Records {
|
||||
newRecord := createUserRecord(partitionKeys, aggrec, record)
|
||||
allRecords = append(allRecords, newRecord)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !isAggregated {
|
||||
allRecords = append(allRecords, record)
|
||||
}
|
||||
}
|
||||
|
||||
return allRecords, nil
|
||||
}
|
||||
|
||||
// createUserRecord takes in the partitionKeys of the aggregated record, the individual
|
||||
// deaggregated record, and the original aggregated record builds a kinesis.Record and
|
||||
// returns it
|
||||
func createUserRecord(partitionKeys []string, aggRec *rec.Record, record types.Record) types.Record {
|
||||
partitionKey := partitionKeys[*aggRec.PartitionKeyIndex]
|
||||
|
||||
return types.Record{
|
||||
ApproximateArrivalTimestamp: record.ApproximateArrivalTimestamp,
|
||||
Data: aggRec.Data,
|
||||
EncryptionType: record.EncryptionType,
|
||||
PartitionKey: &partitionKey,
|
||||
SequenceNumber: record.SequenceNumber,
|
||||
}
|
||||
}
|
||||
|
|
@ -1,202 +0,0 @@
|
|||
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
package deaggregator_test
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
deagg "github.com/vmware/vmware-go-kcl-v2/internal/deaggregator"
|
||||
rec "github.com/vmware/vmware-go-kcl-v2/internal/records"
|
||||
)
|
||||
|
||||
// Generate an aggregate record in the correct AWS-specified format
|
||||
// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
|
||||
func generateAggregateRecord(numRecords int) []byte {
|
||||
|
||||
aggr := &rec.AggregatedRecord{}
|
||||
// Start with the magic header
|
||||
aggRecord := []byte("\xf3\x89\x9a\xc2")
|
||||
partKeyTable := make([]string, 0)
|
||||
|
||||
// Create proto record with numRecords length
|
||||
for i := 0; i < numRecords; i++ {
|
||||
var partKey uint64
|
||||
var hashKey uint64
|
||||
partKey = uint64(i)
|
||||
hashKey = uint64(i) * uint64(10)
|
||||
r := &rec.Record{
|
||||
PartitionKeyIndex: &partKey,
|
||||
ExplicitHashKeyIndex: &hashKey,
|
||||
Data: []byte("Some test data string"),
|
||||
Tags: make([]*rec.Tag, 0),
|
||||
}
|
||||
|
||||
aggr.Records = append(aggr.Records, r)
|
||||
partKeyVal := "test" + fmt.Sprint(i)
|
||||
partKeyTable = append(partKeyTable, partKeyVal)
|
||||
}
|
||||
|
||||
aggr.PartitionKeyTable = partKeyTable
|
||||
// Marshal to protobuf record, create md5 sum from proto record
|
||||
// and append both to aggRecord with magic header
|
||||
data, _ := proto.Marshal(aggr)
|
||||
md5Hash := md5.Sum(data)
|
||||
aggRecord = append(aggRecord, data...)
|
||||
aggRecord = append(aggRecord, md5Hash[:]...)
|
||||
return aggRecord
|
||||
}
|
||||
|
||||
// Generate a generic kinesis.Record using whatever []byte
|
||||
// is passed in as the data (can be normal []byte or proto record)
|
||||
func generateKinesisRecord(data []byte) types.Record {
|
||||
currentTime := time.Now()
|
||||
encryptionType := types.EncryptionTypeNone
|
||||
partitionKey := "1234"
|
||||
sequenceNumber := "21269319989900637946712965403778482371"
|
||||
return types.Record{
|
||||
ApproximateArrivalTimestamp: ¤tTime,
|
||||
Data: data,
|
||||
EncryptionType: encryptionType,
|
||||
PartitionKey: &partitionKey,
|
||||
SequenceNumber: &sequenceNumber,
|
||||
}
|
||||
}
|
||||
|
||||
// This tests to make sure that the data is at least larger than the length
|
||||
// of the magic header to do some array slicing with index out of bounds
|
||||
func TestSmallLengthReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) {
|
||||
var err error
|
||||
var kr types.Record
|
||||
|
||||
krs := make([]types.Record, 0, 1)
|
||||
|
||||
smallByte := []byte("No")
|
||||
kr = generateKinesisRecord(smallByte)
|
||||
krs = append(krs, kr)
|
||||
dars, err := deagg.DeaggregateRecords(krs)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Small byte test, since this is not a deaggregated record, should return 1
|
||||
// record in the array.
|
||||
assert.Equal(t, 1, len(dars), "Small Byte test should return length of 1.")
|
||||
}
|
||||
|
||||
// This function tests to make sure that the data starts with the correct magic header
|
||||
// according to KPL aggregate documentation.
|
||||
func TestNonMatchingMagicHeaderReturnsSingleRecord(t *testing.T) {
|
||||
var err error
|
||||
var kr types.Record
|
||||
|
||||
krs := make([]types.Record, 0, 1)
|
||||
|
||||
min := 1
|
||||
max := 10
|
||||
n := rand.Intn(max-min) + min
|
||||
aggData := generateAggregateRecord(n)
|
||||
mismatchAggData := aggData[1:]
|
||||
kr = generateKinesisRecord(mismatchAggData)
|
||||
|
||||
krs = append(krs, kr)
|
||||
|
||||
dars, err := deagg.DeaggregateRecords(krs)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// A byte record with a magic header that does not match 0xF3 0x89 0x9A 0xC2
|
||||
// should return a single record.
|
||||
assert.Equal(t, 1, len(dars), "Mismatch magic header test should return length of 1.")
|
||||
}
|
||||
|
||||
// This function tests that the DeaggregateRecords function returns the correct number of
|
||||
// deaggregated records from a single aggregated record.
|
||||
func TestVariableLengthRecordsReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) {
|
||||
var err error
|
||||
var kr types.Record
|
||||
|
||||
krs := make([]types.Record, 0, 1)
|
||||
|
||||
min := 1
|
||||
max := 10
|
||||
n := rand.Intn(max-min) + min
|
||||
aggData := generateAggregateRecord(n)
|
||||
kr = generateKinesisRecord(aggData)
|
||||
krs = append(krs, kr)
|
||||
|
||||
dars, err := deagg.DeaggregateRecords(krs)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Variable Length Aggregate Record test has aggregaterd records and should return
|
||||
// n length.
|
||||
assertMsg := fmt.Sprintf("Variable Length Aggregate Record should return length %v.", len(dars))
|
||||
assert.Equal(t, n, len(dars), assertMsg)
|
||||
}
|
||||
|
||||
// This function tests the length of the message after magic file header. If length is less than
|
||||
// the digest size (16 bytes), it is not an aggregated record.
|
||||
func TestRecordAfterMagicHeaderWithLengthLessThanDigestSizeReturnsSingleRecord(t *testing.T) {
|
||||
var err error
|
||||
var kr types.Record
|
||||
|
||||
krs := make([]types.Record, 0, 1)
|
||||
|
||||
min := 1
|
||||
max := 10
|
||||
n := rand.Intn(max-min) + min
|
||||
aggData := generateAggregateRecord(n)
|
||||
// Change size of proto message to 15
|
||||
reducedAggData := aggData[:19]
|
||||
kr = generateKinesisRecord(reducedAggData)
|
||||
|
||||
krs = append(krs, kr)
|
||||
|
||||
dars, err := deagg.DeaggregateRecords(krs)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// A byte record with length less than 16 after the magic header should return
|
||||
// a single record from DeaggregateRecords
|
||||
assert.Equal(t, 1, len(dars), "Digest size test should return length of 1.")
|
||||
}
|
||||
|
||||
// This function tests the MD5 Sum at the end of the record by comparing MD5 sum
|
||||
// at end of proto record with MD5 Sum of Proto message. If they do not match,
|
||||
// it is not an aggregated record.
|
||||
func TestRecordWithMismatchMd5SumReturnsSingleRecord(t *testing.T) {
|
||||
var err error
|
||||
var kr types.Record
|
||||
|
||||
krs := make([]types.Record, 0, 1)
|
||||
|
||||
min := 1
|
||||
max := 10
|
||||
n := rand.Intn(max-min) + min
|
||||
aggData := generateAggregateRecord(n)
|
||||
// Remove last byte from array to mismatch the MD5 sums
|
||||
mismatchAggData := aggData[:len(aggData)-1]
|
||||
kr = generateKinesisRecord(mismatchAggData)
|
||||
|
||||
krs = append(krs, kr)
|
||||
|
||||
dars, err := deagg.DeaggregateRecords(krs)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// A byte record with an MD5 sum that does not match with the md5.Sum(record)
|
||||
// will be marked as a non-aggregate record and return a single record
|
||||
assert.Equal(t, 1, len(dars), "Mismatch md5 sum test should return length of 1.")
|
||||
}
|
||||
|
|
@ -1,215 +0,0 @@
|
|||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// source: records.proto
|
||||
|
||||
package records
|
||||
|
||||
import (
|
||||
fmt "fmt"
|
||||
math "math"
|
||||
|
||||
proto "github.com/golang/protobuf/proto"
|
||||
)
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the proto package it is being compiled against.
|
||||
// A compilation error at this line likely means your copy of the
|
||||
// proto package needs to be updated.
|
||||
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
|
||||
|
||||
type AggregatedRecord struct {
|
||||
PartitionKeyTable []string `protobuf:"bytes,1,rep,name=partition_key_table,json=partitionKeyTable" json:"partition_key_table,omitempty"`
|
||||
ExplicitHashKeyTable []string `protobuf:"bytes,2,rep,name=explicit_hash_key_table,json=explicitHashKeyTable" json:"explicit_hash_key_table,omitempty"`
|
||||
Records []*Record `protobuf:"bytes,3,rep,name=records" json:"records,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (m *AggregatedRecord) Reset() { *m = AggregatedRecord{} }
|
||||
func (m *AggregatedRecord) String() string { return proto.CompactTextString(m) }
|
||||
func (*AggregatedRecord) ProtoMessage() {}
|
||||
func (*AggregatedRecord) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_6ae0159314830e16, []int{0}
|
||||
}
|
||||
|
||||
func (m *AggregatedRecord) XXX_Unmarshal(b []byte) error {
|
||||
return xxx_messageInfo_AggregatedRecord.Unmarshal(m, b)
|
||||
}
|
||||
func (m *AggregatedRecord) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
return xxx_messageInfo_AggregatedRecord.Marshal(b, m, deterministic)
|
||||
}
|
||||
func (m *AggregatedRecord) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_AggregatedRecord.Merge(m, src)
|
||||
}
|
||||
func (m *AggregatedRecord) XXX_Size() int {
|
||||
return xxx_messageInfo_AggregatedRecord.Size(m)
|
||||
}
|
||||
func (m *AggregatedRecord) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_AggregatedRecord.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_AggregatedRecord proto.InternalMessageInfo
|
||||
|
||||
func (m *AggregatedRecord) GetPartitionKeyTable() []string {
|
||||
if m != nil {
|
||||
return m.PartitionKeyTable
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *AggregatedRecord) GetExplicitHashKeyTable() []string {
|
||||
if m != nil {
|
||||
return m.ExplicitHashKeyTable
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *AggregatedRecord) GetRecords() []*Record {
|
||||
if m != nil {
|
||||
return m.Records
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type Tag struct {
|
||||
Key *string `protobuf:"bytes,1,req,name=key" json:"key,omitempty"`
|
||||
Value *string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Tag) Reset() { *m = Tag{} }
|
||||
func (m *Tag) String() string { return proto.CompactTextString(m) }
|
||||
func (*Tag) ProtoMessage() {}
|
||||
func (*Tag) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_6ae0159314830e16, []int{1}
|
||||
}
|
||||
|
||||
func (m *Tag) XXX_Unmarshal(b []byte) error {
|
||||
return xxx_messageInfo_Tag.Unmarshal(m, b)
|
||||
}
|
||||
func (m *Tag) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
return xxx_messageInfo_Tag.Marshal(b, m, deterministic)
|
||||
}
|
||||
func (m *Tag) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_Tag.Merge(m, src)
|
||||
}
|
||||
func (m *Tag) XXX_Size() int {
|
||||
return xxx_messageInfo_Tag.Size(m)
|
||||
}
|
||||
func (m *Tag) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_Tag.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_Tag proto.InternalMessageInfo
|
||||
|
||||
func (m *Tag) GetKey() string {
|
||||
if m != nil && m.Key != nil {
|
||||
return *m.Key
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *Tag) GetValue() string {
|
||||
if m != nil && m.Value != nil {
|
||||
return *m.Value
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type Record struct {
|
||||
PartitionKeyIndex *uint64 `protobuf:"varint,1,req,name=partition_key_index,json=partitionKeyIndex" json:"partition_key_index,omitempty"`
|
||||
ExplicitHashKeyIndex *uint64 `protobuf:"varint,2,opt,name=explicit_hash_key_index,json=explicitHashKeyIndex" json:"explicit_hash_key_index,omitempty"`
|
||||
Data []byte `protobuf:"bytes,3,req,name=data" json:"data,omitempty"`
|
||||
Tags []*Tag `protobuf:"bytes,4,rep,name=tags" json:"tags,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Record) Reset() { *m = Record{} }
|
||||
func (m *Record) String() string { return proto.CompactTextString(m) }
|
||||
func (*Record) ProtoMessage() {}
|
||||
func (*Record) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_6ae0159314830e16, []int{2}
|
||||
}
|
||||
|
||||
func (m *Record) XXX_Unmarshal(b []byte) error {
|
||||
return xxx_messageInfo_Record.Unmarshal(m, b)
|
||||
}
|
||||
func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
return xxx_messageInfo_Record.Marshal(b, m, deterministic)
|
||||
}
|
||||
func (m *Record) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_Record.Merge(m, src)
|
||||
}
|
||||
func (m *Record) XXX_Size() int {
|
||||
return xxx_messageInfo_Record.Size(m)
|
||||
}
|
||||
func (m *Record) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_Record.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_Record proto.InternalMessageInfo
|
||||
|
||||
func (m *Record) GetPartitionKeyIndex() uint64 {
|
||||
if m != nil && m.PartitionKeyIndex != nil {
|
||||
return *m.PartitionKeyIndex
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *Record) GetExplicitHashKeyIndex() uint64 {
|
||||
if m != nil && m.ExplicitHashKeyIndex != nil {
|
||||
return *m.ExplicitHashKeyIndex
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *Record) GetData() []byte {
|
||||
if m != nil {
|
||||
return m.Data
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Record) GetTags() []*Tag {
|
||||
if m != nil {
|
||||
return m.Tags
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*AggregatedRecord)(nil), "AggregatedRecord")
|
||||
proto.RegisterType((*Tag)(nil), "Tag")
|
||||
proto.RegisterType((*Record)(nil), "Record")
|
||||
}
|
||||
|
||||
func init() { proto.RegisterFile("records.proto", fileDescriptor_6ae0159314830e16) }
|
||||
|
||||
var fileDescriptor_6ae0159314830e16 = []byte{
|
||||
// 245 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x8f, 0xc1, 0x4a, 0xc4, 0x30,
|
||||
0x10, 0x86, 0xc9, 0x26, 0xba, 0x74, 0x54, 0x58, 0xe3, 0x82, 0x39, 0xd6, 0x9e, 0x72, 0xb1, 0x07,
|
||||
0xc1, 0x07, 0xf0, 0xa6, 0x78, 0x0b, 0xbd, 0x97, 0x71, 0x3b, 0xa4, 0x61, 0xcb, 0xb6, 0xa4, 0x51,
|
||||
0xb6, 0xef, 0xa2, 0xef, 0x2a, 0x49, 0xdd, 0x45, 0x51, 0x6f, 0x93, 0xf9, 0xf9, 0x32, 0xff, 0x07,
|
||||
0x17, 0x9e, 0x36, 0xbd, 0x6f, 0xc6, 0x72, 0xf0, 0x7d, 0xe8, 0x8b, 0x77, 0x06, 0xab, 0x07, 0x6b,
|
||||
0x3d, 0x59, 0x0c, 0xd4, 0x98, 0x94, 0xc9, 0x12, 0xae, 0x06, 0xf4, 0xc1, 0x05, 0xd7, 0xef, 0xea,
|
||||
0x2d, 0x4d, 0x75, 0xc0, 0x97, 0x8e, 0x14, 0xcb, 0xb9, 0xce, 0xcc, 0xe5, 0x31, 0x7a, 0xa6, 0xa9,
|
||||
0x8a, 0x81, 0xbc, 0x87, 0x6b, 0xda, 0x0f, 0x9d, 0xdb, 0xb8, 0x50, 0xb7, 0x38, 0xb6, 0xdf, 0x98,
|
||||
0x45, 0x62, 0xd6, 0x87, 0xf8, 0x11, 0xc7, 0xf6, 0x88, 0xdd, 0xc0, 0xf2, 0xab, 0x8c, 0xe2, 0x39,
|
||||
0xd7, 0x67, 0x77, 0xcb, 0x72, 0x2e, 0x60, 0x0e, 0xfb, 0xe2, 0x16, 0x78, 0x85, 0x56, 0xae, 0x80,
|
||||
0x6f, 0x69, 0x52, 0x2c, 0x5f, 0xe8, 0xcc, 0xc4, 0x51, 0xae, 0xe1, 0xe4, 0x0d, 0xbb, 0xd7, 0x78,
|
||||
0x80, 0xe9, 0xcc, 0xcc, 0x8f, 0xe2, 0x83, 0xc1, 0xe9, 0x7f, 0x0e, 0x6e, 0xd7, 0xd0, 0x3e, 0x7d,
|
||||
0x21, 0x7e, 0x3a, 0x3c, 0xc5, 0xe0, 0x6f, 0x87, 0x99, 0x89, 0x27, 0xc4, 0x2f, 0x87, 0x19, 0x93,
|
||||
0x20, 0x1a, 0x0c, 0xa8, 0x78, 0xbe, 0xd0, 0xe7, 0x26, 0xcd, 0x52, 0x81, 0x08, 0x68, 0x47, 0x25,
|
||||
0x92, 0x94, 0x28, 0x2b, 0xb4, 0x26, 0x6d, 0x3e, 0x03, 0x00, 0x00, 0xff, 0xff, 0x87, 0x3e, 0x63,
|
||||
0x69, 0x7d, 0x01, 0x00, 0x00,
|
||||
}
|
||||
|
|
@ -29,24 +29,23 @@ import (
|
|||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/aws/retry"
|
||||
awsConfig "github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
|
||||
"github.com/aws/aws-sdk-go-v2/service/kinesis"
|
||||
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
||||
rec "github.com/awslabs/kinesis-aggregation/go/v2/records"
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
||||
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/utils"
|
||||
rec "github.com/vmware/vmware-go-kcl-v2/internal/records"
|
||||
)
|
||||
|
||||
const specstr = `{"name":"kube-qQyhk","networking":{"containerNetworkCidr":"10.2.0.0/16"},"orgName":"BVT-Org-cLQch","projectName":"project-tDSJd","serviceLevel":"DEVELOPER","size":{"count":1},"version":"1.8.1-4"}`
|
||||
|
||||
// NewKinesisClient to create a Kinesis Client.
|
||||
func NewKinesisClient(t *testing.T, regionName, endpoint string, creds *credentials.StaticCredentialsProvider) *kinesis.Client {
|
||||
func NewKinesisClient(t *testing.T, regionName, endpoint string, creds aws.CredentialsProvider) *kinesis.Client {
|
||||
// create session for Kinesis
|
||||
t.Logf("Creating Kinesis client")
|
||||
|
||||
resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
|
||||
resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
|
||||
return aws.Endpoint{
|
||||
PartitionID: "aws",
|
||||
URL: endpoint,
|
||||
|
|
@ -57,12 +56,8 @@ func NewKinesisClient(t *testing.T, regionName, endpoint string, creds *credenti
|
|||
cfg, err := awsConfig.LoadDefaultConfig(
|
||||
context.TODO(),
|
||||
awsConfig.WithRegion(regionName),
|
||||
awsConfig.WithCredentialsProvider(
|
||||
credentials.NewStaticCredentialsProvider(
|
||||
creds.Value.AccessKeyID,
|
||||
creds.Value.SecretAccessKey,
|
||||
creds.Value.SessionToken)),
|
||||
awsConfig.WithEndpointResolver(resolver),
|
||||
awsConfig.WithCredentialsProvider(creds),
|
||||
awsConfig.WithEndpointResolverWithOptions(resolver),
|
||||
awsConfig.WithRetryer(func() aws.Retryer {
|
||||
return retry.AddWithMaxBackoffDelay(retry.NewStandard(), retry.DefaultMaxBackoff)
|
||||
}),
|
||||
|
|
@ -77,8 +72,8 @@ func NewKinesisClient(t *testing.T, regionName, endpoint string, creds *credenti
|
|||
}
|
||||
|
||||
// NewDynamoDBClient to create a Kinesis Client.
|
||||
func NewDynamoDBClient(t *testing.T, regionName, endpoint string, creds *credentials.StaticCredentialsProvider) *dynamodb.Client {
|
||||
resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
|
||||
func NewDynamoDBClient(t *testing.T, regionName, endpoint string, creds aws.CredentialsProvider) *dynamodb.Client {
|
||||
resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
|
||||
return aws.Endpoint{
|
||||
PartitionID: "aws",
|
||||
URL: endpoint,
|
||||
|
|
@ -89,12 +84,8 @@ func NewDynamoDBClient(t *testing.T, regionName, endpoint string, creds *credent
|
|||
cfg, err := awsConfig.LoadDefaultConfig(
|
||||
context.TODO(),
|
||||
awsConfig.WithRegion(regionName),
|
||||
awsConfig.WithCredentialsProvider(
|
||||
credentials.NewStaticCredentialsProvider(
|
||||
creds.Value.AccessKeyID,
|
||||
creds.Value.SecretAccessKey,
|
||||
creds.Value.SessionToken)),
|
||||
awsConfig.WithEndpointResolver(resolver),
|
||||
awsConfig.WithCredentialsProvider(creds),
|
||||
awsConfig.WithEndpointResolverWithOptions(resolver),
|
||||
awsConfig.WithRetryer(func() aws.Retryer {
|
||||
return retry.AddWithMaxBackoffDelay(retry.NewStandard(), retry.DefaultMaxBackoff)
|
||||
}),
|
||||
|
|
|
|||
Loading…
Reference in a new issue