2018-08-17 13:03:25 +00:00
|
|
|
/*
|
|
|
|
|
* Copyright (c) 2018 VMware, Inc.
|
|
|
|
|
*
|
|
|
|
|
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
|
|
|
|
|
* associated documentation files (the "Software"), to deal in the Software without restriction, including
|
|
|
|
|
* without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
|
|
|
* copies of the Software, and to permit persons to whom the Software is furnished to do
|
|
|
|
|
* so, subject to the following conditions:
|
|
|
|
|
*
|
|
|
|
|
* The above copyright notice and this permission notice shall be included in all copies or substantial
|
|
|
|
|
* portions of the Software.
|
|
|
|
|
*
|
|
|
|
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
|
|
|
|
|
* NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
|
|
|
|
|
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
|
|
|
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
|
|
|
|
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
|
|
|
*/
|
2018-04-17 16:25:41 +00:00
|
|
|
package worker
|
|
|
|
|
|
|
|
|
|
import (
|
2018-04-20 15:30:24 +00:00
|
|
|
"net/http"
|
2018-04-17 16:25:41 +00:00
|
|
|
"os"
|
|
|
|
|
"testing"
|
|
|
|
|
"time"
|
|
|
|
|
|
2018-04-18 22:50:15 +00:00
|
|
|
"github.com/aws/aws-sdk-go/aws"
|
2018-04-20 15:30:24 +00:00
|
|
|
"github.com/prometheus/common/expfmt"
|
2018-04-17 16:25:41 +00:00
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
|
|
|
|
|
|
"github.com/stretchr/testify/assert"
|
2018-04-22 14:09:16 +00:00
|
|
|
cfg "vmware.com/cascade-kinesis-client/clientlibrary/config"
|
|
|
|
|
kc "vmware.com/cascade-kinesis-client/clientlibrary/interfaces"
|
|
|
|
|
"vmware.com/cascade-kinesis-client/clientlibrary/metrics"
|
|
|
|
|
"vmware.com/cascade-kinesis-client/clientlibrary/utils"
|
2018-04-17 16:25:41 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
streamName = "kcl-test"
|
|
|
|
|
regionName = "us-west-2"
|
|
|
|
|
workerID = "test-worker"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const specstr = `{"name":"kube-qQyhk","networking":{"containerNetworkCidr":"10.2.0.0/16"},"orgName":"BVT-Org-cLQch","projectName":"project-tDSJd","serviceLevel":"DEVELOPER","size":{"count":1},"version":"1.8.1-4"}`
|
2018-04-20 15:30:24 +00:00
|
|
|
const metricsSystem = "cloudwatch"
|
2018-04-17 16:25:41 +00:00
|
|
|
|
|
|
|
|
func TestWorker(t *testing.T) {
|
|
|
|
|
kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID).
|
|
|
|
|
WithInitialPositionInStream(cfg.LATEST).
|
2018-04-18 22:50:15 +00:00
|
|
|
WithMaxRecords(10).
|
2018-04-17 16:25:41 +00:00
|
|
|
WithMaxLeasesForWorker(1).
|
2018-04-18 22:50:15 +00:00
|
|
|
WithShardSyncIntervalMillis(5000).
|
|
|
|
|
WithFailoverTimeMillis(300000)
|
2018-04-17 16:25:41 +00:00
|
|
|
|
|
|
|
|
log.SetOutput(os.Stdout)
|
|
|
|
|
log.SetLevel(log.DebugLevel)
|
|
|
|
|
|
|
|
|
|
assert.Equal(t, regionName, kclConfig.RegionName)
|
|
|
|
|
assert.Equal(t, streamName, kclConfig.StreamName)
|
|
|
|
|
|
2018-04-20 15:30:24 +00:00
|
|
|
// configure cloudwatch as metrics system
|
|
|
|
|
metricsConfig := getMetricsConfig(metricsSystem)
|
|
|
|
|
|
|
|
|
|
worker := NewWorker(recordProcessorFactory(t), kclConfig, metricsConfig)
|
2018-04-17 16:25:41 +00:00
|
|
|
assert.Equal(t, regionName, worker.regionName)
|
|
|
|
|
assert.Equal(t, streamName, worker.streamName)
|
|
|
|
|
|
|
|
|
|
err := worker.Start()
|
|
|
|
|
assert.Nil(t, err)
|
|
|
|
|
|
|
|
|
|
// Put some data into stream.
|
|
|
|
|
for i := 0; i < 100; i++ {
|
|
|
|
|
// Use random string as partition key to ensure even distribution across shards
|
|
|
|
|
err := worker.Publish(streamName, utils.RandStringBytesMaskImpr(10), []byte(specstr))
|
|
|
|
|
if err != nil {
|
|
|
|
|
t.Errorf("Errorin Publish. %+v", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2018-04-20 15:30:24 +00:00
|
|
|
// wait a few seconds before shutdown processing
|
2018-04-17 16:25:41 +00:00
|
|
|
time.Sleep(10 * time.Second)
|
2018-04-20 15:30:24 +00:00
|
|
|
|
|
|
|
|
if metricsConfig != nil && metricsConfig.MonitoringService == "prometheus" {
|
|
|
|
|
res, err := http.Get("http://localhost:8080/metrics")
|
|
|
|
|
if err != nil {
|
|
|
|
|
t.Fatalf("Error scraping Prometheus endpoint %s", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var parser expfmt.TextParser
|
|
|
|
|
parsed, err := parser.TextToMetricFamilies(res.Body)
|
|
|
|
|
res.Body.Close()
|
|
|
|
|
if err != nil {
|
|
|
|
|
t.Errorf("Error reading monitoring response %s", err)
|
|
|
|
|
}
|
|
|
|
|
t.Logf("Prometheus: %+v", parsed)
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
2018-04-17 16:25:41 +00:00
|
|
|
worker.Shutdown()
|
|
|
|
|
}
|
|
|
|
|
|
2018-04-20 15:30:24 +00:00
|
|
|
// configure different metrics system
|
|
|
|
|
func getMetricsConfig(service string) *metrics.MonitoringConfiguration {
|
|
|
|
|
if service == "cloudwatch" {
|
|
|
|
|
return &metrics.MonitoringConfiguration{
|
|
|
|
|
MonitoringService: "cloudwatch",
|
|
|
|
|
Region: regionName,
|
|
|
|
|
CloudWatch: metrics.CloudWatchMonitoringService{
|
2018-04-21 04:07:11 +00:00
|
|
|
// Those value should come from kclConfig
|
|
|
|
|
MetricsBufferTimeMillis: 10000,
|
|
|
|
|
MetricsMaxQueueSize: 20,
|
2018-04-20 15:30:24 +00:00
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if service == "prometheus" {
|
|
|
|
|
return &metrics.MonitoringConfiguration{
|
|
|
|
|
MonitoringService: "prometheus",
|
|
|
|
|
Region: regionName,
|
|
|
|
|
Prometheus: metrics.PrometheusMonitoringService{
|
|
|
|
|
ListenAddress: ":8080",
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2018-04-17 16:25:41 +00:00
|
|
|
// Record processor factory is used to create RecordProcessor
|
|
|
|
|
func recordProcessorFactory(t *testing.T) kc.IRecordProcessorFactory {
|
|
|
|
|
return &dumpRecordProcessorFactory{t: t}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// simple record processor and dump everything
|
|
|
|
|
type dumpRecordProcessorFactory struct {
|
|
|
|
|
t *testing.T
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (d *dumpRecordProcessorFactory) CreateProcessor() kc.IRecordProcessor {
|
|
|
|
|
return &dumpRecordProcessor{
|
|
|
|
|
t: d.t,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create a dump record processor for printing out all data from record.
|
|
|
|
|
type dumpRecordProcessor struct {
|
|
|
|
|
t *testing.T
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (dd *dumpRecordProcessor) Initialize(input *kc.InitializationInput) {
|
2018-04-18 22:50:15 +00:00
|
|
|
dd.t.Logf("Processing SharId: %v at checkpoint: %v", input.ShardId, aws.StringValue(input.ExtendedSequenceNumber.SequenceNumber))
|
2018-04-17 16:25:41 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
|
|
|
|
|
dd.t.Log("Processing Records...")
|
|
|
|
|
|
|
|
|
|
// don't process empty record
|
|
|
|
|
if len(input.Records) == 0 {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, v := range input.Records {
|
|
|
|
|
dd.t.Logf("Record = %s", v.Data)
|
|
|
|
|
assert.Equal(dd.t, specstr, string(v.Data))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// checkpoint it after processing this batch
|
|
|
|
|
lastRecordSequenceNubmer := input.Records[len(input.Records)-1].SequenceNumber
|
2018-04-18 22:50:15 +00:00
|
|
|
dd.t.Logf("Checkpoint progress at: %v, MillisBehindLatest = %v", lastRecordSequenceNubmer, input.MillisBehindLatest)
|
2018-04-17 16:25:41 +00:00
|
|
|
input.Checkpointer.Checkpoint(lastRecordSequenceNubmer)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (dd *dumpRecordProcessor) Shutdown(input *kc.ShutdownInput) {
|
2018-04-18 22:50:15 +00:00
|
|
|
dd.t.Logf("Shutdown Reason: %v", aws.StringValue(kc.ShutdownReasonMessage(input.ShutdownReason)))
|
2018-08-07 03:49:15 +00:00
|
|
|
|
|
|
|
|
// When the value of {@link ShutdownInput#getShutdownReason()} is
|
|
|
|
|
// {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason#TERMINATE} it is required that you
|
|
|
|
|
// checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress.
|
|
|
|
|
if input.ShutdownReason == kc.TERMINATE {
|
|
|
|
|
input.Checkpointer.Checkpoint(nil)
|
|
|
|
|
}
|
2018-04-17 16:25:41 +00:00
|
|
|
}
|