Add credential configuration for resources (#14)

Add credentials for Kinesis, DynamoDB and Cloudwatch. See the worker_test.go
to see how to use it.

Signed-off-by: Tao Jiang <taoj@vmware.com>
This commit is contained in:
Tao Jiang 2019-03-16 08:11:09 -05:00
parent 5140058e8b
commit c634c75ebc
5 changed files with 114 additions and 12 deletions

View file

@ -40,6 +40,7 @@ import (
"time"
"github.com/aws/aws-sdk-go/aws"
creds "github.com/aws/aws-sdk-go/aws/credentials"
)
const (
@ -166,6 +167,15 @@ type (
// If this is empty, the default generated endpoint will be used.
KinesisEndpoint string
// KinesisCredentials is used to access Kinesis
KinesisCredentials *creds.Credentials
// DynamoDBCredentials is used to access DynamoDB
DynamoDBCredentials *creds.Credentials
// CloudWatchCredentials is used to access CloudWatch
CloudWatchCredentials *creds.Credentials
// TableName is name of the dynamo db table for managing kinesis stream default to ApplicationName
TableName string

View file

@ -34,6 +34,7 @@
package config
import (
"github.com/aws/aws-sdk-go/aws/credentials"
"time"
"github.com/vmware/vmware-go-kcl/clientlibrary/utils"
@ -41,6 +42,19 @@ import (
// NewKinesisClientLibConfig to create a default KinesisClientLibConfiguration based on the required fields.
func NewKinesisClientLibConfig(applicationName, streamName, regionName, workerID string) *KinesisClientLibConfiguration {
return NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regionName, workerID,
nil, nil, nil)
}
// NewKinesisClientLibConfig to create a default KinesisClientLibConfiguration based on the required fields.
func NewKinesisClientLibConfigWithCredential(applicationName, streamName, regionName, workerID string,
creds *credentials.Credentials) *KinesisClientLibConfiguration {
return NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regionName, workerID, creds, creds, creds)
}
// NewKinesisClientLibConfig to create a default KinesisClientLibConfiguration based on the required fields.
func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regionName, workerID string,
kiniesisCreds, dynamodbCreds, cloudwatchCreds *credentials.Credentials) *KinesisClientLibConfiguration {
checkIsValueNotEmpty("ApplicationName", applicationName)
checkIsValueNotEmpty("StreamName", streamName)
checkIsValueNotEmpty("RegionName", regionName)
@ -52,6 +66,9 @@ func NewKinesisClientLibConfig(applicationName, streamName, regionName, workerID
// populate the KCL configuration with default values
return &KinesisClientLibConfiguration{
ApplicationName: applicationName,
KinesisCredentials: kiniesisCreds,
DynamoDBCredentials: dynamodbCreds,
CloudWatchCredentials: cloudwatchCreds,
TableName: applicationName,
StreamName: streamName,
RegionName: regionName,

View file

@ -28,6 +28,7 @@
package metrics
import (
"github.com/aws/aws-sdk-go/aws/credentials"
"sync"
"time"
@ -43,6 +44,7 @@ type CloudWatchMonitoringService struct {
KinesisStream string
WorkerID string
Region string
Credentials *credentials.Credentials
// control how often to pusblish to CloudWatch
MetricsBufferTimeMillis int
@ -66,7 +68,13 @@ type cloudWatchMetrics struct {
}
func (cw *CloudWatchMonitoringService) Init() error {
s := session.New(&aws.Config{Region: aws.String(cw.Region)})
cfg := &aws.Config{Region: aws.String(cw.Region)}
cfg.Credentials = cw.Credentials
s, err := session.NewSession(cfg)
if err != nil {
log.Errorf("Error in creating session for cloudwatch. %+v", err)
return err
}
cw.svc = cloudwatch.New(s)
cw.shardMetrics = new(sync.Map)

View file

@ -113,21 +113,37 @@ func NewWorker(factory kcl.IRecordProcessorFactory, kclConfig *config.KinesisCli
// create session for Kinesis
log.Info("Creating Kinesis session")
s := session.New(&aws.Config{
Region: aws.String(w.regionName),
Endpoint: &kclConfig.KinesisEndpoint,
s, err := session.NewSession(&aws.Config{
Region: aws.String(w.regionName),
Endpoint: &kclConfig.KinesisEndpoint,
Credentials: kclConfig.KinesisCredentials,
})
if err != nil {
// no need to move forward
log.Fatalf("Failed in getting Kinesis session for creating Worker: %+v", err)
}
w.kc = kinesis.New(s)
log.Info("Creating DynamoDB session")
s = session.New(&aws.Config{
Region: aws.String(w.regionName),
Endpoint: &kclConfig.DynamoDBEndpoint,
s, err = session.NewSession(&aws.Config{
Region: aws.String(w.regionName),
Endpoint: &kclConfig.DynamoDBEndpoint,
Credentials: kclConfig.DynamoDBCredentials,
})
if err != nil {
// no need to move forward
log.Fatalf("Failed in getting DynamoDB session for creating Worker: %+v", err)
}
w.dynamo = dynamodb.New(s)
w.checkpointer = NewDynamoCheckpoint(w.dynamo, kclConfig)
if w.metricsConfig == nil {
// "" means noop monitor service. i.e. not emitting any metrics.
w.metricsConfig = &metrics.MonitoringConfiguration{MonitoringService: ""}
}
return w

View file

@ -19,6 +19,9 @@
package worker
import (
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"net/http"
"os"
"testing"
@ -50,8 +53,55 @@ func TestWorker(t *testing.T) {
WithMaxRecords(10).
WithMaxLeasesForWorker(1).
WithShardSyncIntervalMillis(5000).
WithFailoverTimeMillis(300000)
WithFailoverTimeMillis(300000).
WithMetricsBufferTimeMillis(10000).
WithMetricsMaxQueueSize(20)
runTest(kclConfig, t)
}
func TestWorkerStatic(t *testing.T) {
t.Skip("Need to provide actual credentials")
creds := credentials.NewStaticCredentials("AccessKeyId", "SecretAccessKey", "")
kclConfig := cfg.NewKinesisClientLibConfigWithCredential("appName", streamName, regionName, workerID, creds).
WithInitialPositionInStream(cfg.LATEST).
WithMaxRecords(10).
WithMaxLeasesForWorker(1).
WithShardSyncIntervalMillis(5000).
WithFailoverTimeMillis(300000).
WithMetricsBufferTimeMillis(10000).
WithMetricsMaxQueueSize(20)
runTest(kclConfig, t)
}
func TestWorkerAssumeRole(t *testing.T) {
t.Skip("Need to provide actual roleARN")
// Initial credentials loaded from SDK's default credential chain. Such as
// the environment, shared credentials (~/.aws/credentials), or EC2 Instance
// Role. These credentials will be used to to make the STS Assume Role API.
sess := session.Must(session.NewSession())
// Create the credentials from AssumeRoleProvider to assume the role
// referenced by the "myRoleARN" ARN.
creds := stscreds.NewCredentials(sess, "arn:aws:iam::*:role/kcl-test-publisher")
kclConfig := cfg.NewKinesisClientLibConfigWithCredential("appName", streamName, regionName, workerID, creds).
WithInitialPositionInStream(cfg.LATEST).
WithMaxRecords(10).
WithMaxLeasesForWorker(1).
WithShardSyncIntervalMillis(5000).
WithFailoverTimeMillis(300000).
WithMetricsBufferTimeMillis(10000).
WithMetricsMaxQueueSize(20)
runTest(kclConfig, t)
}
func runTest(kclConfig *cfg.KinesisClientLibConfiguration, t *testing.T) {
log.SetOutput(os.Stdout)
log.SetLevel(log.DebugLevel)
@ -59,7 +109,7 @@ func TestWorker(t *testing.T) {
assert.Equal(t, streamName, kclConfig.StreamName)
// configure cloudwatch as metrics system
metricsConfig := getMetricsConfig(metricsSystem)
metricsConfig := getMetricsConfig(kclConfig, metricsSystem)
worker := NewWorker(recordProcessorFactory(t), kclConfig, metricsConfig)
assert.Equal(t, regionName, worker.regionName)
@ -100,15 +150,16 @@ func TestWorker(t *testing.T) {
}
// configure different metrics system
func getMetricsConfig(service string) *metrics.MonitoringConfiguration {
func getMetricsConfig(kclConfig *cfg.KinesisClientLibConfiguration, service string) *metrics.MonitoringConfiguration {
if service == "cloudwatch" {
return &metrics.MonitoringConfiguration{
MonitoringService: "cloudwatch",
Region: regionName,
CloudWatch: metrics.CloudWatchMonitoringService{
Credentials: kclConfig.CloudWatchCredentials,
// Those value should come from kclConfig
MetricsBufferTimeMillis: 10000,
MetricsMaxQueueSize: 20,
MetricsBufferTimeMillis: kclConfig.MetricsBufferTimeMillis,
MetricsMaxQueueSize: kclConfig.MetricsMaxQueueSize,
},
}
}