From 0d91fbd443a1d2d0edac77514db2bfbce9a303ed Mon Sep 17 00:00:00 2001 From: Tao Jiang Date: Mon, 28 Oct 2019 07:08:18 -0500 Subject: [PATCH] Add generic logger support (#43) * Add generic logger support The current KCL has tight coupling with logrus and it causes issue for customer to use different logging system such as zap log. The issue has been opened via: https://github.com/vmware/vmware-go-kcl/issues/27 This change is to created a logger interface be able to abstract above logrus and zap log. It makes easy to add support for other logging system in the fugure. The work is based on: https://www.mountedthoughts.com/golang-logger-interface/ Some updates are made in order to make logging system easily injectable and add more unit tests. Tested against real kinesis and dyamodb as well. Signed-off-by: Tao Jiang * Add lumberjack configuration options to have fine grained control Update the file log configuratio by adding most of luberjack configuration to avoid hardcode default value. Let user to specify the value because log retention and rotation are very important for prod environment. Signed-off-by: Tao Jiang --- .../checkpoint/dynamodb-checkpointer.go | 28 +-- clientlibrary/config/config.go | 4 + clientlibrary/config/config_test.go | 5 + clientlibrary/config/kcl-config.go | 13 +- clientlibrary/metrics/cloudwatch.go | 22 +-- clientlibrary/metrics/interfaces.go | 9 + clientlibrary/metrics/prometheus.go | 19 +- clientlibrary/worker/shard-consumer.go | 7 +- clientlibrary/worker/worker.go | 40 +++-- go.mod | 7 + go.sum | 13 ++ logger/logger.go | 115 ++++++++++++ logger/logger_test.go | 84 +++++++++ logger/logrus.go | 170 ++++++++++++++++++ logger/zap.go | 151 ++++++++++++++++ test/logger_test.go | 87 +++++++++ test/worker_test.go | 54 ++++-- 17 files changed, 768 insertions(+), 60 deletions(-) create mode 100644 logger/logger.go create mode 100644 logger/logger_test.go create mode 100644 logger/logrus.go create mode 100644 logger/zap.go create mode 100644 test/logger_test.go diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go index 684d0cc..1cce247 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go @@ -29,18 +29,18 @@ package checkpoint import ( "errors" - "github.com/aws/aws-sdk-go/aws/client" - "github.com/aws/aws-sdk-go/aws/session" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/client" + "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" - log "github.com/sirupsen/logrus" "github.com/vmware/vmware-go-kcl/clientlibrary/config" par "github.com/vmware/vmware-go-kcl/clientlibrary/partition" + "github.com/vmware/vmware-go-kcl/logger" ) const ( @@ -53,18 +53,20 @@ const ( // DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend type DynamoCheckpoint struct { + log logger.Logger TableName string leaseTableReadCapacity int64 leaseTableWriteCapacity int64 - LeaseDuration int - svc dynamodbiface.DynamoDBAPI - kclConfig *config.KinesisClientLibConfiguration - Retries int + LeaseDuration int + svc dynamodbiface.DynamoDBAPI + kclConfig *config.KinesisClientLibConfiguration + Retries int } func NewDynamoCheckpoint(kclConfig *config.KinesisClientLibConfiguration) *DynamoCheckpoint { checkpointer := &DynamoCheckpoint{ + log: kclConfig.Logger, TableName: kclConfig.TableName, leaseTableReadCapacity: int64(kclConfig.InitialLeaseTableReadCapacity), leaseTableWriteCapacity: int64(kclConfig.InitialLeaseTableWriteCapacity), @@ -84,7 +86,7 @@ func (checkpointer *DynamoCheckpoint) WithDynamoDB(svc dynamodbiface.DynamoDBAPI // Init initialises the DynamoDB Checkpoint func (checkpointer *DynamoCheckpoint) Init() error { - log.Info("Creating DynamoDB session") + checkpointer.log.Infof("Creating DynamoDB session") s, err := session.NewSession(&aws.Config{ Region: aws.String(checkpointer.kclConfig.RegionName), @@ -95,7 +97,7 @@ func (checkpointer *DynamoCheckpoint) Init() error { if err != nil { // no need to move forward - log.Fatalf("Failed in getting DynamoDB session for creating Worker: %+v", err) + checkpointer.log.Fatalf("Failed in getting DynamoDB session for creating Worker: %+v", err) } if checkpointer.svc == nil { @@ -137,7 +139,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign return errors.New(ErrLeaseNotAquired) } - log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo) + checkpointer.log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo) conditionalExpression = "ShardID = :id AND AssignedTo = :assigned_to AND LeaseTimeout = :lease_timeout" expressionAttributeValues = map[string]*dynamodb.AttributeValue{ ":id": { @@ -228,7 +230,7 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er if !ok { return ErrSequenceIDNotFound } - log.Debugf("Retrieved Shard Iterator %s", *sequenceID.S) + checkpointer.log.Debugf("Retrieved Shard Iterator %s", *sequenceID.S) shard.Mux.Lock() defer shard.Mux.Unlock() shard.Checkpoint = aws.StringValue(sequenceID.S) @@ -244,9 +246,9 @@ func (checkpointer *DynamoCheckpoint) RemoveLeaseInfo(shardID string) error { err := checkpointer.removeItem(shardID) if err != nil { - log.Errorf("Error in removing lease info for shard: %s, Error: %+v", shardID, err) + checkpointer.log.Errorf("Error in removing lease info for shard: %s, Error: %+v", shardID, err) } else { - log.Infof("Lease info for shard: %s has been removed.", shardID) + checkpointer.log.Infof("Lease info for shard: %s has been removed.", shardID) } return err diff --git a/clientlibrary/config/config.go b/clientlibrary/config/config.go index 3200725..9db8d4f 100644 --- a/clientlibrary/config/config.go +++ b/clientlibrary/config/config.go @@ -41,6 +41,7 @@ import ( "github.com/aws/aws-sdk-go/aws" creds "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/vmware/vmware-go-kcl/logger" ) const ( @@ -256,6 +257,9 @@ type ( // Worker should skip syncing shards and leases at startup if leases are present // This is useful for optimizing deployments to large fleets working on a stable stream. SkipShardSyncAtWorkerInitializationIfLeasesExist bool + + // Logger used to log message. + Logger logger.Logger } ) diff --git a/clientlibrary/config/config_test.go b/clientlibrary/config/config_test.go index 466d6b0..6f90796 100644 --- a/clientlibrary/config/config_test.go +++ b/clientlibrary/config/config_test.go @@ -19,6 +19,7 @@ package config import ( + "github.com/vmware/vmware-go-kcl/logger" "testing" "github.com/stretchr/testify/assert" @@ -37,4 +38,8 @@ func TestConfig(t *testing.T) { assert.Equal(t, "appName", kclConfig.ApplicationName) assert.Equal(t, 500, kclConfig.FailoverTimeMillis) + + contextLogger := kclConfig.Logger.WithFields(logger.Fields{"key1": "value1"}) + contextLogger.Debugf("Starting with default logger") + contextLogger.Infof("Default logger is awesome") } diff --git a/clientlibrary/config/kcl-config.go b/clientlibrary/config/kcl-config.go index 1419ad8..f1dc058 100644 --- a/clientlibrary/config/kcl-config.go +++ b/clientlibrary/config/kcl-config.go @@ -34,10 +34,12 @@ package config import ( - "github.com/aws/aws-sdk-go/aws/credentials" + "log" "time" + "github.com/aws/aws-sdk-go/aws/credentials" "github.com/vmware/vmware-go-kcl/clientlibrary/utils" + "github.com/vmware/vmware-go-kcl/logger" ) // NewKinesisClientLibConfig to create a default KinesisClientLibConfiguration based on the required fields. @@ -92,6 +94,7 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio InitialLeaseTableReadCapacity: DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, InitialLeaseTableWriteCapacity: DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, SkipShardSyncAtWorkerInitializationIfLeasesExist: DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + Logger: logger.GetDefaultLogger(), } } @@ -201,3 +204,11 @@ func (c *KinesisClientLibConfiguration) WithMetricsMaxQueueSize(metricsMaxQueueS c.MetricsMaxQueueSize = metricsMaxQueueSize return c } + +func (c *KinesisClientLibConfiguration) WithLogger(logger logger.Logger) *KinesisClientLibConfiguration { + if logger == nil { + log.Panic("Logger cannot be null") + } + c.Logger = logger + return c +} diff --git a/clientlibrary/metrics/cloudwatch.go b/clientlibrary/metrics/cloudwatch.go index 477f127..a189656 100644 --- a/clientlibrary/metrics/cloudwatch.go +++ b/clientlibrary/metrics/cloudwatch.go @@ -28,15 +28,16 @@ package metrics import ( - "github.com/aws/aws-sdk-go/aws/credentials" "sync" "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface" - log "github.com/sirupsen/logrus" + + "github.com/vmware/vmware-go-kcl/logger" ) type CloudWatchMonitoringService struct { @@ -45,6 +46,7 @@ type CloudWatchMonitoringService struct { WorkerID string Region string Credentials *credentials.Credentials + Logger logger.Logger // control how often to pusblish to CloudWatch MetricsBufferTimeMillis int @@ -72,7 +74,7 @@ func (cw *CloudWatchMonitoringService) Init() error { cfg.Credentials = cw.Credentials s, err := session.NewSession(cfg) if err != nil { - log.Errorf("Error in creating session for cloudwatch. %+v", err) + cw.Logger.Errorf("Error in creating session for cloudwatch. %+v", err) return err } cw.svc = cloudwatch.New(s) @@ -94,10 +96,10 @@ func (cw *CloudWatchMonitoringService) Start() error { } func (cw *CloudWatchMonitoringService) Shutdown() { - log.Info("Shutting down cloudwatch metrics system...") + cw.Logger.Infof("Shutting down cloudwatch metrics system...") close(*cw.stop) cw.waitGroup.Wait() - log.Info("Cloudwatch metrics system has been shutdown.") + cw.Logger.Infof("Cloudwatch metrics system has been shutdown.") } // Start daemon to flush metrics periodically @@ -106,14 +108,14 @@ func (cw *CloudWatchMonitoringService) eventloop() { for { if err := cw.flush(); err != nil { - log.Errorf("Error sending metrics to CloudWatch. %+v", err) + cw.Logger.Errorf("Error sending metrics to CloudWatch. %+v", err) } select { case <-*cw.stop: - log.Info("Shutting down monitoring system") + cw.Logger.Infof("Shutting down monitoring system") if err := cw.flush(); err != nil { - log.Errorf("Error sending metrics to CloudWatch. %+v", err) + cw.Logger.Errorf("Error sending metrics to CloudWatch. %+v", err) } return case <-time.After(time.Duration(cw.MetricsBufferTimeMillis) * time.Millisecond): @@ -237,7 +239,7 @@ func (cw *CloudWatchMonitoringService) flushShard(shard string, metric *cloudWat metric.getRecordsTime = []float64{} metric.processRecordsTime = []float64{} } else { - log.Errorf("Error in publishing cloudwatch metrics. Error: %+v", err) + cw.Logger.Errorf("Error in publishing cloudwatch metrics. Error: %+v", err) } metric.Unlock() @@ -245,7 +247,7 @@ func (cw *CloudWatchMonitoringService) flushShard(shard string, metric *cloudWat } func (cw *CloudWatchMonitoringService) flush() error { - log.Debugf("Flushing metrics data. Stream: %s, Worker: %s", cw.KinesisStream, cw.WorkerID) + cw.Logger.Debugf("Flushing metrics data. Stream: %s, Worker: %s", cw.KinesisStream, cw.WorkerID) // publish per shard metrics cw.shardMetrics.Range(func(k, v interface{}) bool { shard, metric := k.(string), v.(*cloudWatchMetrics) diff --git a/clientlibrary/metrics/interfaces.go b/clientlibrary/metrics/interfaces.go index 41ef053..c79cb61 100644 --- a/clientlibrary/metrics/interfaces.go +++ b/clientlibrary/metrics/interfaces.go @@ -29,6 +29,7 @@ package metrics import ( "fmt" + "github.com/vmware/vmware-go-kcl/logger" ) // MonitoringConfiguration allows you to configure how record processing metrics are exposed @@ -38,6 +39,7 @@ type MonitoringConfiguration struct { Prometheus PrometheusMonitoringService CloudWatch CloudWatchMonitoringService service MonitoringService + Logger logger.Logger } type MonitoringService interface { @@ -60,18 +62,25 @@ func (m *MonitoringConfiguration) Init(nameSpace, streamName string, workerID st return nil } + // Config with default logger if logger is not specified. + if m.Logger == nil { + m.Logger = logger.GetDefaultLogger() + } + switch m.MonitoringService { case "prometheus": m.Prometheus.Namespace = nameSpace m.Prometheus.KinesisStream = streamName m.Prometheus.WorkerID = workerID m.Prometheus.Region = m.Region + m.Prometheus.Logger = m.Logger m.service = &m.Prometheus case "cloudwatch": m.CloudWatch.Namespace = nameSpace m.CloudWatch.KinesisStream = streamName m.CloudWatch.WorkerID = workerID m.CloudWatch.Region = m.Region + m.CloudWatch.Logger = m.Logger m.service = &m.CloudWatch default: return fmt.Errorf("Invalid monitoring service type %s", m.MonitoringService) diff --git a/clientlibrary/metrics/prometheus.go b/clientlibrary/metrics/prometheus.go index 81c08ce..3dae914 100644 --- a/clientlibrary/metrics/prometheus.go +++ b/clientlibrary/metrics/prometheus.go @@ -32,7 +32,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - log "github.com/sirupsen/logrus" + + "github.com/vmware/vmware-go-kcl/logger" ) // PrometheusMonitoringService to start Prometheus as metrics system. @@ -41,10 +42,12 @@ import ( type PrometheusMonitoringService struct { ListenAddress string - Namespace string - KinesisStream string - WorkerID string - Region string + Namespace string + KinesisStream string + WorkerID string + Region string + Logger logger.Logger + processedRecords *prometheus.CounterVec processedBytes *prometheus.CounterVec behindLatestMillis *prometheus.GaugeVec @@ -106,12 +109,12 @@ func (p *PrometheusMonitoringService) Init() error { func (p *PrometheusMonitoringService) Start() error { http.Handle("/metrics", promhttp.Handler()) go func() { - log.Infof("Starting Prometheus listener on %s", p.ListenAddress) + p.Logger.Infof("Starting Prometheus listener on %s", p.ListenAddress) err := http.ListenAndServe(p.ListenAddress, nil) if err != nil { - log.Errorf("Error starting Prometheus metrics endpoint. %+v", err) + p.Logger.Errorf("Error starting Prometheus metrics endpoint. %+v", err) } - log.Info("Stopped metrics server") + p.Logger.Infof("Stopped metrics server") }() return nil diff --git a/clientlibrary/worker/shard-consumer.go b/clientlibrary/worker/shard-consumer.go index 87b24aa..04bae9f 100644 --- a/clientlibrary/worker/shard-consumer.go +++ b/clientlibrary/worker/shard-consumer.go @@ -32,8 +32,6 @@ import ( "sync" "time" - log "github.com/sirupsen/logrus" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/kinesis" @@ -86,6 +84,8 @@ type ShardConsumer struct { } func (sc *ShardConsumer) getShardIterator(shard *par.ShardStatus) (*string, error) { + log := sc.kclConfig.Logger + // Get checkpoint of the shard from dynamoDB err := sc.checkpointer.FetchCheckpoint(shard) if err != nil && err != chk.ErrSequenceIDNotFound { @@ -128,6 +128,8 @@ func (sc *ShardConsumer) getShardIterator(shard *par.ShardStatus) (*string, erro func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error { defer sc.releaseLease(shard) + log := sc.kclConfig.Logger + // If the shard is child shard, need to wait until the parent finished. if err := sc.waitOnParentShard(shard); err != nil { // If parent shard has been deleted by Kinesis system already, just ignore the error. @@ -282,6 +284,7 @@ func (sc *ShardConsumer) waitOnParentShard(shard *par.ShardStatus) error { // Cleanup the internal lease cache func (sc *ShardConsumer) releaseLease(shard *par.ShardStatus) { + log := sc.kclConfig.Logger log.Infof("Release lease for shard %s", shard.ID) shard.SetLeaseOwner("") diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index 2a7bd52..fac1a7d 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -32,8 +32,6 @@ import ( "sync" "time" - log "github.com/sirupsen/logrus" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" @@ -105,19 +103,20 @@ func (w *Worker) WithCheckpointer(checker chk.Checkpointer) *Worker { // Run starts consuming data from the stream, and pass it to the application record processors. func (w *Worker) Start() error { + log := w.kclConfig.Logger if err := w.initialize(); err != nil { log.Errorf("Failed to initialize Worker: %+v", err) return err } // Start monitoring service - log.Info("Starting monitoring service.") + log.Infof("Starting monitoring service.") if err := w.mService.Start(); err != nil { log.Errorf("Failed to start monitoring service: %+v", err) return err } - log.Info("Starting worker event loop.") + log.Infof("Starting worker event loop.") // entering event loop go w.eventLoop() return nil @@ -125,7 +124,8 @@ func (w *Worker) Start() error { // Shutdown signals worker to shutdown. Worker will try initiating shutdown of all record processors. func (w *Worker) Shutdown() { - log.Info("Worker shutdown in requested.") + log := w.kclConfig.Logger + log.Infof("Worker shutdown in requested.") if w.done { return @@ -136,11 +136,12 @@ func (w *Worker) Shutdown() { w.waitGroup.Wait() w.mService.Shutdown() - log.Info("Worker loop is complete. Exiting from worker.") + log.Infof("Worker loop is complete. Exiting from worker.") } // Publish to write some data into stream. This function is mainly used for testing purpose. func (w *Worker) Publish(streamName, partitionKey string, data []byte) error { + log := w.kclConfig.Logger _, err := w.kc.PutRecord(&kinesis.PutRecordInput{ Data: data, StreamName: aws.String(streamName), @@ -154,12 +155,13 @@ func (w *Worker) Publish(streamName, partitionKey string, data []byte) error { // initialize func (w *Worker) initialize() error { - log.Info("Worker initialization in progress...") + log := w.kclConfig.Logger + log.Infof("Worker initialization in progress...") // Create default Kinesis session if w.kc == nil { // create session for Kinesis - log.Info("Creating Kinesis session") + log.Infof("Creating Kinesis session") s, err := session.NewSession(&aws.Config{ Region: aws.String(w.regionName), @@ -173,15 +175,15 @@ func (w *Worker) initialize() error { } w.kc = kinesis.New(s) } else { - log.Info("Use custom Kinesis service.") + log.Infof("Use custom Kinesis service.") } // Create default dynamodb based checkpointer implementation if w.checkpointer == nil { - log.Info("Creating DynamoDB based checkpointer") + log.Infof("Creating DynamoDB based checkpointer") w.checkpointer = chk.NewDynamoCheckpoint(w.kclConfig) } else { - log.Info("Use custom checkpointer implementation.") + log.Infof("Use custom checkpointer implementation.") } err := w.metricsConfig.Init(w.kclConfig.ApplicationName, w.streamName, w.workerID) @@ -190,7 +192,7 @@ func (w *Worker) initialize() error { } w.mService = w.metricsConfig.GetMonitoringService() - log.Info("Initializing Checkpointer") + log.Infof("Initializing Checkpointer") if err := w.checkpointer.Init(); err != nil { log.Errorf("Failed to start Checkpointer: %+v", err) return err @@ -203,7 +205,7 @@ func (w *Worker) initialize() error { w.waitGroup = &sync.WaitGroup{} - log.Info("Initialization complete.") + log.Infof("Initialization complete.") return nil } @@ -226,6 +228,8 @@ func (w *Worker) newShardConsumer(shard *par.ShardStatus) *ShardConsumer { // eventLoop func (w *Worker) eventLoop() { + log := w.kclConfig.Logger + for { err := w.syncShard() if err != nil { @@ -271,7 +275,7 @@ func (w *Worker) eventLoop() { if err != nil { // cannot get lease on the shard if err.Error() != chk.ErrLeaseNotAquired { - log.Error(err) + log.Errorf("Cannot get lease: %+v", err) } continue } @@ -284,7 +288,9 @@ func (w *Worker) eventLoop() { w.waitGroup.Add(1) go func() { defer w.waitGroup.Done() - sc.getRecords(shard) + if err := sc.getRecords(shard); err != nil { + log.Errorf("Error in getRecords: %+v", err) + } }() // exit from for loop and not to grab more shard for now. break @@ -293,7 +299,7 @@ func (w *Worker) eventLoop() { select { case <-*w.stop: - log.Info("Shutting down...") + log.Infof("Shutting down...") return case <-time.After(time.Duration(w.kclConfig.ShardSyncIntervalMillis) * time.Millisecond): } @@ -303,6 +309,7 @@ func (w *Worker) eventLoop() { // List all ACTIVE shard and store them into shardStatus table // If shard has been removed, need to exclude it from cached shard status. func (w *Worker) getShardIDs(startShardID string, shardInfo map[string]bool) error { + log := w.kclConfig.Logger // The default pagination limit is 100. args := &kinesis.DescribeStreamInput{ StreamName: aws.String(w.streamName), @@ -355,6 +362,7 @@ func (w *Worker) getShardIDs(startShardID string, shardInfo map[string]bool) err // syncShard to sync the cached shard info with actual shard info from Kinesis func (w *Worker) syncShard() error { + log := w.kclConfig.Logger shardInfo := make(map[string]bool) err := w.getShardIDs("", shardInfo) diff --git a/go.mod b/go.mod index c18af03..9a4ec48 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,7 @@ module github.com/vmware/vmware-go-kcl require ( + github.com/BurntSushi/toml v0.3.1 // indirect github.com/aws/aws-sdk-go v1.19.38 github.com/google/uuid v1.1.1 github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect @@ -9,7 +10,13 @@ require ( github.com/prometheus/procfs v0.0.0-20190523193104-a7aeb8df3389 // indirect github.com/sirupsen/logrus v1.4.2 github.com/stretchr/testify v1.3.0 + go.uber.org/atomic v1.4.0 // indirect + go.uber.org/multierr v1.2.0 // indirect + go.uber.org/zap v1.11.0 golang.org/x/net v0.0.0-20190522155817-f3200d17e092 // indirect golang.org/x/sys v0.0.0-20190528012530-adf421d2caf4 // indirect golang.org/x/text v0.3.2 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) + +go 1.13 diff --git a/go.sum b/go.sum index 9a5146b..19dc05c 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -32,6 +34,7 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0j github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -59,6 +62,12 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.2.0 h1:6I+W7f5VwC5SV9dNrZ3qXrDB9mD0dyGOi/ZJmYw03T4= +go.uber.org/multierr v1.2.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.11.0 h1:gSmpCfs+R47a4yQPAI4xJ0IPDLTRGXskm6UelqNXpqE= +go.uber.org/zap v1.11.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -78,5 +87,9 @@ golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= +gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= +gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/logger/logger.go b/logger/logger.go new file mode 100644 index 0000000..1712899 --- /dev/null +++ b/logger/logger.go @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2019 VMware, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and + * associated documentation files (the "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial + * portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT + * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +// Note: The implementation comes from https://www.mountedthoughts.com/golang-logger-interface/ +// https://github.com/amitrai48/logger + +package logger + +import ( + "github.com/sirupsen/logrus" +) + +// Fields Type to pass when we want to call WithFields for structured logging +type Fields map[string]interface{} + +const ( + //Debug has verbose message + Debug = "debug" + //Info is default log level + Info = "info" + //Warn is for logging messages about possible issues + Warn = "warn" + //Error is for logging errors + Error = "error" + //Fatal is for logging fatal messages. The sytem shutsdown after logging the message. + Fatal = "fatal" +) + +// Logger is the common interface for logging. +type Logger interface { + Debugf(format string, args ...interface{}) + + Infof(format string, args ...interface{}) + + Warnf(format string, args ...interface{}) + + Errorf(format string, args ...interface{}) + + Fatalf(format string, args ...interface{}) + + Panicf(format string, args ...interface{}) + + WithFields(keyValues Fields) Logger +} + +// Configuration stores the config for the logger +// For some loggers there can only be one level across writers, for such the level of Console is picked by default +type Configuration struct { + EnableConsole bool + ConsoleJSONFormat bool + ConsoleLevel string + EnableFile bool + FileJSONFormat bool + FileLevel string + + // Filename is the file to write logs to. Backup log files will be retained + // in the same directory. It uses -lumberjack.log in + // os.TempDir() if empty. + Filename string + + // MaxSize is the maximum size in megabytes of the log file before it gets + // rotated. It defaults to 100 megabytes. + MaxSizeMB int + + // MaxAge is the maximum number of days to retain old log files based on the + // timestamp encoded in their filename. Note that a day is defined as 24 + // hours and may not exactly correspond to calendar days due to daylight + // savings, leap seconds, etc. The default is 7 days. + MaxAgeDays int + + // MaxBackups is the maximum number of old log files to retain. The default + // is to retain all old log files (though MaxAge may still cause them to get + // deleted.) + MaxBackups int + + // LocalTime determines if the time used for formatting the timestamps in + // backup files is the computer's local time. The default is to use UTC + // time. + LocalTime bool +} + +// GetDefaultLogger creates a default logger. +func GetDefaultLogger() Logger { + return NewLogrusLogger(logrus.StandardLogger()) +} + +// normalizeConfig to enforce default value in configuration. +func normalizeConfig(config *Configuration) { + if config.MaxSizeMB <= 0 { + config.MaxSizeMB = 100 + } + + if config.MaxAgeDays <= 0 { + config.MaxAgeDays = 7 + } + + if config.MaxBackups < 0 { + config.MaxBackups = 0 + } +} diff --git a/logger/logger_test.go b/logger/logger_test.go new file mode 100644 index 0000000..c55a6a5 --- /dev/null +++ b/logger/logger_test.go @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2019 VMware, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and + * associated documentation files (the "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial + * portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT + * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +// Note: The implementation comes from https://www.mountedthoughts.com/golang-logger-interface/ + +package logger + +import ( + "github.com/stretchr/testify/assert" + + "github.com/sirupsen/logrus" + "go.uber.org/zap" + "testing" +) + +func TestZapLoggerWithConfig(t *testing.T) { + config := Configuration{ + EnableConsole: true, + ConsoleLevel: Debug, + ConsoleJSONFormat: true, + EnableFile: false, + FileLevel: Info, + FileJSONFormat: true, + Filename: "log.log", + } + + log := NewZapLoggerWithConfig(config) + + contextLogger := log.WithFields(Fields{"key1": "value1"}) + contextLogger.Debugf("Starting with zap") + contextLogger.Infof("Zap is awesome") +} + +func TestZapLogger(t *testing.T) { + zapLogger, err := zap.NewProduction() + assert.Nil(t, err) + + log := NewZapLogger(zapLogger.Sugar()) + + contextLogger := log.WithFields(Fields{"key1": "value1"}) + contextLogger.Debugf("Starting with zap") + contextLogger.Infof("Zap is awesome") +} + +func TestLogrusLoggerWithConfig(t *testing.T) { + config := Configuration{ + EnableConsole: true, + ConsoleLevel: Debug, + ConsoleJSONFormat: false, + EnableFile: false, + FileLevel: Info, + FileJSONFormat: true, + } + + log := NewLogrusLoggerWithConfig(config) + + contextLogger := log.WithFields(Fields{"key1": "value1"}) + contextLogger.Debugf("Starting with logrus") + contextLogger.Infof("Logrus is awesome") +} + +func TestLogrusLogger(t *testing.T) { + // adapts to Logger interface + log := NewLogrusLogger(logrus.StandardLogger()) + + contextLogger := log.WithFields(Fields{"key1": "value1"}) + contextLogger.Debugf("Starting with logrus") + contextLogger.Infof("Logrus is awesome") +} diff --git a/logger/logrus.go b/logger/logrus.go new file mode 100644 index 0000000..464f691 --- /dev/null +++ b/logger/logrus.go @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2019 VMware, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and + * associated documentation files (the "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial + * portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT + * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +// Note: The implementation comes from https://www.mountedthoughts.com/golang-logger-interface/ +// https://github.com/amitrai48/logger + +package logger + +import ( + "io" + "os" + + "github.com/sirupsen/logrus" + lumberjack "gopkg.in/natefinch/lumberjack.v2" +) + +type LogrusLogEntry struct { + entry *logrus.Entry +} + +type LogrusLogger struct { + logger *logrus.Logger +} + +// NewLogrusLogger adapts existing logrus logger to Logger interface. +// The call is responsible for configuring logrus logger appropriately. +func NewLogrusLogger(lLogger *logrus.Logger) Logger { + return &LogrusLogger{ + logger: lLogger, + } +} + +// NewLogrusLoggerWithConfig creates and configs Logger instance backed by +// logrus logger. +func NewLogrusLoggerWithConfig(config Configuration) Logger { + logLevel := config.ConsoleLevel + if logLevel == "" { + logLevel = config.FileLevel + } + + level, err := logrus.ParseLevel(logLevel) + if err != nil { + // fallback to InfoLevel + level = logrus.InfoLevel + } + + normalizeConfig(&config) + + stdOutHandler := os.Stdout + fileHandler := &lumberjack.Logger{ + Filename: config.Filename, + MaxSize: config.MaxSizeMB, + Compress: true, + MaxAge: config.MaxAgeDays, + MaxBackups: config.MaxBackups, + LocalTime: config.LocalTime, + } + lLogger := &logrus.Logger{ + Out: stdOutHandler, + Formatter: getFormatter(config.ConsoleJSONFormat), + Hooks: make(logrus.LevelHooks), + Level: level, + } + + if config.EnableConsole && config.EnableFile { + lLogger.SetOutput(io.MultiWriter(stdOutHandler, fileHandler)) + } else { + if config.EnableFile { + lLogger.SetOutput(fileHandler) + lLogger.SetFormatter(getFormatter(config.FileJSONFormat)) + } + } + + return &LogrusLogger{ + logger: lLogger, + } +} + +func (l *LogrusLogger) Debugf(format string, args ...interface{}) { + l.logger.Debugf(format, args...) +} + +func (l *LogrusLogger) Infof(format string, args ...interface{}) { + l.logger.Infof(format, args...) +} + +func (l *LogrusLogger) Warnf(format string, args ...interface{}) { + l.logger.Warnf(format, args...) +} + +func (l *LogrusLogger) Errorf(format string, args ...interface{}) { + l.logger.Errorf(format, args...) +} + +func (l *LogrusLogger) Fatalf(format string, args ...interface{}) { + l.logger.Fatalf(format, args...) +} + +func (l *LogrusLogger) Panicf(format string, args ...interface{}) { + l.logger.Fatalf(format, args...) +} + +func (l *LogrusLogger) WithFields(fields Fields) Logger { + return &LogrusLogEntry{ + entry: l.logger.WithFields(convertToLogrusFields(fields)), + } +} + +func (l *LogrusLogEntry) Debugf(format string, args ...interface{}) { + l.entry.Debugf(format, args...) +} + +func (l *LogrusLogEntry) Infof(format string, args ...interface{}) { + l.entry.Infof(format, args...) +} + +func (l *LogrusLogEntry) Warnf(format string, args ...interface{}) { + l.entry.Warnf(format, args...) +} + +func (l *LogrusLogEntry) Errorf(format string, args ...interface{}) { + l.entry.Errorf(format, args...) +} + +func (l *LogrusLogEntry) Fatalf(format string, args ...interface{}) { + l.entry.Fatalf(format, args...) +} + +func (l *LogrusLogEntry) Panicf(format string, args ...interface{}) { + l.entry.Fatalf(format, args...) +} + +func (l *LogrusLogEntry) WithFields(fields Fields) Logger { + return &LogrusLogEntry{ + entry: l.entry.WithFields(convertToLogrusFields(fields)), + } +} + +func getFormatter(isJSON bool) logrus.Formatter { + if isJSON { + return &logrus.JSONFormatter{} + } + return &logrus.TextFormatter{ + FullTimestamp: true, + DisableLevelTruncation: true, + } +} + +func convertToLogrusFields(fields Fields) logrus.Fields { + logrusFields := logrus.Fields{} + for index, val := range fields { + logrusFields[index] = val + } + return logrusFields +} diff --git a/logger/zap.go b/logger/zap.go new file mode 100644 index 0000000..01fdeb7 --- /dev/null +++ b/logger/zap.go @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2019 VMware, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and + * associated documentation files (the "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial + * portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT + * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +// Note: The implementation comes from https://www.mountedthoughts.com/golang-logger-interface/ +// https://github.com/amitrai48/logger + +package logger + +import ( + "os" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + lumberjack "gopkg.in/natefinch/lumberjack.v2" +) + +type ZapLogger struct { + sugaredLogger *zap.SugaredLogger +} + +// NewZapLogger adapts existing sugared zap logger to Logger interface. +// The call is responsible for configuring sugard zap logger appropriately. +// +// Note: Sugar wraps the Logger to provide a more ergonomic, but slightly slower, +// API. Sugaring a Logger is quite inexpensive, so it's reasonable for a +// single application to use both Loggers and SugaredLoggers, converting +// between them on the boundaries of performance-sensitive code. +// +// Base zap logger can be convert to SugaredLogger by calling to add a wrapper: +// sugaredLogger := log.Sugar() +// +func NewZapLogger(logger *zap.SugaredLogger) Logger { + return &ZapLogger{ + sugaredLogger: logger, + } +} + +// NewZapLoggerWithConfig creates and configs Logger instance backed by +// zap Sugared logger. +func NewZapLoggerWithConfig(config Configuration) Logger { + cores := []zapcore.Core{} + + if config.EnableConsole { + level := getZapLevel(config.ConsoleLevel) + writer := zapcore.Lock(os.Stdout) + core := zapcore.NewCore(getEncoder(config.ConsoleJSONFormat), writer, level) + cores = append(cores, core) + } + + if config.EnableFile { + level := getZapLevel(config.FileLevel) + writer := zapcore.AddSync(&lumberjack.Logger{ + Filename: config.Filename, + MaxSize: config.MaxSizeMB, + Compress: true, + MaxAge: config.MaxAgeDays, + MaxBackups: config.MaxBackups, + LocalTime: config.LocalTime, + }) + core := zapcore.NewCore(getEncoder(config.FileJSONFormat), writer, level) + cores = append(cores, core) + } + + combinedCore := zapcore.NewTee(cores...) + + // AddCallerSkip skips 2 number of callers, this is important else the file that gets + // logged will always be the wrapped file. In our case zap.go + logger := zap.New(combinedCore, + zap.AddCallerSkip(2), + zap.AddCaller(), + ).Sugar() + + return &ZapLogger{ + sugaredLogger: logger, + } +} + +func (l *ZapLogger) Debugf(format string, args ...interface{}) { + l.sugaredLogger.Debugf(format, args...) +} + +func (l *ZapLogger) Infof(format string, args ...interface{}) { + l.sugaredLogger.Infof(format, args...) +} + +func (l *ZapLogger) Warnf(format string, args ...interface{}) { + l.sugaredLogger.Warnf(format, args...) +} + +func (l *ZapLogger) Errorf(format string, args ...interface{}) { + l.sugaredLogger.Errorf(format, args...) +} + +func (l *ZapLogger) Fatalf(format string, args ...interface{}) { + l.sugaredLogger.Fatalf(format, args...) +} + +func (l *ZapLogger) Panicf(format string, args ...interface{}) { + l.sugaredLogger.Fatalf(format, args...) +} + +func (l *ZapLogger) WithFields(fields Fields) Logger { + var f = make([]interface{}, 0) + for k, v := range fields { + f = append(f, k) + f = append(f, v) + } + newLogger := l.sugaredLogger.With(f...) + return &ZapLogger{newLogger} +} + +func getEncoder(isJSON bool) zapcore.Encoder { + encoderConfig := zap.NewProductionEncoderConfig() + encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder + if isJSON { + return zapcore.NewJSONEncoder(encoderConfig) + } + return zapcore.NewConsoleEncoder(encoderConfig) +} + +func getZapLevel(level string) zapcore.Level { + switch level { + case Info: + return zapcore.InfoLevel + case Warn: + return zapcore.WarnLevel + case Debug: + return zapcore.DebugLevel + case Error: + return zapcore.ErrorLevel + case Fatal: + return zapcore.FatalLevel + default: + return zapcore.InfoLevel + } +} diff --git a/test/logger_test.go b/test/logger_test.go new file mode 100644 index 0000000..42e44f0 --- /dev/null +++ b/test/logger_test.go @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2019 VMware, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and + * associated documentation files (the "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial + * portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT + * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +// Note: The implementation comes from https://www.mountedthoughts.com/golang-logger-interface/ + +package test + +import ( + "github.com/stretchr/testify/assert" + + "github.com/sirupsen/logrus" + "go.uber.org/zap" + "testing" + + "github.com/vmware/vmware-go-kcl/logger" +) + +func TestZapLoggerWithConfig(t *testing.T) { + config := logger.Configuration{ + EnableConsole: true, + ConsoleLevel: logger.Debug, + ConsoleJSONFormat: true, + EnableFile: true, + FileLevel: logger.Info, + FileJSONFormat: true, + Filename: "log.log", + } + + log := logger.NewZapLoggerWithConfig(config) + + contextLogger := log.WithFields(logger.Fields{"key1": "value1"}) + contextLogger.Debugf("Starting with zap") + contextLogger.Infof("Zap is awesome") +} + +func TestZapLogger(t *testing.T) { + zapLogger, err := zap.NewProduction() + assert.Nil(t, err) + + log := logger.NewZapLogger(zapLogger.Sugar()) + + contextLogger := log.WithFields(logger.Fields{"key1": "value1"}) + contextLogger.Debugf("Starting with zap") + contextLogger.Infof("Zap is awesome") +} + +func TestLogrusLoggerWithConfig(t *testing.T) { + config := logger.Configuration{ + EnableConsole: true, + ConsoleLevel: logger.Debug, + ConsoleJSONFormat: false, + EnableFile: true, + FileLevel: logger.Info, + FileJSONFormat: true, + Filename: "log.log", + } + + log := logger.NewLogrusLoggerWithConfig(config) + + contextLogger := log.WithFields(logger.Fields{"key1": "value1"}) + contextLogger.Debugf("Starting with logrus") + contextLogger.Infof("Logrus is awesome") +} + +func TestLogrusLogger(t *testing.T) { + // adapts to Logger interface + log := logger.NewLogrusLogger(logrus.StandardLogger()) + + contextLogger := log.WithFields(logger.Fields{"key1": "value1"}) + contextLogger.Debugf("Starting with logrus") + contextLogger.Infof("Logrus is awesome") +} diff --git a/test/worker_test.go b/test/worker_test.go index d950058..22f7ccb 100644 --- a/test/worker_test.go +++ b/test/worker_test.go @@ -19,9 +19,6 @@ package test import ( - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/credentials/stscreds" - "github.com/aws/aws-sdk-go/aws/session" "net/http" "os" "os/signal" @@ -30,15 +27,17 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/credentials/stscreds" + "github.com/aws/aws-sdk-go/aws/session" "github.com/prometheus/common/expfmt" - log "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" cfg "github.com/vmware/vmware-go-kcl/clientlibrary/config" kc "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces" "github.com/vmware/vmware-go-kcl/clientlibrary/metrics" "github.com/vmware/vmware-go-kcl/clientlibrary/utils" wk "github.com/vmware/vmware-go-kcl/clientlibrary/worker" + "github.com/vmware/vmware-go-kcl/logger" ) const ( @@ -53,6 +52,22 @@ const metricsSystem = "cloudwatch" var shardID string func TestWorker(t *testing.T) { + // At miminal. use standard logrus logger + // log := logger.NewLogrusLogger(logrus.StandardLogger()) + // + // In order to have precise control over logging. Use logger with config + config := logger.Configuration{ + EnableConsole: true, + ConsoleLevel: logger.Debug, + ConsoleJSONFormat: false, + EnableFile: true, + FileLevel: logger.Info, + FileJSONFormat: true, + Filename: "log.log", + } + // Use logrus logger + log := logger.NewLogrusLoggerWithConfig(config) + kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID). WithInitialPositionInStream(cfg.LATEST). WithMaxRecords(10). @@ -60,12 +75,31 @@ func TestWorker(t *testing.T) { WithShardSyncIntervalMillis(5000). WithFailoverTimeMillis(300000). WithMetricsBufferTimeMillis(10000). - WithMetricsMaxQueueSize(20) + WithMetricsMaxQueueSize(20). + WithLogger(log) runTest(kclConfig, false, t) } func TestWorkerWithSigInt(t *testing.T) { + // At miminal. use standard zap logger + //zapLogger, err := zap.NewProduction() + //assert.Nil(t, err) + //log := logger.NewZapLogger(zapLogger.Sugar()) + // + // In order to have precise control over logging. Use logger with config. + config := logger.Configuration{ + EnableConsole: true, + ConsoleLevel: logger.Debug, + ConsoleJSONFormat: true, + EnableFile: true, + FileLevel: logger.Info, + FileJSONFormat: true, + Filename: "log.log", + } + // use zap logger + log := logger.NewZapLoggerWithConfig(config) + kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID). WithInitialPositionInStream(cfg.LATEST). WithMaxRecords(10). @@ -73,7 +107,8 @@ func TestWorkerWithSigInt(t *testing.T) { WithShardSyncIntervalMillis(5000). WithFailoverTimeMillis(300000). WithMetricsBufferTimeMillis(10000). - WithMetricsMaxQueueSize(20) + WithMetricsMaxQueueSize(20). + WithLogger(log) runTest(kclConfig, true, t) } @@ -120,9 +155,6 @@ func TestWorkerAssumeRole(t *testing.T) { } func runTest(kclConfig *cfg.KinesisClientLibConfiguration, triggersig bool, t *testing.T) { - log.SetOutput(os.Stdout) - log.SetLevel(log.DebugLevel) - assert.Equal(t, regionName, kclConfig.RegionName) assert.Equal(t, streamName, kclConfig.StreamName) @@ -192,6 +224,7 @@ func getMetricsConfig(kclConfig *cfg.KinesisClientLibConfiguration, service stri return &metrics.MonitoringConfiguration{ MonitoringService: "cloudwatch", Region: regionName, + Logger: kclConfig.Logger, CloudWatch: metrics.CloudWatchMonitoringService{ Credentials: kclConfig.CloudWatchCredentials, // Those value should come from kclConfig @@ -205,6 +238,7 @@ func getMetricsConfig(kclConfig *cfg.KinesisClientLibConfiguration, service stri return &metrics.MonitoringConfiguration{ MonitoringService: "prometheus", Region: regionName, + Logger: kclConfig.Logger, Prometheus: metrics.PrometheusMonitoringService{ ListenAddress: ":8080", },