diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go index 684d0cc..1cce247 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go @@ -29,18 +29,18 @@ package checkpoint import ( "errors" - "github.com/aws/aws-sdk-go/aws/client" - "github.com/aws/aws-sdk-go/aws/session" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/client" + "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" - log "github.com/sirupsen/logrus" "github.com/vmware/vmware-go-kcl/clientlibrary/config" par "github.com/vmware/vmware-go-kcl/clientlibrary/partition" + "github.com/vmware/vmware-go-kcl/logger" ) const ( @@ -53,18 +53,20 @@ const ( // DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend type DynamoCheckpoint struct { + log logger.Logger TableName string leaseTableReadCapacity int64 leaseTableWriteCapacity int64 - LeaseDuration int - svc dynamodbiface.DynamoDBAPI - kclConfig *config.KinesisClientLibConfiguration - Retries int + LeaseDuration int + svc dynamodbiface.DynamoDBAPI + kclConfig *config.KinesisClientLibConfiguration + Retries int } func NewDynamoCheckpoint(kclConfig *config.KinesisClientLibConfiguration) *DynamoCheckpoint { checkpointer := &DynamoCheckpoint{ + log: kclConfig.Logger, TableName: kclConfig.TableName, leaseTableReadCapacity: int64(kclConfig.InitialLeaseTableReadCapacity), leaseTableWriteCapacity: int64(kclConfig.InitialLeaseTableWriteCapacity), @@ -84,7 +86,7 @@ func (checkpointer *DynamoCheckpoint) WithDynamoDB(svc dynamodbiface.DynamoDBAPI // Init initialises the DynamoDB Checkpoint func (checkpointer *DynamoCheckpoint) Init() error { - log.Info("Creating DynamoDB session") + checkpointer.log.Infof("Creating DynamoDB session") s, err := session.NewSession(&aws.Config{ Region: aws.String(checkpointer.kclConfig.RegionName), @@ -95,7 +97,7 @@ func (checkpointer *DynamoCheckpoint) Init() error { if err != nil { // no need to move forward - log.Fatalf("Failed in getting DynamoDB session for creating Worker: %+v", err) + checkpointer.log.Fatalf("Failed in getting DynamoDB session for creating Worker: %+v", err) } if checkpointer.svc == nil { @@ -137,7 +139,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign return errors.New(ErrLeaseNotAquired) } - log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo) + checkpointer.log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo) conditionalExpression = "ShardID = :id AND AssignedTo = :assigned_to AND LeaseTimeout = :lease_timeout" expressionAttributeValues = map[string]*dynamodb.AttributeValue{ ":id": { @@ -228,7 +230,7 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er if !ok { return ErrSequenceIDNotFound } - log.Debugf("Retrieved Shard Iterator %s", *sequenceID.S) + checkpointer.log.Debugf("Retrieved Shard Iterator %s", *sequenceID.S) shard.Mux.Lock() defer shard.Mux.Unlock() shard.Checkpoint = aws.StringValue(sequenceID.S) @@ -244,9 +246,9 @@ func (checkpointer *DynamoCheckpoint) RemoveLeaseInfo(shardID string) error { err := checkpointer.removeItem(shardID) if err != nil { - log.Errorf("Error in removing lease info for shard: %s, Error: %+v", shardID, err) + checkpointer.log.Errorf("Error in removing lease info for shard: %s, Error: %+v", shardID, err) } else { - log.Infof("Lease info for shard: %s has been removed.", shardID) + checkpointer.log.Infof("Lease info for shard: %s has been removed.", shardID) } return err diff --git a/clientlibrary/config/config.go b/clientlibrary/config/config.go index 3200725..9db8d4f 100644 --- a/clientlibrary/config/config.go +++ b/clientlibrary/config/config.go @@ -41,6 +41,7 @@ import ( "github.com/aws/aws-sdk-go/aws" creds "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/vmware/vmware-go-kcl/logger" ) const ( @@ -256,6 +257,9 @@ type ( // Worker should skip syncing shards and leases at startup if leases are present // This is useful for optimizing deployments to large fleets working on a stable stream. SkipShardSyncAtWorkerInitializationIfLeasesExist bool + + // Logger used to log message. + Logger logger.Logger } ) diff --git a/clientlibrary/config/config_test.go b/clientlibrary/config/config_test.go index 466d6b0..6f90796 100644 --- a/clientlibrary/config/config_test.go +++ b/clientlibrary/config/config_test.go @@ -19,6 +19,7 @@ package config import ( + "github.com/vmware/vmware-go-kcl/logger" "testing" "github.com/stretchr/testify/assert" @@ -37,4 +38,8 @@ func TestConfig(t *testing.T) { assert.Equal(t, "appName", kclConfig.ApplicationName) assert.Equal(t, 500, kclConfig.FailoverTimeMillis) + + contextLogger := kclConfig.Logger.WithFields(logger.Fields{"key1": "value1"}) + contextLogger.Debugf("Starting with default logger") + contextLogger.Infof("Default logger is awesome") } diff --git a/clientlibrary/config/kcl-config.go b/clientlibrary/config/kcl-config.go index 1419ad8..f1dc058 100644 --- a/clientlibrary/config/kcl-config.go +++ b/clientlibrary/config/kcl-config.go @@ -34,10 +34,12 @@ package config import ( - "github.com/aws/aws-sdk-go/aws/credentials" + "log" "time" + "github.com/aws/aws-sdk-go/aws/credentials" "github.com/vmware/vmware-go-kcl/clientlibrary/utils" + "github.com/vmware/vmware-go-kcl/logger" ) // NewKinesisClientLibConfig to create a default KinesisClientLibConfiguration based on the required fields. @@ -92,6 +94,7 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio InitialLeaseTableReadCapacity: DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, InitialLeaseTableWriteCapacity: DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, SkipShardSyncAtWorkerInitializationIfLeasesExist: DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + Logger: logger.GetDefaultLogger(), } } @@ -201,3 +204,11 @@ func (c *KinesisClientLibConfiguration) WithMetricsMaxQueueSize(metricsMaxQueueS c.MetricsMaxQueueSize = metricsMaxQueueSize return c } + +func (c *KinesisClientLibConfiguration) WithLogger(logger logger.Logger) *KinesisClientLibConfiguration { + if logger == nil { + log.Panic("Logger cannot be null") + } + c.Logger = logger + return c +} diff --git a/clientlibrary/metrics/cloudwatch.go b/clientlibrary/metrics/cloudwatch.go index 477f127..a189656 100644 --- a/clientlibrary/metrics/cloudwatch.go +++ b/clientlibrary/metrics/cloudwatch.go @@ -28,15 +28,16 @@ package metrics import ( - "github.com/aws/aws-sdk-go/aws/credentials" "sync" "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface" - log "github.com/sirupsen/logrus" + + "github.com/vmware/vmware-go-kcl/logger" ) type CloudWatchMonitoringService struct { @@ -45,6 +46,7 @@ type CloudWatchMonitoringService struct { WorkerID string Region string Credentials *credentials.Credentials + Logger logger.Logger // control how often to pusblish to CloudWatch MetricsBufferTimeMillis int @@ -72,7 +74,7 @@ func (cw *CloudWatchMonitoringService) Init() error { cfg.Credentials = cw.Credentials s, err := session.NewSession(cfg) if err != nil { - log.Errorf("Error in creating session for cloudwatch. %+v", err) + cw.Logger.Errorf("Error in creating session for cloudwatch. %+v", err) return err } cw.svc = cloudwatch.New(s) @@ -94,10 +96,10 @@ func (cw *CloudWatchMonitoringService) Start() error { } func (cw *CloudWatchMonitoringService) Shutdown() { - log.Info("Shutting down cloudwatch metrics system...") + cw.Logger.Infof("Shutting down cloudwatch metrics system...") close(*cw.stop) cw.waitGroup.Wait() - log.Info("Cloudwatch metrics system has been shutdown.") + cw.Logger.Infof("Cloudwatch metrics system has been shutdown.") } // Start daemon to flush metrics periodically @@ -106,14 +108,14 @@ func (cw *CloudWatchMonitoringService) eventloop() { for { if err := cw.flush(); err != nil { - log.Errorf("Error sending metrics to CloudWatch. %+v", err) + cw.Logger.Errorf("Error sending metrics to CloudWatch. %+v", err) } select { case <-*cw.stop: - log.Info("Shutting down monitoring system") + cw.Logger.Infof("Shutting down monitoring system") if err := cw.flush(); err != nil { - log.Errorf("Error sending metrics to CloudWatch. %+v", err) + cw.Logger.Errorf("Error sending metrics to CloudWatch. %+v", err) } return case <-time.After(time.Duration(cw.MetricsBufferTimeMillis) * time.Millisecond): @@ -237,7 +239,7 @@ func (cw *CloudWatchMonitoringService) flushShard(shard string, metric *cloudWat metric.getRecordsTime = []float64{} metric.processRecordsTime = []float64{} } else { - log.Errorf("Error in publishing cloudwatch metrics. Error: %+v", err) + cw.Logger.Errorf("Error in publishing cloudwatch metrics. Error: %+v", err) } metric.Unlock() @@ -245,7 +247,7 @@ func (cw *CloudWatchMonitoringService) flushShard(shard string, metric *cloudWat } func (cw *CloudWatchMonitoringService) flush() error { - log.Debugf("Flushing metrics data. Stream: %s, Worker: %s", cw.KinesisStream, cw.WorkerID) + cw.Logger.Debugf("Flushing metrics data. Stream: %s, Worker: %s", cw.KinesisStream, cw.WorkerID) // publish per shard metrics cw.shardMetrics.Range(func(k, v interface{}) bool { shard, metric := k.(string), v.(*cloudWatchMetrics) diff --git a/clientlibrary/metrics/interfaces.go b/clientlibrary/metrics/interfaces.go index 41ef053..c79cb61 100644 --- a/clientlibrary/metrics/interfaces.go +++ b/clientlibrary/metrics/interfaces.go @@ -29,6 +29,7 @@ package metrics import ( "fmt" + "github.com/vmware/vmware-go-kcl/logger" ) // MonitoringConfiguration allows you to configure how record processing metrics are exposed @@ -38,6 +39,7 @@ type MonitoringConfiguration struct { Prometheus PrometheusMonitoringService CloudWatch CloudWatchMonitoringService service MonitoringService + Logger logger.Logger } type MonitoringService interface { @@ -60,18 +62,25 @@ func (m *MonitoringConfiguration) Init(nameSpace, streamName string, workerID st return nil } + // Config with default logger if logger is not specified. + if m.Logger == nil { + m.Logger = logger.GetDefaultLogger() + } + switch m.MonitoringService { case "prometheus": m.Prometheus.Namespace = nameSpace m.Prometheus.KinesisStream = streamName m.Prometheus.WorkerID = workerID m.Prometheus.Region = m.Region + m.Prometheus.Logger = m.Logger m.service = &m.Prometheus case "cloudwatch": m.CloudWatch.Namespace = nameSpace m.CloudWatch.KinesisStream = streamName m.CloudWatch.WorkerID = workerID m.CloudWatch.Region = m.Region + m.CloudWatch.Logger = m.Logger m.service = &m.CloudWatch default: return fmt.Errorf("Invalid monitoring service type %s", m.MonitoringService) diff --git a/clientlibrary/metrics/prometheus.go b/clientlibrary/metrics/prometheus.go index 81c08ce..3dae914 100644 --- a/clientlibrary/metrics/prometheus.go +++ b/clientlibrary/metrics/prometheus.go @@ -32,7 +32,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - log "github.com/sirupsen/logrus" + + "github.com/vmware/vmware-go-kcl/logger" ) // PrometheusMonitoringService to start Prometheus as metrics system. @@ -41,10 +42,12 @@ import ( type PrometheusMonitoringService struct { ListenAddress string - Namespace string - KinesisStream string - WorkerID string - Region string + Namespace string + KinesisStream string + WorkerID string + Region string + Logger logger.Logger + processedRecords *prometheus.CounterVec processedBytes *prometheus.CounterVec behindLatestMillis *prometheus.GaugeVec @@ -106,12 +109,12 @@ func (p *PrometheusMonitoringService) Init() error { func (p *PrometheusMonitoringService) Start() error { http.Handle("/metrics", promhttp.Handler()) go func() { - log.Infof("Starting Prometheus listener on %s", p.ListenAddress) + p.Logger.Infof("Starting Prometheus listener on %s", p.ListenAddress) err := http.ListenAndServe(p.ListenAddress, nil) if err != nil { - log.Errorf("Error starting Prometheus metrics endpoint. %+v", err) + p.Logger.Errorf("Error starting Prometheus metrics endpoint. %+v", err) } - log.Info("Stopped metrics server") + p.Logger.Infof("Stopped metrics server") }() return nil diff --git a/clientlibrary/worker/shard-consumer.go b/clientlibrary/worker/shard-consumer.go index 87b24aa..04bae9f 100644 --- a/clientlibrary/worker/shard-consumer.go +++ b/clientlibrary/worker/shard-consumer.go @@ -32,8 +32,6 @@ import ( "sync" "time" - log "github.com/sirupsen/logrus" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/kinesis" @@ -86,6 +84,8 @@ type ShardConsumer struct { } func (sc *ShardConsumer) getShardIterator(shard *par.ShardStatus) (*string, error) { + log := sc.kclConfig.Logger + // Get checkpoint of the shard from dynamoDB err := sc.checkpointer.FetchCheckpoint(shard) if err != nil && err != chk.ErrSequenceIDNotFound { @@ -128,6 +128,8 @@ func (sc *ShardConsumer) getShardIterator(shard *par.ShardStatus) (*string, erro func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error { defer sc.releaseLease(shard) + log := sc.kclConfig.Logger + // If the shard is child shard, need to wait until the parent finished. if err := sc.waitOnParentShard(shard); err != nil { // If parent shard has been deleted by Kinesis system already, just ignore the error. @@ -282,6 +284,7 @@ func (sc *ShardConsumer) waitOnParentShard(shard *par.ShardStatus) error { // Cleanup the internal lease cache func (sc *ShardConsumer) releaseLease(shard *par.ShardStatus) { + log := sc.kclConfig.Logger log.Infof("Release lease for shard %s", shard.ID) shard.SetLeaseOwner("") diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index 2a7bd52..fac1a7d 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -32,8 +32,6 @@ import ( "sync" "time" - log "github.com/sirupsen/logrus" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" @@ -105,19 +103,20 @@ func (w *Worker) WithCheckpointer(checker chk.Checkpointer) *Worker { // Run starts consuming data from the stream, and pass it to the application record processors. func (w *Worker) Start() error { + log := w.kclConfig.Logger if err := w.initialize(); err != nil { log.Errorf("Failed to initialize Worker: %+v", err) return err } // Start monitoring service - log.Info("Starting monitoring service.") + log.Infof("Starting monitoring service.") if err := w.mService.Start(); err != nil { log.Errorf("Failed to start monitoring service: %+v", err) return err } - log.Info("Starting worker event loop.") + log.Infof("Starting worker event loop.") // entering event loop go w.eventLoop() return nil @@ -125,7 +124,8 @@ func (w *Worker) Start() error { // Shutdown signals worker to shutdown. Worker will try initiating shutdown of all record processors. func (w *Worker) Shutdown() { - log.Info("Worker shutdown in requested.") + log := w.kclConfig.Logger + log.Infof("Worker shutdown in requested.") if w.done { return @@ -136,11 +136,12 @@ func (w *Worker) Shutdown() { w.waitGroup.Wait() w.mService.Shutdown() - log.Info("Worker loop is complete. Exiting from worker.") + log.Infof("Worker loop is complete. Exiting from worker.") } // Publish to write some data into stream. This function is mainly used for testing purpose. func (w *Worker) Publish(streamName, partitionKey string, data []byte) error { + log := w.kclConfig.Logger _, err := w.kc.PutRecord(&kinesis.PutRecordInput{ Data: data, StreamName: aws.String(streamName), @@ -154,12 +155,13 @@ func (w *Worker) Publish(streamName, partitionKey string, data []byte) error { // initialize func (w *Worker) initialize() error { - log.Info("Worker initialization in progress...") + log := w.kclConfig.Logger + log.Infof("Worker initialization in progress...") // Create default Kinesis session if w.kc == nil { // create session for Kinesis - log.Info("Creating Kinesis session") + log.Infof("Creating Kinesis session") s, err := session.NewSession(&aws.Config{ Region: aws.String(w.regionName), @@ -173,15 +175,15 @@ func (w *Worker) initialize() error { } w.kc = kinesis.New(s) } else { - log.Info("Use custom Kinesis service.") + log.Infof("Use custom Kinesis service.") } // Create default dynamodb based checkpointer implementation if w.checkpointer == nil { - log.Info("Creating DynamoDB based checkpointer") + log.Infof("Creating DynamoDB based checkpointer") w.checkpointer = chk.NewDynamoCheckpoint(w.kclConfig) } else { - log.Info("Use custom checkpointer implementation.") + log.Infof("Use custom checkpointer implementation.") } err := w.metricsConfig.Init(w.kclConfig.ApplicationName, w.streamName, w.workerID) @@ -190,7 +192,7 @@ func (w *Worker) initialize() error { } w.mService = w.metricsConfig.GetMonitoringService() - log.Info("Initializing Checkpointer") + log.Infof("Initializing Checkpointer") if err := w.checkpointer.Init(); err != nil { log.Errorf("Failed to start Checkpointer: %+v", err) return err @@ -203,7 +205,7 @@ func (w *Worker) initialize() error { w.waitGroup = &sync.WaitGroup{} - log.Info("Initialization complete.") + log.Infof("Initialization complete.") return nil } @@ -226,6 +228,8 @@ func (w *Worker) newShardConsumer(shard *par.ShardStatus) *ShardConsumer { // eventLoop func (w *Worker) eventLoop() { + log := w.kclConfig.Logger + for { err := w.syncShard() if err != nil { @@ -271,7 +275,7 @@ func (w *Worker) eventLoop() { if err != nil { // cannot get lease on the shard if err.Error() != chk.ErrLeaseNotAquired { - log.Error(err) + log.Errorf("Cannot get lease: %+v", err) } continue } @@ -284,7 +288,9 @@ func (w *Worker) eventLoop() { w.waitGroup.Add(1) go func() { defer w.waitGroup.Done() - sc.getRecords(shard) + if err := sc.getRecords(shard); err != nil { + log.Errorf("Error in getRecords: %+v", err) + } }() // exit from for loop and not to grab more shard for now. break @@ -293,7 +299,7 @@ func (w *Worker) eventLoop() { select { case <-*w.stop: - log.Info("Shutting down...") + log.Infof("Shutting down...") return case <-time.After(time.Duration(w.kclConfig.ShardSyncIntervalMillis) * time.Millisecond): } @@ -303,6 +309,7 @@ func (w *Worker) eventLoop() { // List all ACTIVE shard and store them into shardStatus table // If shard has been removed, need to exclude it from cached shard status. func (w *Worker) getShardIDs(startShardID string, shardInfo map[string]bool) error { + log := w.kclConfig.Logger // The default pagination limit is 100. args := &kinesis.DescribeStreamInput{ StreamName: aws.String(w.streamName), @@ -355,6 +362,7 @@ func (w *Worker) getShardIDs(startShardID string, shardInfo map[string]bool) err // syncShard to sync the cached shard info with actual shard info from Kinesis func (w *Worker) syncShard() error { + log := w.kclConfig.Logger shardInfo := make(map[string]bool) err := w.getShardIDs("", shardInfo) diff --git a/go.mod b/go.mod index c18af03..9a4ec48 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,7 @@ module github.com/vmware/vmware-go-kcl require ( + github.com/BurntSushi/toml v0.3.1 // indirect github.com/aws/aws-sdk-go v1.19.38 github.com/google/uuid v1.1.1 github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect @@ -9,7 +10,13 @@ require ( github.com/prometheus/procfs v0.0.0-20190523193104-a7aeb8df3389 // indirect github.com/sirupsen/logrus v1.4.2 github.com/stretchr/testify v1.3.0 + go.uber.org/atomic v1.4.0 // indirect + go.uber.org/multierr v1.2.0 // indirect + go.uber.org/zap v1.11.0 golang.org/x/net v0.0.0-20190522155817-f3200d17e092 // indirect golang.org/x/sys v0.0.0-20190528012530-adf421d2caf4 // indirect golang.org/x/text v0.3.2 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) + +go 1.13 diff --git a/go.sum b/go.sum index 9a5146b..19dc05c 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -32,6 +34,7 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0j github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -59,6 +62,12 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.2.0 h1:6I+W7f5VwC5SV9dNrZ3qXrDB9mD0dyGOi/ZJmYw03T4= +go.uber.org/multierr v1.2.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.11.0 h1:gSmpCfs+R47a4yQPAI4xJ0IPDLTRGXskm6UelqNXpqE= +go.uber.org/zap v1.11.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -78,5 +87,9 @@ golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= +gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= +gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/logger/logger.go b/logger/logger.go new file mode 100644 index 0000000..1712899 --- /dev/null +++ b/logger/logger.go @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2019 VMware, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and + * associated documentation files (the "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial + * portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT + * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +// Note: The implementation comes from https://www.mountedthoughts.com/golang-logger-interface/ +// https://github.com/amitrai48/logger + +package logger + +import ( + "github.com/sirupsen/logrus" +) + +// Fields Type to pass when we want to call WithFields for structured logging +type Fields map[string]interface{} + +const ( + //Debug has verbose message + Debug = "debug" + //Info is default log level + Info = "info" + //Warn is for logging messages about possible issues + Warn = "warn" + //Error is for logging errors + Error = "error" + //Fatal is for logging fatal messages. The sytem shutsdown after logging the message. + Fatal = "fatal" +) + +// Logger is the common interface for logging. +type Logger interface { + Debugf(format string, args ...interface{}) + + Infof(format string, args ...interface{}) + + Warnf(format string, args ...interface{}) + + Errorf(format string, args ...interface{}) + + Fatalf(format string, args ...interface{}) + + Panicf(format string, args ...interface{}) + + WithFields(keyValues Fields) Logger +} + +// Configuration stores the config for the logger +// For some loggers there can only be one level across writers, for such the level of Console is picked by default +type Configuration struct { + EnableConsole bool + ConsoleJSONFormat bool + ConsoleLevel string + EnableFile bool + FileJSONFormat bool + FileLevel string + + // Filename is the file to write logs to. Backup log files will be retained + // in the same directory. It uses -lumberjack.log in + // os.TempDir() if empty. + Filename string + + // MaxSize is the maximum size in megabytes of the log file before it gets + // rotated. It defaults to 100 megabytes. + MaxSizeMB int + + // MaxAge is the maximum number of days to retain old log files based on the + // timestamp encoded in their filename. Note that a day is defined as 24 + // hours and may not exactly correspond to calendar days due to daylight + // savings, leap seconds, etc. The default is 7 days. + MaxAgeDays int + + // MaxBackups is the maximum number of old log files to retain. The default + // is to retain all old log files (though MaxAge may still cause them to get + // deleted.) + MaxBackups int + + // LocalTime determines if the time used for formatting the timestamps in + // backup files is the computer's local time. The default is to use UTC + // time. + LocalTime bool +} + +// GetDefaultLogger creates a default logger. +func GetDefaultLogger() Logger { + return NewLogrusLogger(logrus.StandardLogger()) +} + +// normalizeConfig to enforce default value in configuration. +func normalizeConfig(config *Configuration) { + if config.MaxSizeMB <= 0 { + config.MaxSizeMB = 100 + } + + if config.MaxAgeDays <= 0 { + config.MaxAgeDays = 7 + } + + if config.MaxBackups < 0 { + config.MaxBackups = 0 + } +} diff --git a/logger/logger_test.go b/logger/logger_test.go new file mode 100644 index 0000000..c55a6a5 --- /dev/null +++ b/logger/logger_test.go @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2019 VMware, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and + * associated documentation files (the "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial + * portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT + * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +// Note: The implementation comes from https://www.mountedthoughts.com/golang-logger-interface/ + +package logger + +import ( + "github.com/stretchr/testify/assert" + + "github.com/sirupsen/logrus" + "go.uber.org/zap" + "testing" +) + +func TestZapLoggerWithConfig(t *testing.T) { + config := Configuration{ + EnableConsole: true, + ConsoleLevel: Debug, + ConsoleJSONFormat: true, + EnableFile: false, + FileLevel: Info, + FileJSONFormat: true, + Filename: "log.log", + } + + log := NewZapLoggerWithConfig(config) + + contextLogger := log.WithFields(Fields{"key1": "value1"}) + contextLogger.Debugf("Starting with zap") + contextLogger.Infof("Zap is awesome") +} + +func TestZapLogger(t *testing.T) { + zapLogger, err := zap.NewProduction() + assert.Nil(t, err) + + log := NewZapLogger(zapLogger.Sugar()) + + contextLogger := log.WithFields(Fields{"key1": "value1"}) + contextLogger.Debugf("Starting with zap") + contextLogger.Infof("Zap is awesome") +} + +func TestLogrusLoggerWithConfig(t *testing.T) { + config := Configuration{ + EnableConsole: true, + ConsoleLevel: Debug, + ConsoleJSONFormat: false, + EnableFile: false, + FileLevel: Info, + FileJSONFormat: true, + } + + log := NewLogrusLoggerWithConfig(config) + + contextLogger := log.WithFields(Fields{"key1": "value1"}) + contextLogger.Debugf("Starting with logrus") + contextLogger.Infof("Logrus is awesome") +} + +func TestLogrusLogger(t *testing.T) { + // adapts to Logger interface + log := NewLogrusLogger(logrus.StandardLogger()) + + contextLogger := log.WithFields(Fields{"key1": "value1"}) + contextLogger.Debugf("Starting with logrus") + contextLogger.Infof("Logrus is awesome") +} diff --git a/logger/logrus.go b/logger/logrus.go new file mode 100644 index 0000000..464f691 --- /dev/null +++ b/logger/logrus.go @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2019 VMware, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and + * associated documentation files (the "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial + * portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT + * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +// Note: The implementation comes from https://www.mountedthoughts.com/golang-logger-interface/ +// https://github.com/amitrai48/logger + +package logger + +import ( + "io" + "os" + + "github.com/sirupsen/logrus" + lumberjack "gopkg.in/natefinch/lumberjack.v2" +) + +type LogrusLogEntry struct { + entry *logrus.Entry +} + +type LogrusLogger struct { + logger *logrus.Logger +} + +// NewLogrusLogger adapts existing logrus logger to Logger interface. +// The call is responsible for configuring logrus logger appropriately. +func NewLogrusLogger(lLogger *logrus.Logger) Logger { + return &LogrusLogger{ + logger: lLogger, + } +} + +// NewLogrusLoggerWithConfig creates and configs Logger instance backed by +// logrus logger. +func NewLogrusLoggerWithConfig(config Configuration) Logger { + logLevel := config.ConsoleLevel + if logLevel == "" { + logLevel = config.FileLevel + } + + level, err := logrus.ParseLevel(logLevel) + if err != nil { + // fallback to InfoLevel + level = logrus.InfoLevel + } + + normalizeConfig(&config) + + stdOutHandler := os.Stdout + fileHandler := &lumberjack.Logger{ + Filename: config.Filename, + MaxSize: config.MaxSizeMB, + Compress: true, + MaxAge: config.MaxAgeDays, + MaxBackups: config.MaxBackups, + LocalTime: config.LocalTime, + } + lLogger := &logrus.Logger{ + Out: stdOutHandler, + Formatter: getFormatter(config.ConsoleJSONFormat), + Hooks: make(logrus.LevelHooks), + Level: level, + } + + if config.EnableConsole && config.EnableFile { + lLogger.SetOutput(io.MultiWriter(stdOutHandler, fileHandler)) + } else { + if config.EnableFile { + lLogger.SetOutput(fileHandler) + lLogger.SetFormatter(getFormatter(config.FileJSONFormat)) + } + } + + return &LogrusLogger{ + logger: lLogger, + } +} + +func (l *LogrusLogger) Debugf(format string, args ...interface{}) { + l.logger.Debugf(format, args...) +} + +func (l *LogrusLogger) Infof(format string, args ...interface{}) { + l.logger.Infof(format, args...) +} + +func (l *LogrusLogger) Warnf(format string, args ...interface{}) { + l.logger.Warnf(format, args...) +} + +func (l *LogrusLogger) Errorf(format string, args ...interface{}) { + l.logger.Errorf(format, args...) +} + +func (l *LogrusLogger) Fatalf(format string, args ...interface{}) { + l.logger.Fatalf(format, args...) +} + +func (l *LogrusLogger) Panicf(format string, args ...interface{}) { + l.logger.Fatalf(format, args...) +} + +func (l *LogrusLogger) WithFields(fields Fields) Logger { + return &LogrusLogEntry{ + entry: l.logger.WithFields(convertToLogrusFields(fields)), + } +} + +func (l *LogrusLogEntry) Debugf(format string, args ...interface{}) { + l.entry.Debugf(format, args...) +} + +func (l *LogrusLogEntry) Infof(format string, args ...interface{}) { + l.entry.Infof(format, args...) +} + +func (l *LogrusLogEntry) Warnf(format string, args ...interface{}) { + l.entry.Warnf(format, args...) +} + +func (l *LogrusLogEntry) Errorf(format string, args ...interface{}) { + l.entry.Errorf(format, args...) +} + +func (l *LogrusLogEntry) Fatalf(format string, args ...interface{}) { + l.entry.Fatalf(format, args...) +} + +func (l *LogrusLogEntry) Panicf(format string, args ...interface{}) { + l.entry.Fatalf(format, args...) +} + +func (l *LogrusLogEntry) WithFields(fields Fields) Logger { + return &LogrusLogEntry{ + entry: l.entry.WithFields(convertToLogrusFields(fields)), + } +} + +func getFormatter(isJSON bool) logrus.Formatter { + if isJSON { + return &logrus.JSONFormatter{} + } + return &logrus.TextFormatter{ + FullTimestamp: true, + DisableLevelTruncation: true, + } +} + +func convertToLogrusFields(fields Fields) logrus.Fields { + logrusFields := logrus.Fields{} + for index, val := range fields { + logrusFields[index] = val + } + return logrusFields +} diff --git a/logger/zap.go b/logger/zap.go new file mode 100644 index 0000000..01fdeb7 --- /dev/null +++ b/logger/zap.go @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2019 VMware, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and + * associated documentation files (the "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial + * portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT + * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +// Note: The implementation comes from https://www.mountedthoughts.com/golang-logger-interface/ +// https://github.com/amitrai48/logger + +package logger + +import ( + "os" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + lumberjack "gopkg.in/natefinch/lumberjack.v2" +) + +type ZapLogger struct { + sugaredLogger *zap.SugaredLogger +} + +// NewZapLogger adapts existing sugared zap logger to Logger interface. +// The call is responsible for configuring sugard zap logger appropriately. +// +// Note: Sugar wraps the Logger to provide a more ergonomic, but slightly slower, +// API. Sugaring a Logger is quite inexpensive, so it's reasonable for a +// single application to use both Loggers and SugaredLoggers, converting +// between them on the boundaries of performance-sensitive code. +// +// Base zap logger can be convert to SugaredLogger by calling to add a wrapper: +// sugaredLogger := log.Sugar() +// +func NewZapLogger(logger *zap.SugaredLogger) Logger { + return &ZapLogger{ + sugaredLogger: logger, + } +} + +// NewZapLoggerWithConfig creates and configs Logger instance backed by +// zap Sugared logger. +func NewZapLoggerWithConfig(config Configuration) Logger { + cores := []zapcore.Core{} + + if config.EnableConsole { + level := getZapLevel(config.ConsoleLevel) + writer := zapcore.Lock(os.Stdout) + core := zapcore.NewCore(getEncoder(config.ConsoleJSONFormat), writer, level) + cores = append(cores, core) + } + + if config.EnableFile { + level := getZapLevel(config.FileLevel) + writer := zapcore.AddSync(&lumberjack.Logger{ + Filename: config.Filename, + MaxSize: config.MaxSizeMB, + Compress: true, + MaxAge: config.MaxAgeDays, + MaxBackups: config.MaxBackups, + LocalTime: config.LocalTime, + }) + core := zapcore.NewCore(getEncoder(config.FileJSONFormat), writer, level) + cores = append(cores, core) + } + + combinedCore := zapcore.NewTee(cores...) + + // AddCallerSkip skips 2 number of callers, this is important else the file that gets + // logged will always be the wrapped file. In our case zap.go + logger := zap.New(combinedCore, + zap.AddCallerSkip(2), + zap.AddCaller(), + ).Sugar() + + return &ZapLogger{ + sugaredLogger: logger, + } +} + +func (l *ZapLogger) Debugf(format string, args ...interface{}) { + l.sugaredLogger.Debugf(format, args...) +} + +func (l *ZapLogger) Infof(format string, args ...interface{}) { + l.sugaredLogger.Infof(format, args...) +} + +func (l *ZapLogger) Warnf(format string, args ...interface{}) { + l.sugaredLogger.Warnf(format, args...) +} + +func (l *ZapLogger) Errorf(format string, args ...interface{}) { + l.sugaredLogger.Errorf(format, args...) +} + +func (l *ZapLogger) Fatalf(format string, args ...interface{}) { + l.sugaredLogger.Fatalf(format, args...) +} + +func (l *ZapLogger) Panicf(format string, args ...interface{}) { + l.sugaredLogger.Fatalf(format, args...) +} + +func (l *ZapLogger) WithFields(fields Fields) Logger { + var f = make([]interface{}, 0) + for k, v := range fields { + f = append(f, k) + f = append(f, v) + } + newLogger := l.sugaredLogger.With(f...) + return &ZapLogger{newLogger} +} + +func getEncoder(isJSON bool) zapcore.Encoder { + encoderConfig := zap.NewProductionEncoderConfig() + encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder + if isJSON { + return zapcore.NewJSONEncoder(encoderConfig) + } + return zapcore.NewConsoleEncoder(encoderConfig) +} + +func getZapLevel(level string) zapcore.Level { + switch level { + case Info: + return zapcore.InfoLevel + case Warn: + return zapcore.WarnLevel + case Debug: + return zapcore.DebugLevel + case Error: + return zapcore.ErrorLevel + case Fatal: + return zapcore.FatalLevel + default: + return zapcore.InfoLevel + } +} diff --git a/test/logger_test.go b/test/logger_test.go new file mode 100644 index 0000000..42e44f0 --- /dev/null +++ b/test/logger_test.go @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2019 VMware, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and + * associated documentation files (the "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial + * portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT + * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +// Note: The implementation comes from https://www.mountedthoughts.com/golang-logger-interface/ + +package test + +import ( + "github.com/stretchr/testify/assert" + + "github.com/sirupsen/logrus" + "go.uber.org/zap" + "testing" + + "github.com/vmware/vmware-go-kcl/logger" +) + +func TestZapLoggerWithConfig(t *testing.T) { + config := logger.Configuration{ + EnableConsole: true, + ConsoleLevel: logger.Debug, + ConsoleJSONFormat: true, + EnableFile: true, + FileLevel: logger.Info, + FileJSONFormat: true, + Filename: "log.log", + } + + log := logger.NewZapLoggerWithConfig(config) + + contextLogger := log.WithFields(logger.Fields{"key1": "value1"}) + contextLogger.Debugf("Starting with zap") + contextLogger.Infof("Zap is awesome") +} + +func TestZapLogger(t *testing.T) { + zapLogger, err := zap.NewProduction() + assert.Nil(t, err) + + log := logger.NewZapLogger(zapLogger.Sugar()) + + contextLogger := log.WithFields(logger.Fields{"key1": "value1"}) + contextLogger.Debugf("Starting with zap") + contextLogger.Infof("Zap is awesome") +} + +func TestLogrusLoggerWithConfig(t *testing.T) { + config := logger.Configuration{ + EnableConsole: true, + ConsoleLevel: logger.Debug, + ConsoleJSONFormat: false, + EnableFile: true, + FileLevel: logger.Info, + FileJSONFormat: true, + Filename: "log.log", + } + + log := logger.NewLogrusLoggerWithConfig(config) + + contextLogger := log.WithFields(logger.Fields{"key1": "value1"}) + contextLogger.Debugf("Starting with logrus") + contextLogger.Infof("Logrus is awesome") +} + +func TestLogrusLogger(t *testing.T) { + // adapts to Logger interface + log := logger.NewLogrusLogger(logrus.StandardLogger()) + + contextLogger := log.WithFields(logger.Fields{"key1": "value1"}) + contextLogger.Debugf("Starting with logrus") + contextLogger.Infof("Logrus is awesome") +} diff --git a/test/worker_test.go b/test/worker_test.go index d950058..22f7ccb 100644 --- a/test/worker_test.go +++ b/test/worker_test.go @@ -19,9 +19,6 @@ package test import ( - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/credentials/stscreds" - "github.com/aws/aws-sdk-go/aws/session" "net/http" "os" "os/signal" @@ -30,15 +27,17 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/credentials/stscreds" + "github.com/aws/aws-sdk-go/aws/session" "github.com/prometheus/common/expfmt" - log "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" cfg "github.com/vmware/vmware-go-kcl/clientlibrary/config" kc "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces" "github.com/vmware/vmware-go-kcl/clientlibrary/metrics" "github.com/vmware/vmware-go-kcl/clientlibrary/utils" wk "github.com/vmware/vmware-go-kcl/clientlibrary/worker" + "github.com/vmware/vmware-go-kcl/logger" ) const ( @@ -53,6 +52,22 @@ const metricsSystem = "cloudwatch" var shardID string func TestWorker(t *testing.T) { + // At miminal. use standard logrus logger + // log := logger.NewLogrusLogger(logrus.StandardLogger()) + // + // In order to have precise control over logging. Use logger with config + config := logger.Configuration{ + EnableConsole: true, + ConsoleLevel: logger.Debug, + ConsoleJSONFormat: false, + EnableFile: true, + FileLevel: logger.Info, + FileJSONFormat: true, + Filename: "log.log", + } + // Use logrus logger + log := logger.NewLogrusLoggerWithConfig(config) + kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID). WithInitialPositionInStream(cfg.LATEST). WithMaxRecords(10). @@ -60,12 +75,31 @@ func TestWorker(t *testing.T) { WithShardSyncIntervalMillis(5000). WithFailoverTimeMillis(300000). WithMetricsBufferTimeMillis(10000). - WithMetricsMaxQueueSize(20) + WithMetricsMaxQueueSize(20). + WithLogger(log) runTest(kclConfig, false, t) } func TestWorkerWithSigInt(t *testing.T) { + // At miminal. use standard zap logger + //zapLogger, err := zap.NewProduction() + //assert.Nil(t, err) + //log := logger.NewZapLogger(zapLogger.Sugar()) + // + // In order to have precise control over logging. Use logger with config. + config := logger.Configuration{ + EnableConsole: true, + ConsoleLevel: logger.Debug, + ConsoleJSONFormat: true, + EnableFile: true, + FileLevel: logger.Info, + FileJSONFormat: true, + Filename: "log.log", + } + // use zap logger + log := logger.NewZapLoggerWithConfig(config) + kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID). WithInitialPositionInStream(cfg.LATEST). WithMaxRecords(10). @@ -73,7 +107,8 @@ func TestWorkerWithSigInt(t *testing.T) { WithShardSyncIntervalMillis(5000). WithFailoverTimeMillis(300000). WithMetricsBufferTimeMillis(10000). - WithMetricsMaxQueueSize(20) + WithMetricsMaxQueueSize(20). + WithLogger(log) runTest(kclConfig, true, t) } @@ -120,9 +155,6 @@ func TestWorkerAssumeRole(t *testing.T) { } func runTest(kclConfig *cfg.KinesisClientLibConfiguration, triggersig bool, t *testing.T) { - log.SetOutput(os.Stdout) - log.SetLevel(log.DebugLevel) - assert.Equal(t, regionName, kclConfig.RegionName) assert.Equal(t, streamName, kclConfig.StreamName) @@ -192,6 +224,7 @@ func getMetricsConfig(kclConfig *cfg.KinesisClientLibConfiguration, service stri return &metrics.MonitoringConfiguration{ MonitoringService: "cloudwatch", Region: regionName, + Logger: kclConfig.Logger, CloudWatch: metrics.CloudWatchMonitoringService{ Credentials: kclConfig.CloudWatchCredentials, // Those value should come from kclConfig @@ -205,6 +238,7 @@ func getMetricsConfig(kclConfig *cfg.KinesisClientLibConfiguration, service stri return &metrics.MonitoringConfiguration{ MonitoringService: "prometheus", Region: regionName, + Logger: kclConfig.Logger, Prometheus: metrics.PrometheusMonitoringService{ ListenAddress: ":8080", },