diff --git a/clientlibrary/worker/worker-custom.go b/clientlibrary/worker/worker-custom.go deleted file mode 100644 index 580a416..0000000 --- a/clientlibrary/worker/worker-custom.go +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (c) 2019 VMware, Inc. - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and - * associated documentation files (the "Software"), to deal in the Software without restriction, including - * without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is furnished to do - * so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all copies or substantial - * portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT - * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. - * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, - * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE - * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ -package worker - -import ( - log "github.com/sirupsen/logrus" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/kinesis" - chk "github.com/vmware/vmware-go-kcl/clientlibrary/checkpoint" - "github.com/vmware/vmware-go-kcl/clientlibrary/config" - kcl "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces" - "github.com/vmware/vmware-go-kcl/clientlibrary/metrics" -) - -// NewCustomWorker constructs a Worker instance for processing Kinesis stream data by directly inject custom cjheckpointer. 
-func NewCustomWorker(factory kcl.IRecordProcessorFactory, kclConfig *config.KinesisClientLibConfiguration, - checkpointer chk.Checkpointer, metricsConfig *metrics.MonitoringConfiguration) *Worker { - w := &Worker{ - streamName: kclConfig.StreamName, - regionName: kclConfig.RegionName, - workerID: kclConfig.WorkerID, - processorFactory: factory, - kclConfig: kclConfig, - checkpointer: checkpointer, - metricsConfig: metricsConfig, - } - - // create session for Kinesis - log.Info("Creating Kinesis session") - - s, err := session.NewSession(&aws.Config{ - Region: aws.String(w.regionName), - Endpoint: &kclConfig.KinesisEndpoint, - Credentials: kclConfig.KinesisCredentials, - }) - - if err != nil { - // no need to move forward - log.Fatalf("Failed in getting Kinesis session for creating Worker: %+v", err) - } - w.kc = kinesis.New(s) - - if w.metricsConfig == nil { - // "" means noop monitor service. i.e. not emitting any metrics. - w.metricsConfig = &metrics.MonitoringConfiguration{MonitoringService: ""} - } - return w -} diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index 66066d3..5bed3fa 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -83,24 +83,6 @@ func NewWorker(factory kcl.IRecordProcessorFactory, kclConfig *config.KinesisCli done: false, } - // create session for Kinesis - log.Info("Creating Kinesis session") - - s, err := session.NewSession(&aws.Config{ - Region: aws.String(w.regionName), - Endpoint: &kclConfig.KinesisEndpoint, - Credentials: kclConfig.KinesisCredentials, - }) - - if err != nil { - // no need to move forward - log.Fatalf("Failed in getting Kinesis session for creating Worker: %+v", err) - } - w.kc = kinesis.New(s) - - log.Info("Creating DynamoDB based checkpointer") - w.checkpointer = chk.NewDynamoCheckpoint(kclConfig) - if w.metricsConfig == nil { // "" means noop monitor service. i.e. not emitting any metrics. 
w.metricsConfig = &metrics.MonitoringConfiguration{MonitoringService: ""} @@ -108,10 +90,23 @@ func NewWorker(factory kcl.IRecordProcessorFactory, kclConfig *config.KinesisCli return w } +// WithKinesis is used to provide Kinesis service for either custom implementation or unit testing. +func (w *Worker) WithKinesis(svc kinesisiface.KinesisAPI) *Worker { + w.kc = svc + return w +} + +// WithCheckpointer is used to provide a custom checkpointer service for non-dynamodb implementation +// or unit testing. +func (w *Worker) WithCheckpointer(checker chk.Checkpointer) *Worker { + w.checkpointer = checker + return w +} + // Run starts consuming data from the stream, and pass it to the application record processors. func (w *Worker) Start() error { if err := w.initialize(); err != nil { - log.Errorf("Failed to start Worker: %+v", err) + log.Errorf("Failed to initialize Worker: %+v", err) return err } @@ -161,6 +156,34 @@ func (w *Worker) Publish(streamName, partitionKey string, data []byte) error { func (w *Worker) initialize() error { log.Info("Worker initialization in progress...") + // Create default Kinesis session + if w.kc == nil { + // create session for Kinesis + log.Info("Creating Kinesis session") + + s, err := session.NewSession(&aws.Config{ + Region: aws.String(w.regionName), + Endpoint: &w.kclConfig.KinesisEndpoint, + Credentials: w.kclConfig.KinesisCredentials, + }) + + if err != nil { + // no need to move forward + log.Fatalf("Failed in getting Kinesis session for creating Worker: %+v", err) + } + w.kc = kinesis.New(s) + } else { + log.Info("Use custom Kinesis service.") + } + + // Create default dynamodb based checkpointer implementation + if w.checkpointer == nil { + log.Info("Creating DynamoDB based checkpointer") + w.checkpointer = chk.NewDynamoCheckpoint(w.kclConfig) + } else { + log.Info("Use custom checkpointer implementation.") + } + err := w.metricsConfig.Init(w.kclConfig.ApplicationName, w.streamName, w.workerID) if err != nil { log.Errorf("Failed 
to start monitoring service: %+v", err) diff --git a/test/worker_custom_test.go b/test/worker_custom_test.go index 55111ac..6b3beb2 100644 --- a/test/worker_custom_test.go +++ b/test/worker_custom_test.go @@ -23,6 +23,10 @@ import ( "testing" "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" + log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -32,7 +36,7 @@ import ( wk "github.com/vmware/vmware-go-kcl/clientlibrary/worker" ) -func TestCustomWorker(t *testing.T) { +func TestWorkerInjectCheckpointer(t *testing.T) { kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID). WithInitialPositionInStream(cfg.LATEST). WithMaxRecords(10). @@ -51,9 +55,12 @@ func TestCustomWorker(t *testing.T) { // configure cloudwatch as metrics system metricsConfig := getMetricsConfig(kclConfig, metricsSystem) + // custom checkpointer or a mock checkpointer. checkpointer := chk.NewDynamoCheckpoint(kclConfig) - worker := wk.NewCustomWorker(recordProcessorFactory(t), kclConfig, checkpointer, metricsConfig) + // Inject a custom checkpointer into the worker. + worker := wk.NewWorker(recordProcessorFactory(t), kclConfig, metricsConfig). + WithCheckpointer(checkpointer) err := worker.Start() assert.Nil(t, err) @@ -71,3 +78,101 @@ func TestCustomWorker(t *testing.T) { time.Sleep(10 * time.Second) worker.Shutdown() } + +func TestWorkerInjectKinesis(t *testing.T) { + kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID). + WithInitialPositionInStream(cfg.LATEST). + WithMaxRecords(10). + WithMaxLeasesForWorker(1). + WithShardSyncIntervalMillis(5000). + WithFailoverTimeMillis(300000). + WithMetricsBufferTimeMillis(10000). 
+ WithMetricsMaxQueueSize(20) + + log.SetOutput(os.Stdout) + log.SetLevel(log.DebugLevel) + + assert.Equal(t, regionName, kclConfig.RegionName) + assert.Equal(t, streamName, kclConfig.StreamName) + + // configure cloudwatch as metrics system + metricsConfig := getMetricsConfig(kclConfig, metricsSystem) + + // create custom Kinesis + s, err := session.NewSession(&aws.Config{ + Region: aws.String(regionName), + }) + assert.Nil(t, err) + kc := kinesis.New(s) + + // Inject a custom Kinesis client into the worker. + worker := wk.NewWorker(recordProcessorFactory(t), kclConfig, metricsConfig). + WithKinesis(kc) + + err = worker.Start() + assert.Nil(t, err) + + // Put some data into stream. + for i := 0; i < 100; i++ { + // Use random string as partition key to ensure even distribution across shards + err := worker.Publish(streamName, utils.RandStringBytesMaskImpr(10), []byte(specstr)) + if err != nil { + t.Errorf("Errorin Publish. %+v", err) + } + } + + // wait a few seconds before shutdown processing + time.Sleep(10 * time.Second) + worker.Shutdown() +} + +func TestWorkerInjectKinesisAndCheckpointer(t *testing.T) { + kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID). + WithInitialPositionInStream(cfg.LATEST). + WithMaxRecords(10). + WithMaxLeasesForWorker(1). + WithShardSyncIntervalMillis(5000). + WithFailoverTimeMillis(300000). + WithMetricsBufferTimeMillis(10000). + WithMetricsMaxQueueSize(20) + + log.SetOutput(os.Stdout) + log.SetLevel(log.DebugLevel) + + assert.Equal(t, regionName, kclConfig.RegionName) + assert.Equal(t, streamName, kclConfig.StreamName) + + // configure cloudwatch as metrics system + metricsConfig := getMetricsConfig(kclConfig, metricsSystem) + + // create custom Kinesis + s, err := session.NewSession(&aws.Config{ + Region: aws.String(regionName), + }) + assert.Nil(t, err) + kc := kinesis.New(s) + + // custom checkpointer or a mock checkpointer. 
+ checkpointer := chk.NewDynamoCheckpoint(kclConfig) + + // Inject both custom checkpointer and kinesis into the worker. + worker := wk.NewWorker(recordProcessorFactory(t), kclConfig, metricsConfig). + WithKinesis(kc). + WithCheckpointer(checkpointer) + + err = worker.Start() + assert.Nil(t, err) + + // Put some data into stream. + for i := 0; i < 100; i++ { + // Use random string as partition key to ensure even distribution across shards + err := worker.Publish(streamName, utils.RandStringBytesMaskImpr(10), []byte(specstr)) + if err != nil { + t.Errorf("Errorin Publish. %+v", err) + } + } + + // wait a few seconds before shutdown processing + time.Sleep(10 * time.Second) + worker.Shutdown() +}