From c634c75ebcb1ff2153c007615b299f5804b72190 Mon Sep 17 00:00:00 2001 From: Tao Jiang Date: Sat, 16 Mar 2019 08:11:09 -0500 Subject: [PATCH] Add credential configuration for resources (#14) Add credentials for Kinesis, DynamoDB and Cloudwatch. See the worker_test.go to see how to use it. Signed-off-by: Tao Jiang --- clientlibrary/config/config.go | 10 +++++ clientlibrary/config/kcl-config.go | 17 ++++++++ clientlibrary/metrics/cloudwatch.go | 10 ++++- clientlibrary/worker/worker.go | 28 ++++++++++--- clientlibrary/worker/worker_test.go | 61 ++++++++++++++++++++++++++--- 5 files changed, 114 insertions(+), 12 deletions(-) diff --git a/clientlibrary/config/config.go b/clientlibrary/config/config.go index f1f3090..3200725 100644 --- a/clientlibrary/config/config.go +++ b/clientlibrary/config/config.go @@ -40,6 +40,7 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + creds "github.com/aws/aws-sdk-go/aws/credentials" ) const ( @@ -166,6 +167,15 @@ type ( // If this is empty, the default generated endpoint will be used. KinesisEndpoint string + // KinesisCredentials is used to access Kinesis + KinesisCredentials *creds.Credentials + + // DynamoDBCredentials is used to access DynamoDB + DynamoDBCredentials *creds.Credentials + + // CloudWatchCredentials is used to access CloudWatch + CloudWatchCredentials *creds.Credentials + // TableName is name of the dynamo db table for managing kinesis stream default to ApplicationName TableName string diff --git a/clientlibrary/config/kcl-config.go b/clientlibrary/config/kcl-config.go index 4d208cf..1419ad8 100644 --- a/clientlibrary/config/kcl-config.go +++ b/clientlibrary/config/kcl-config.go @@ -34,6 +34,7 @@ package config import ( + "github.com/aws/aws-sdk-go/aws/credentials" "time" "github.com/vmware/vmware-go-kcl/clientlibrary/utils" @@ -41,6 +42,19 @@ import ( // NewKinesisClientLibConfig to create a default KinesisClientLibConfiguration based on the required fields. func NewKinesisClientLibConfig(applicationName, streamName, regionName, workerID string) *KinesisClientLibConfiguration { + return NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regionName, workerID, + nil, nil, nil) +} + +// NewKinesisClientLibConfig to create a default KinesisClientLibConfiguration based on the required fields. +func NewKinesisClientLibConfigWithCredential(applicationName, streamName, regionName, workerID string, + creds *credentials.Credentials) *KinesisClientLibConfiguration { + return NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regionName, workerID, creds, creds, creds) +} + +// NewKinesisClientLibConfig to create a default KinesisClientLibConfiguration based on the required fields. +func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regionName, workerID string, + kiniesisCreds, dynamodbCreds, cloudwatchCreds *credentials.Credentials) *KinesisClientLibConfiguration { checkIsValueNotEmpty("ApplicationName", applicationName) checkIsValueNotEmpty("StreamName", streamName) checkIsValueNotEmpty("RegionName", regionName) @@ -52,6 +66,9 @@ func NewKinesisClientLibConfig(applicationName, streamName, regionName, workerID // populate the KCL configuration with default values return &KinesisClientLibConfiguration{ ApplicationName: applicationName, + KinesisCredentials: kiniesisCreds, + DynamoDBCredentials: dynamodbCreds, + CloudWatchCredentials: cloudwatchCreds, TableName: applicationName, StreamName: streamName, RegionName: regionName, diff --git a/clientlibrary/metrics/cloudwatch.go b/clientlibrary/metrics/cloudwatch.go index 1a157f4..477f127 100644 --- a/clientlibrary/metrics/cloudwatch.go +++ b/clientlibrary/metrics/cloudwatch.go @@ -28,6 +28,7 @@ package metrics import ( + "github.com/aws/aws-sdk-go/aws/credentials" "sync" "time" @@ -43,6 +44,7 @@ type CloudWatchMonitoringService struct { KinesisStream string WorkerID string Region string + Credentials *credentials.Credentials // control how often to pusblish to CloudWatch MetricsBufferTimeMillis int @@ -66,7 +68,13 @@ type cloudWatchMetrics struct { } func (cw *CloudWatchMonitoringService) Init() error { - s := session.New(&aws.Config{Region: aws.String(cw.Region)}) + cfg := &aws.Config{Region: aws.String(cw.Region)} + cfg.Credentials = cw.Credentials + s, err := session.NewSession(cfg) + if err != nil { + log.Errorf("Error in creating session for cloudwatch. %+v", err) + return err + } cw.svc = cloudwatch.New(s) cw.shardMetrics = new(sync.Map) diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index c295f86..33551c3 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -113,21 +113,37 @@ func NewWorker(factory kcl.IRecordProcessorFactory, kclConfig *config.KinesisCli // create session for Kinesis log.Info("Creating Kinesis session") - s := session.New(&aws.Config{ - Region: aws.String(w.regionName), - Endpoint: &kclConfig.KinesisEndpoint, + + s, err := session.NewSession(&aws.Config{ + Region: aws.String(w.regionName), + Endpoint: &kclConfig.KinesisEndpoint, + Credentials: kclConfig.KinesisCredentials, }) + + if err != nil { + // no need to move forward + log.Fatalf("Failed in getting Kinesis session for creating Worker: %+v", err) + } w.kc = kinesis.New(s) log.Info("Creating DynamoDB session") - s = session.New(&aws.Config{ - Region: aws.String(w.regionName), - Endpoint: &kclConfig.DynamoDBEndpoint, + + s, err = session.NewSession(&aws.Config{ + Region: aws.String(w.regionName), + Endpoint: &kclConfig.DynamoDBEndpoint, + Credentials: kclConfig.DynamoDBCredentials, }) + + if err != nil { + // no need to move forward + log.Fatalf("Failed in getting DynamoDB session for creating Worker: %+v", err) + } + w.dynamo = dynamodb.New(s) w.checkpointer = NewDynamoCheckpoint(w.dynamo, kclConfig) if w.metricsConfig == nil { + // "" means noop monitor service. i.e. not emitting any metrics. w.metricsConfig = &metrics.MonitoringConfiguration{MonitoringService: ""} } return w diff --git a/clientlibrary/worker/worker_test.go b/clientlibrary/worker/worker_test.go index 4a9598d..013561f 100644 --- a/clientlibrary/worker/worker_test.go +++ b/clientlibrary/worker/worker_test.go @@ -19,6 +19,9 @@ package worker import ( + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/credentials/stscreds" + "github.com/aws/aws-sdk-go/aws/session" "net/http" "os" "testing" @@ -50,8 +53,55 @@ func TestWorker(t *testing.T) { WithMaxRecords(10). WithMaxLeasesForWorker(1). WithShardSyncIntervalMillis(5000). - WithFailoverTimeMillis(300000) + WithFailoverTimeMillis(300000). + WithMetricsBufferTimeMillis(10000). + WithMetricsMaxQueueSize(20) + runTest(kclConfig, t) +} + +func TestWorkerStatic(t *testing.T) { + t.Skip("Need to provide actual credentials") + + creds := credentials.NewStaticCredentials("AccessKeyId", "SecretAccessKey", "") + + kclConfig := cfg.NewKinesisClientLibConfigWithCredential("appName", streamName, regionName, workerID, creds). + WithInitialPositionInStream(cfg.LATEST). + WithMaxRecords(10). + WithMaxLeasesForWorker(1). + WithShardSyncIntervalMillis(5000). + WithFailoverTimeMillis(300000). + WithMetricsBufferTimeMillis(10000). + WithMetricsMaxQueueSize(20) + + runTest(kclConfig, t) +} + +func TestWorkerAssumeRole(t *testing.T) { + t.Skip("Need to provide actual roleARN") + + // Initial credentials loaded from SDK's default credential chain. Such as + // the environment, shared credentials (~/.aws/credentials), or EC2 Instance + // Role. These credentials will be used to to make the STS Assume Role API. + sess := session.Must(session.NewSession()) + + // Create the credentials from AssumeRoleProvider to assume the role + // referenced by the "myRoleARN" ARN. + creds := stscreds.NewCredentials(sess, "arn:aws:iam::*:role/kcl-test-publisher") + + kclConfig := cfg.NewKinesisClientLibConfigWithCredential("appName", streamName, regionName, workerID, creds). + WithInitialPositionInStream(cfg.LATEST). + WithMaxRecords(10). + WithMaxLeasesForWorker(1). + WithShardSyncIntervalMillis(5000). + WithFailoverTimeMillis(300000). + WithMetricsBufferTimeMillis(10000). + WithMetricsMaxQueueSize(20) + + runTest(kclConfig, t) +} + +func runTest(kclConfig *cfg.KinesisClientLibConfiguration, t *testing.T) { log.SetOutput(os.Stdout) log.SetLevel(log.DebugLevel) @@ -59,7 +109,7 @@ func TestWorker(t *testing.T) { assert.Equal(t, streamName, kclConfig.StreamName) // configure cloudwatch as metrics system - metricsConfig := getMetricsConfig(metricsSystem) + metricsConfig := getMetricsConfig(kclConfig, metricsSystem) worker := NewWorker(recordProcessorFactory(t), kclConfig, metricsConfig) assert.Equal(t, regionName, worker.regionName) @@ -100,15 +150,16 @@ func TestWorker(t *testing.T) { } // configure different metrics system -func getMetricsConfig(service string) *metrics.MonitoringConfiguration { +func getMetricsConfig(kclConfig *cfg.KinesisClientLibConfiguration, service string) *metrics.MonitoringConfiguration { if service == "cloudwatch" { return &metrics.MonitoringConfiguration{ MonitoringService: "cloudwatch", Region: regionName, CloudWatch: metrics.CloudWatchMonitoringService{ + Credentials: kclConfig.CloudWatchCredentials, // Those value should come from kclConfig - MetricsBufferTimeMillis: 10000, - MetricsMaxQueueSize: 20, + MetricsBufferTimeMillis: kclConfig.MetricsBufferTimeMillis, + MetricsMaxQueueSize: kclConfig.MetricsMaxQueueSize, }, } }