349 lines
11 KiB
Go
349 lines
11 KiB
Go
/*
|
|
* Copyright (c) 2018 VMware, Inc.
|
|
*
|
|
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
|
|
* associated documentation files (the "Software"), to deal in the Software without restriction, including
|
|
* without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
* copies of the Software, and to permit persons to whom the Software is furnished to do
|
|
* so, subject to the following conditions:
|
|
*
|
|
* The above copyright notice and this permission notice shall be included in all copies or substantial
|
|
* portions of the Software.
|
|
*
|
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
|
|
* NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
|
|
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
|
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
*/
|
|
package test
|
|
|
|
import (
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/credentials"
|
|
"github.com/prometheus/common/expfmt"
|
|
"github.com/stretchr/testify/assert"
|
|
|
|
cfg "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
|
|
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics"
|
|
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics/cloudwatch"
|
|
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics/prometheus"
|
|
wk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker"
|
|
"github.com/vmware/vmware-go-kcl-v2/logger"
|
|
zaplogger "github.com/vmware/vmware-go-kcl-v2/logger/zap"
|
|
)
|
|
|
|
const (
|
|
appName = "appName"
|
|
streamName = "kcl-test"
|
|
regionName = "us-west-2"
|
|
workerID = "test-worker"
|
|
consumerName = "enhanced-fan-out-consumer"
|
|
kinesisEndpoint = "https://kinesis.eu-west-1.amazonaws.com"
|
|
dynamoEndpoint = "https://dynamodb.eu-west-1.amazonaws.com"
|
|
)
|
|
|
|
const metricsSystem = "cloudwatch"
|
|
|
|
var shardID string
|
|
|
|
func TestWorker(t *testing.T) {
|
|
// At minimal. use standard logrus logger
|
|
// log := logger.NewLogrusLogger(logrus.StandardLogger())
|
|
//
|
|
// In order to have precise control over logging. Use logger with config
|
|
config := logger.Configuration{
|
|
EnableConsole: true,
|
|
ConsoleLevel: logger.Error,
|
|
ConsoleJSONFormat: false,
|
|
EnableFile: true,
|
|
FileLevel: logger.Info,
|
|
FileJSONFormat: true,
|
|
Filename: "log.log",
|
|
}
|
|
// Use logrus logger
|
|
log := logger.NewLogrusLoggerWithConfig(config)
|
|
|
|
kclConfig := cfg.NewKinesisClientLibConfig(appName, streamName, regionName, workerID).
|
|
WithInitialPositionInStream(cfg.LATEST).
|
|
WithMaxRecords(8).
|
|
WithMaxLeasesForWorker(1).
|
|
WithShardSyncIntervalMillis(5000).
|
|
WithFailoverTimeMillis(300000).
|
|
WithLogger(log).
|
|
WithKinesisEndpoint(kinesisEndpoint)
|
|
|
|
runTest(kclConfig, false, t)
|
|
}
|
|
|
|
func TestWorkerWithTimestamp(t *testing.T) {
|
|
// In order to have precise control over logging. Use logger with config
|
|
config := logger.Configuration{
|
|
EnableConsole: true,
|
|
ConsoleLevel: logger.Debug,
|
|
ConsoleJSONFormat: false,
|
|
}
|
|
// Use logrus logger
|
|
log := logger.NewLogrusLoggerWithConfig(config)
|
|
|
|
ts := time.Now().Add(time.Second * 5)
|
|
kclConfig := cfg.NewKinesisClientLibConfig(appName, streamName, regionName, workerID).
|
|
WithTimestampAtInitialPositionInStream(&ts).
|
|
WithMaxRecords(10).
|
|
WithMaxLeasesForWorker(1).
|
|
WithShardSyncIntervalMillis(5000).
|
|
WithFailoverTimeMillis(300000).
|
|
WithLogger(log).
|
|
WithKinesisEndpoint(kinesisEndpoint)
|
|
|
|
runTest(kclConfig, false, t)
|
|
}
|
|
|
|
func TestWorkerWithSigInt(t *testing.T) {
|
|
// At miminal. use standard zap logger
|
|
//zapLogger, err := zap.NewProduction()
|
|
//assert.Nil(t, err)
|
|
//log := zaplogger.NewZapLogger(zapLogger.Sugar())
|
|
//
|
|
// In order to have precise control over logging. Use logger with config.
|
|
config := logger.Configuration{
|
|
EnableConsole: true,
|
|
ConsoleLevel: logger.Debug,
|
|
ConsoleJSONFormat: true,
|
|
EnableFile: true,
|
|
FileLevel: logger.Info,
|
|
FileJSONFormat: true,
|
|
Filename: "log.log",
|
|
}
|
|
// use zap logger
|
|
log := zaplogger.NewZapLoggerWithConfig(config)
|
|
|
|
kclConfig := cfg.NewKinesisClientLibConfig(appName, streamName, regionName, workerID).
|
|
WithInitialPositionInStream(cfg.LATEST).
|
|
WithMaxRecords(10).
|
|
WithMaxLeasesForWorker(1).
|
|
WithShardSyncIntervalMillis(5000).
|
|
WithFailoverTimeMillis(300000).
|
|
WithLogger(log).
|
|
WithKinesisEndpoint(kinesisEndpoint)
|
|
|
|
runTest(kclConfig, true, t)
|
|
}
|
|
|
|
func TestWorkerStatic(t *testing.T) {
|
|
//t.Skip("Need to provide actual credentials")
|
|
|
|
// Fill in the credentials for accessing Kinesis and DynamoDB.
|
|
// Note: use empty string as SessionToken for long-term credentials.
|
|
kinesisCreds := credentials.NewStaticCredentialsProvider("", "", "")
|
|
dynamoCreds := credentials.NewStaticCredentialsProvider("", "", "")
|
|
|
|
kclConfig := cfg.NewKinesisClientLibConfigWithCredentials(appName, streamName, regionName, workerID, &kinesisCreds, &dynamoCreds).
|
|
WithInitialPositionInStream(cfg.LATEST).
|
|
WithMaxRecords(10).
|
|
WithMaxLeasesForWorker(1).
|
|
WithShardSyncIntervalMillis(5000).
|
|
WithFailoverTimeMillis(300000).
|
|
WithKinesisEndpoint(kinesisEndpoint).
|
|
WithDynamoDBEndpoint(dynamoEndpoint)
|
|
|
|
runTest(kclConfig, false, t)
|
|
}
|
|
|
|
func TestWorkerAssumeRole(t *testing.T) {
|
|
t.Skip("Need to provide actual roleARN")
|
|
|
|
// Initial credentials loaded from SDK's default credential chain. Such as
|
|
// the environment, shared credentials (~/.aws/credentials), or EC2 Instance
|
|
// Role. These credentials will be used to make the STS Assume Role API.
|
|
//sess := session.Must(session.NewSession())
|
|
|
|
// Create the credentials from AssumeRoleProvider to assume the role
|
|
// referenced by the "myRoleARN" ARN.
|
|
//kinesisCreds := stscreds.NewAssumeRoleProvider(sess, "arn:aws:iam::*:role/kcl-test-publisher")
|
|
kinesisCreds := credentials.NewStaticCredentialsProvider("", "", "")
|
|
dynamoCreds := credentials.NewStaticCredentialsProvider("", "", "")
|
|
|
|
kclConfig := cfg.NewKinesisClientLibConfigWithCredentials(appName, streamName, regionName, workerID, &kinesisCreds, &dynamoCreds).
|
|
WithInitialPositionInStream(cfg.LATEST).
|
|
WithMaxRecords(10).
|
|
WithMaxLeasesForWorker(1).
|
|
WithShardSyncIntervalMillis(5000).
|
|
WithFailoverTimeMillis(300000).
|
|
WithKinesisEndpoint(kinesisEndpoint).
|
|
WithDynamoDBEndpoint(dynamoEndpoint)
|
|
|
|
runTest(kclConfig, false, t)
|
|
}
|
|
|
|
func TestEnhancedFanOutConsumer(t *testing.T) {
|
|
// At minimal, use standard logrus logger
|
|
// log := logger.NewLogrusLogger(logrus.StandardLogger())
|
|
//
|
|
// In order to have precise control over logging. Use logger with config
|
|
config := logger.Configuration{
|
|
EnableConsole: true,
|
|
ConsoleLevel: logger.Debug,
|
|
ConsoleJSONFormat: false,
|
|
EnableFile: true,
|
|
FileLevel: logger.Info,
|
|
FileJSONFormat: true,
|
|
Filename: "log.log",
|
|
}
|
|
// Use logrus logger
|
|
log := logger.NewLogrusLoggerWithConfig(config)
|
|
|
|
kclConfig := cfg.NewKinesisClientLibConfig(appName, streamName, regionName, workerID).
|
|
WithInitialPositionInStream(cfg.LATEST).
|
|
WithEnhancedFanOutConsumerName(consumerName).
|
|
WithMaxRecords(10).
|
|
WithMaxLeasesForWorker(1).
|
|
WithShardSyncIntervalMillis(5000).
|
|
WithFailoverTimeMillis(300000).
|
|
WithLogger(log)
|
|
|
|
runTest(kclConfig, false, t)
|
|
}
|
|
|
|
func TestEnhancedFanOutConsumerDefaultConsumerName(t *testing.T) {
|
|
// At minimal, use standard logrus logger
|
|
// log := logger.NewLogrusLogger(logrus.StandardLogger())
|
|
//
|
|
// In order to have precise control over logging. Use logger with config
|
|
config := logger.Configuration{
|
|
EnableConsole: true,
|
|
ConsoleLevel: logger.Debug,
|
|
ConsoleJSONFormat: false,
|
|
EnableFile: true,
|
|
FileLevel: logger.Info,
|
|
FileJSONFormat: true,
|
|
Filename: "log.log",
|
|
}
|
|
// Use logrus logger
|
|
log := logger.NewLogrusLoggerWithConfig(config)
|
|
|
|
kclConfig := cfg.NewKinesisClientLibConfig(appName, streamName, regionName, workerID).
|
|
WithInitialPositionInStream(cfg.LATEST).
|
|
WithEnhancedFanOutConsumer(true).
|
|
WithMaxRecords(10).
|
|
WithMaxLeasesForWorker(1).
|
|
WithShardSyncIntervalMillis(5000).
|
|
WithFailoverTimeMillis(300000).
|
|
WithLogger(log)
|
|
|
|
runTest(kclConfig, false, t)
|
|
}
|
|
|
|
func TestEnhancedFanOutConsumerARN(t *testing.T) {
|
|
t.Skip("Need to provide actual consumerARN")
|
|
|
|
consumerARN := "arn:aws:kinesis:*:stream/kcl-test/consumer/fanout-poc-consumer-test:*"
|
|
// At minimal, use standard logrus logger
|
|
// log := logger.NewLogrusLogger(logrus.StandardLogger())
|
|
//
|
|
// In order to have precise control over logging. Use logger with config
|
|
config := logger.Configuration{
|
|
EnableConsole: true,
|
|
ConsoleLevel: logger.Debug,
|
|
ConsoleJSONFormat: false,
|
|
EnableFile: true,
|
|
FileLevel: logger.Info,
|
|
FileJSONFormat: true,
|
|
Filename: "log.log",
|
|
}
|
|
// Use logrus logger
|
|
log := logger.NewLogrusLoggerWithConfig(config)
|
|
|
|
kclConfig := cfg.NewKinesisClientLibConfig(appName, streamName, regionName, workerID).
|
|
WithInitialPositionInStream(cfg.LATEST).
|
|
WithEnhancedFanOutConsumerARN(consumerARN).
|
|
WithMaxRecords(10).
|
|
WithMaxLeasesForWorker(1).
|
|
WithShardSyncIntervalMillis(5000).
|
|
WithFailoverTimeMillis(300000).
|
|
WithLogger(log)
|
|
|
|
runTest(kclConfig, false, t)
|
|
}
|
|
|
|
func runTest(kclConfig *cfg.KinesisClientLibConfiguration, triggersig bool, t *testing.T) {
|
|
assert.Equal(t, regionName, kclConfig.RegionName)
|
|
assert.Equal(t, streamName, kclConfig.StreamName)
|
|
|
|
// configure cloudwatch as metrics system
|
|
kclConfig.WithMonitoringService(getMetricsConfig(kclConfig, metricsSystem))
|
|
|
|
// Put some data into stream.
|
|
kc := NewKinesisClient(t, regionName, kclConfig.KinesisEndpoint, kclConfig.KinesisCredentials)
|
|
// publishSomeData(t, kc)
|
|
stop := continuouslyPublishSomeData(t, kc)
|
|
defer stop()
|
|
|
|
worker := wk.NewWorker(recordProcessorFactory(t), kclConfig)
|
|
err := worker.Start()
|
|
assert.Nil(t, err)
|
|
|
|
sigs := make(chan os.Signal, 1)
|
|
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
// Signal processing.
|
|
go func() {
|
|
sig := <-sigs
|
|
t.Logf("Received signal %s. Exiting", sig)
|
|
worker.Shutdown()
|
|
// some other processing before exit.
|
|
//os.Exit(0)
|
|
}()
|
|
|
|
if triggersig {
|
|
t.Log("Trigger signal SIGINT")
|
|
p, _ := os.FindProcess(os.Getpid())
|
|
_ = p.Signal(os.Interrupt)
|
|
}
|
|
|
|
// wait a few seconds before shutdown processing
|
|
time.Sleep(30 * time.Second)
|
|
|
|
switch metricsSystem {
|
|
case "prometheus":
|
|
res, err := http.Get("http://localhost:8080/metrics")
|
|
if err != nil {
|
|
t.Fatalf("Error scraping Prometheus endpoint %s", err)
|
|
}
|
|
|
|
var parser expfmt.TextParser
|
|
parsed, err := parser.TextToMetricFamilies(res.Body)
|
|
_ = res.Body.Close()
|
|
if err != nil {
|
|
t.Errorf("Error reading monitoring response %s", err)
|
|
}
|
|
|
|
t.Logf("Prometheus: %+v", parsed)
|
|
}
|
|
|
|
t.Log("Calling normal shutdown at the end of application.")
|
|
worker.Shutdown()
|
|
}
|
|
|
|
// configure different metrics system
|
|
func getMetricsConfig(kclConfig *cfg.KinesisClientLibConfiguration, service string) metrics.MonitoringService {
|
|
|
|
if service == "cloudwatch" {
|
|
return cloudwatch.NewMonitoringServiceWithOptions(kclConfig.RegionName,
|
|
kclConfig.KinesisCredentials,
|
|
kclConfig.Logger,
|
|
cloudwatch.DefaultCloudwatchMetricsBufferDuration)
|
|
}
|
|
|
|
if service == "prometheus" {
|
|
return prometheus.NewMonitoringService(":8080", regionName, kclConfig.Logger)
|
|
}
|
|
|
|
return nil
|
|
}
|