From 499e9cf1bedaea2bca8889e8248aecca39104cf6 Mon Sep 17 00:00:00 2001 From: Tao Jiang Date: Fri, 18 Dec 2020 16:52:04 -0600 Subject: [PATCH] Update aws go sdk and tests (#81) Update aws go sdk to the latest. Also, update integration tests by publishing data using both PutRecord and PutRecords. Signed-off-by: Tao Jiang --- clientlibrary/worker/worker.go | 14 ----- go.mod | 5 +- go.sum | 21 +++++--- test/record_processor_test.go | 88 ++++++++++++++++++++++++++++++ test/record_publisher_test.go | 99 ++++++++++++++++++++++++++++++++++ test/worker_custom_test.go | 26 ++------- test/worker_test.go | 77 +++----------------------- 7 files changed, 212 insertions(+), 118 deletions(-) create mode 100644 test/record_processor_test.go create mode 100644 test/record_publisher_test.go diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index 2fc3ab2..32273c0 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -147,20 +147,6 @@ func (w *Worker) Shutdown() { log.Infof("Worker loop is complete. Exiting from worker.") } -// Publish to write some data into stream. This function is mainly used for testing purpose. -func (w *Worker) Publish(streamName, partitionKey string, data []byte) error { - log := w.kclConfig.Logger - _, err := w.kc.PutRecord(&kinesis.PutRecordInput{ - Data: data, - StreamName: aws.String(streamName), - PartitionKey: aws.String(partitionKey), - }) - if err != nil { - log.Errorf("Error in publishing data to %s/%s. Error: %+v", streamName, partitionKey, err) - } - return err -} - // initialize func (w *Worker) initialize() error { log := w.kclConfig.Logger diff --git a/go.mod b/go.mod index 9a4ec48..84de9ab 100644 --- a/go.mod +++ b/go.mod @@ -2,18 +2,17 @@ module github.com/vmware/vmware-go-kcl require ( github.com/BurntSushi/toml v0.3.1 // indirect - github.com/aws/aws-sdk-go v1.19.38 + github.com/aws/aws-sdk-go v1.34.8 github.com/google/uuid v1.1.1 github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/prometheus/client_golang v0.9.3 github.com/prometheus/common v0.4.1 github.com/prometheus/procfs v0.0.0-20190523193104-a7aeb8df3389 // indirect github.com/sirupsen/logrus v1.4.2 - github.com/stretchr/testify v1.3.0 + github.com/stretchr/testify v1.5.1 go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.2.0 // indirect go.uber.org/zap v1.11.0 - golang.org/x/net v0.0.0-20190522155817-f3200d17e092 // indirect golang.org/x/sys v0.0.0-20190528012530-adf421d2caf4 // indirect golang.org/x/text v0.3.2 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 diff --git a/go.sum b/go.sum index 19dc05c..d6c3fa2 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= -github.com/aws/aws-sdk-go v1.19.38 h1:WKjobgPO4Ua1ww2NJJl2/zQNreUZxvqmEzwMlRjjm9g= -github.com/aws/aws-sdk-go v1.19.38/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/aws/aws-sdk-go v1.34.8 h1:GDfVeXG8XQDbpOeAj7415F8qCQZwvY/k/fj+HBqUnBA= +github.com/aws/aws-sdk-go v1.34.8/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -16,6 +16,7 @@ github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8 github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -23,8 +24,8 @@ github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= -github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= +github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s= @@ -36,6 +37,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= @@ -60,8 +63,8 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.2.0 h1:6I+W7f5VwC5SV9dNrZ3qXrDB9mD0dyGOi/ZJmYw03T4= @@ -71,8 +74,8 @@ go.uber.org/zap v1.11.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190522155817-f3200d17e092 h1:4QSRKanuywn15aTZvI/mIDEgPQpswuFndXpOj3rKEco= -golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -93,3 +96,5 @@ gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXL gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/test/record_processor_test.go b/test/record_processor_test.go new file mode 100644 index 0000000..6c93d6c --- /dev/null +++ b/test/record_processor_test.go @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2020 VMware, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and + * associated documentation files (the "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial + * portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT + * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +package test + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/stretchr/testify/assert" + kc "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces" + "testing" +) + +// Record processor factory is used to create RecordProcessor +func recordProcessorFactory(t *testing.T) kc.IRecordProcessorFactory { + return &dumpRecordProcessorFactory{t: t} +} + +// simple record processor and dump everything +type dumpRecordProcessorFactory struct { + t *testing.T +} + +func (d *dumpRecordProcessorFactory) CreateProcessor() kc.IRecordProcessor { + return &dumpRecordProcessor{ + t: d.t, + } +} + +// Create a dump record processor for printing out all data from record. +type dumpRecordProcessor struct { + t *testing.T + count int +} + +func (dd *dumpRecordProcessor) Initialize(input *kc.InitializationInput) { + dd.t.Logf("Processing SharId: %v at checkpoint: %v", input.ShardId, aws.StringValue(input.ExtendedSequenceNumber.SequenceNumber)) + shardID = input.ShardId + dd.count = 0 +} + +func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) { + dd.t.Log("Processing Records...") + + // don't process empty record + if len(input.Records) == 0 { + return + } + + for _, v := range input.Records { + dd.t.Logf("Record = %s", v.Data) + assert.Equal(dd.t, specstr, string(v.Data)) + dd.count++ + } + + // checkpoint it after processing this batch + lastRecordSequenceNubmer := input.Records[len(input.Records)-1].SequenceNumber + dd.t.Logf("Checkpoint progress at: %v, MillisBehindLatest = %v", lastRecordSequenceNubmer, input.MillisBehindLatest) + input.Checkpointer.Checkpoint(lastRecordSequenceNubmer) +} + +func (dd *dumpRecordProcessor) Shutdown(input *kc.ShutdownInput) { + dd.t.Logf("Shutdown Reason: %v", aws.StringValue(kc.ShutdownReasonMessage(input.ShutdownReason))) + dd.t.Logf("Processed Record Count = %d", dd.count) + + // When the value of {@link ShutdownInput#getShutdownReason()} is + // {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason#TERMINATE} it is required that you + // checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress. + if input.ShutdownReason == kc.TERMINATE { + input.Checkpointer.Checkpoint(nil) + } + + assert.True(dd.t, dd.count > 0) +} diff --git a/test/record_publisher_test.go b/test/record_publisher_test.go new file mode 100644 index 0000000..1bddbf2 --- /dev/null +++ b/test/record_publisher_test.go @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2020 VMware, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and + * associated documentation files (the "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial + * portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT + * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +package test + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" + "github.com/vmware/vmware-go-kcl/clientlibrary/utils" + "testing" +) + +const specstr = `{"name":"kube-qQyhk","networking":{"containerNetworkCidr":"10.2.0.0/16"},"orgName":"BVT-Org-cLQch","projectName":"project-tDSJd","serviceLevel":"DEVELOPER","size":{"count":1},"version":"1.8.1-4"}` + +// NewKinesisClient to create a Kinesis Client. +func NewKinesisClient(t *testing.T, regionName, endpoint string, credentials *credentials.Credentials) *kinesis.Kinesis{ + s, err := session.NewSession(&aws.Config{ + Region: aws.String(regionName), + Endpoint: aws.String(endpoint), + Credentials: credentials, + }) + + if err != nil { + // no need to move forward + t.Fatalf("Failed in getting Kinesis session for creating Worker: %+v", err) + } + return kinesis.New(s) +} + +// publishSomeData to put some records into Kinesis stream +func publishSomeData(t *testing.T, kc kinesisiface.KinesisAPI) { + // Put some data into stream. + t.Log("Putting data into stream using PutRecord API...") + for i := 0; i < 50; i++ { + publishRecord(t, kc) + } + t.Log("Done putting data into stream using PutRecord API.") + + // Put some data into stream using PutRecords API + t.Log("Putting data into stream using PutRecords API...") + for i := 0; i < 10; i++ { + publishRecords(t, kc) + } + t.Log("Done putting data into stream using PutRecords API.") +} + +// publishRecord to put a record into Kinesis stream using PutRecord API. +func publishRecord(t *testing.T, kc kinesisiface.KinesisAPI) { + // Use random string as partition key to ensure even distribution across shards + _, err := kc.PutRecord(&kinesis.PutRecordInput{ + Data: []byte(specstr), + StreamName: aws.String(streamName), + PartitionKey: aws.String(utils.RandStringBytesMaskImpr(10)), + }) + + if err != nil { + t.Errorf("Error in PutRecord. %+v", err) + } +} + +// publishRecord to put a record into Kinesis stream using PutRecords API. +func publishRecords(t *testing.T, kc kinesisiface.KinesisAPI) { + // Use random string as partition key to ensure even distribution across shards + records := make([]*kinesis.PutRecordsRequestEntry, 5) + + for i:= 0; i < 5; i++ { + records[i] = &kinesis.PutRecordsRequestEntry{ + Data: []byte(specstr), + PartitionKey: aws.String(utils.RandStringBytesMaskImpr(10)), + } + } + + _, err := kc.PutRecords(&kinesis.PutRecordsInput{ + Records: records, + StreamName: aws.String(streamName), + }) + + if err != nil { + t.Errorf("Error in PutRecords. %+v", err) + } +} \ No newline at end of file diff --git a/test/worker_custom_test.go b/test/worker_custom_test.go index 46f21be..85330c1 100644 --- a/test/worker_custom_test.go +++ b/test/worker_custom_test.go @@ -34,7 +34,6 @@ import ( chk "github.com/vmware/vmware-go-kcl/clientlibrary/checkpoint" cfg "github.com/vmware/vmware-go-kcl/clientlibrary/config" par "github.com/vmware/vmware-go-kcl/clientlibrary/partition" - "github.com/vmware/vmware-go-kcl/clientlibrary/utils" wk "github.com/vmware/vmware-go-kcl/clientlibrary/worker" ) @@ -65,13 +64,8 @@ func TestWorkerInjectCheckpointer(t *testing.T) { assert.Nil(t, err) // Put some data into stream. - for i := 0; i < 100; i++ { - // Use random string as partition key to ensure even distribution across shards - err := worker.Publish(streamName, utils.RandStringBytesMaskImpr(10), []byte(specstr)) - if err != nil { - t.Errorf("Errorin Publish. %+v", err) - } - } + kc := NewKinesisClient(t, regionName, kclConfig.KinesisEndpoint, kclConfig.KinesisCredentials) + publishSomeData(t, kc) // wait a few seconds before shutdown processing time.Sleep(10 * time.Second) @@ -124,13 +118,7 @@ func TestWorkerInjectKinesis(t *testing.T) { assert.Nil(t, err) // Put some data into stream. - for i := 0; i < 100; i++ { - // Use random string as partition key to ensure even distribution across shards - err := worker.Publish(streamName, utils.RandStringBytesMaskImpr(10), []byte(specstr)) - if err != nil { - t.Errorf("Errorin Publish. %+v", err) - } - } + publishSomeData(t, kc) // wait a few seconds before shutdown processing time.Sleep(10 * time.Second) @@ -173,13 +161,7 @@ func TestWorkerInjectKinesisAndCheckpointer(t *testing.T) { assert.Nil(t, err) // Put some data into stream. - for i := 0; i < 100; i++ { - // Use random string as partition key to ensure even distribution across shards - err := worker.Publish(streamName, utils.RandStringBytesMaskImpr(10), []byte(specstr)) - if err != nil { - t.Errorf("Errorin Publish. %+v", err) - } - } + publishSomeData(t, kc) // wait a few seconds before shutdown processing time.Sleep(10 * time.Second) diff --git a/test/worker_test.go b/test/worker_test.go index 7cf4f87..73f6cdf 100644 --- a/test/worker_test.go +++ b/test/worker_test.go @@ -26,18 +26,15 @@ import ( "testing" "time" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials/stscreds" "github.com/aws/aws-sdk-go/aws/session" "github.com/prometheus/common/expfmt" "github.com/stretchr/testify/assert" cfg "github.com/vmware/vmware-go-kcl/clientlibrary/config" - kc "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces" "github.com/vmware/vmware-go-kcl/clientlibrary/metrics" "github.com/vmware/vmware-go-kcl/clientlibrary/metrics/cloudwatch" "github.com/vmware/vmware-go-kcl/clientlibrary/metrics/prometheus" - "github.com/vmware/vmware-go-kcl/clientlibrary/utils" wk "github.com/vmware/vmware-go-kcl/clientlibrary/worker" "github.com/vmware/vmware-go-kcl/logger" zaplogger "github.com/vmware/vmware-go-kcl/logger/zap" @@ -49,7 +46,6 @@ const ( workerID = "test-worker" ) -const specstr = `{"name":"kube-qQyhk","networking":{"containerNetworkCidr":"10.2.0.0/16"},"orgName":"BVT-Org-cLQch","projectName":"project-tDSJd","serviceLevel":"DEVELOPER","size":{"count":1},"version":"1.8.1-4"}` const metricsSystem = "cloudwatch" var shardID string @@ -137,7 +133,9 @@ func TestWorkerWithSigInt(t *testing.T) { func TestWorkerStatic(t *testing.T) { t.Skip("Need to provide actual credentials") - creds := credentials.NewStaticCredentials("AccessKeyId", "SecretAccessKey", "") + // Fill in the credentials for accessing Kinesis and DynamoDB. + // Note: use empty string as SessionToken for long-term credentials. + creds := credentials.NewStaticCredentials("AccessKeyId", "SecretAccessKey", "SessionToken") kclConfig := cfg.NewKinesisClientLibConfigWithCredential("appName", streamName, regionName, workerID, creds). WithInitialPositionInStream(cfg.LATEST). @@ -196,15 +194,8 @@ func runTest(kclConfig *cfg.KinesisClientLibConfiguration, triggersig bool, t *t }() // Put some data into stream. - t.Log("Putting data into stream.") - for i := 0; i < 100; i++ { - // Use random string as partition key to ensure even distribution across shards - err := worker.Publish(streamName, utils.RandStringBytesMaskImpr(10), []byte(specstr)) - if err != nil { - t.Errorf("Errorin Publish. %+v", err) - } - } - t.Log("Done putting data into stream.") + kc := NewKinesisClient(t, regionName, kclConfig.KinesisEndpoint, kclConfig.KinesisCredentials) + publishSomeData(t, kc) if triggersig { t.Log("Trigger signal SIGINT") @@ -250,60 +241,4 @@ func getMetricsConfig(kclConfig *cfg.KinesisClientLibConfiguration, service stri } return nil -} - -// Record processor factory is used to create RecordProcessor -func recordProcessorFactory(t *testing.T) kc.IRecordProcessorFactory { - return &dumpRecordProcessorFactory{t: t} -} - -// simple record processor and dump everything -type dumpRecordProcessorFactory struct { - t *testing.T -} - -func (d *dumpRecordProcessorFactory) CreateProcessor() kc.IRecordProcessor { - return &dumpRecordProcessor{ - t: d.t, - } -} - -// Create a dump record processor for printing out all data from record. -type dumpRecordProcessor struct { - t *testing.T -} - -func (dd *dumpRecordProcessor) Initialize(input *kc.InitializationInput) { - dd.t.Logf("Processing SharId: %v at checkpoint: %v", input.ShardId, aws.StringValue(input.ExtendedSequenceNumber.SequenceNumber)) - shardID = input.ShardId -} - -func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) { - dd.t.Log("Processing Records...") - - // don't process empty record - if len(input.Records) == 0 { - return - } - - for _, v := range input.Records { - dd.t.Logf("Record = %s", v.Data) - assert.Equal(dd.t, specstr, string(v.Data)) - } - - // checkpoint it after processing this batch - lastRecordSequenceNubmer := input.Records[len(input.Records)-1].SequenceNumber - dd.t.Logf("Checkpoint progress at: %v, MillisBehindLatest = %v", lastRecordSequenceNubmer, input.MillisBehindLatest) - input.Checkpointer.Checkpoint(lastRecordSequenceNubmer) -} - -func (dd *dumpRecordProcessor) Shutdown(input *kc.ShutdownInput) { - dd.t.Logf("Shutdown Reason: %v", aws.StringValue(kc.ShutdownReasonMessage(input.ShutdownReason))) - - // When the value of {@link ShutdownInput#getShutdownReason()} is - // {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason#TERMINATE} it is required that you - // checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress. - if input.ShutdownReason == kc.TERMINATE { - input.Checkpointer.Checkpoint(nil) - } -} +} \ No newline at end of file