Add generic logger support (#43)

* Add generic logger support

The current KCL is tightly coupled with logrus, which makes it hard
for customers to use a different logging system such as zap.
The issue was reported in:
https://github.com/vmware/vmware-go-kcl/issues/27

This change introduces a logger interface that abstracts over logrus
and zap, making it easy to add support for other logging systems in
the future. The work is based on:
https://www.mountedthoughts.com/golang-logger-interface/

Additional updates make the logging system easily injectable, and more
unit tests have been added.
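
For example (illustrative snippet only; the application, stream, region,
and worker names are placeholders), an application can plug in a
zap-backed logger through the new WithLogger option:

    package main

    import (
        cfg "github.com/vmware/vmware-go-kcl/clientlibrary/config"
        "github.com/vmware/vmware-go-kcl/logger"
    )

    func main() {
        // Build a Logger backed by zap (console output, debug level, JSON format).
        log := logger.NewZapLoggerWithConfig(logger.Configuration{
            EnableConsole:     true,
            ConsoleLevel:      logger.Debug,
            ConsoleJSONFormat: true,
        })

        // Inject it into the KCL configuration; worker, checkpointer and
        // metrics all pick it up from kclConfig.Logger.
        kclConfig := cfg.NewKinesisClientLibConfig("appName", "streamName", "us-west-2", "workerID").
            WithLogger(log)

        // Structured, leveled logging through the common interface.
        kclConfig.Logger.WithFields(logger.Fields{"worker": "workerID"}).Infof("worker configured")
    }

A logrus-backed logger can be injected the same way via
logger.NewLogrusLogger or logger.NewLogrusLoggerWithConfig.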

Tested against real Kinesis and DynamoDB as well.

Signed-off-by: Tao Jiang <taoj@vmware.com>

* Add lumberjack configuration options for fine-grained control

Update the file log configuration by exposing most of the lumberjack
options instead of hardcoding default values. Users can now specify
these values themselves, because log retention and rotation are very
important in production environments.
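
For example (illustrative values; the file name is a placeholder), a
production service might configure rotation and retention explicitly:

    package main

    import "github.com/vmware/vmware-go-kcl/logger"

    func main() {
        // File logging with explicit rotation/retention instead of hardcoded defaults.
        log := logger.NewLogrusLoggerWithConfig(logger.Configuration{
            EnableConsole:     true,
            ConsoleLevel:      logger.Info,
            EnableFile:        true,
            FileLevel:         logger.Info,
            FileJSONFormat:    true,
            Filename:          "kcl.log", // placeholder path
            MaxSizeMB:         50,        // rotate once the file reaches 50 MB
            MaxAgeDays:        14,        // delete rotated files after 14 days
            MaxBackups:        10,        // keep at most 10 rotated files
            LocalTime:         true,      // name backups using local time
        })
        log.Infof("file logging configured")
    }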

Signed-off-by: Tao Jiang <taoj@vmware.com>
Tao Jiang 2019-10-28 07:08:18 -05:00
parent c8a5aa1891
commit 0d91fbd443
17 changed files with 768 additions and 60 deletions


@ -29,18 +29,18 @@ package checkpoint
 import (
     "errors"
-    "github.com/aws/aws-sdk-go/aws/client"
-    "github.com/aws/aws-sdk-go/aws/session"
     "time"
     "github.com/aws/aws-sdk-go/aws"
     "github.com/aws/aws-sdk-go/aws/awserr"
+    "github.com/aws/aws-sdk-go/aws/client"
+    "github.com/aws/aws-sdk-go/aws/session"
     "github.com/aws/aws-sdk-go/service/dynamodb"
     "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
-    log "github.com/sirupsen/logrus"
     "github.com/vmware/vmware-go-kcl/clientlibrary/config"
     par "github.com/vmware/vmware-go-kcl/clientlibrary/partition"
+    "github.com/vmware/vmware-go-kcl/logger"
 )
 const (
@ -53,18 +53,20 @@ const (
 // DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend
 type DynamoCheckpoint struct {
+    log logger.Logger
     TableName string
     leaseTableReadCapacity int64
     leaseTableWriteCapacity int64
     LeaseDuration int
     svc dynamodbiface.DynamoDBAPI
     kclConfig *config.KinesisClientLibConfiguration
     Retries int
 }
 func NewDynamoCheckpoint(kclConfig *config.KinesisClientLibConfiguration) *DynamoCheckpoint {
     checkpointer := &DynamoCheckpoint{
+        log: kclConfig.Logger,
         TableName: kclConfig.TableName,
         leaseTableReadCapacity: int64(kclConfig.InitialLeaseTableReadCapacity),
         leaseTableWriteCapacity: int64(kclConfig.InitialLeaseTableWriteCapacity),
@ -84,7 +86,7 @@ func (checkpointer *DynamoCheckpoint) WithDynamoDB(svc dynamodbiface.DynamoDBAPI
 // Init initialises the DynamoDB Checkpoint
 func (checkpointer *DynamoCheckpoint) Init() error {
-    log.Info("Creating DynamoDB session")
+    checkpointer.log.Infof("Creating DynamoDB session")
     s, err := session.NewSession(&aws.Config{
         Region: aws.String(checkpointer.kclConfig.RegionName),
@ -95,7 +97,7 @@ func (checkpointer *DynamoCheckpoint) Init() error {
     if err != nil {
         // no need to move forward
-        log.Fatalf("Failed in getting DynamoDB session for creating Worker: %+v", err)
+        checkpointer.log.Fatalf("Failed in getting DynamoDB session for creating Worker: %+v", err)
     }
     if checkpointer.svc == nil {
@ -137,7 +139,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
         return errors.New(ErrLeaseNotAquired)
     }
-    log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo)
+    checkpointer.log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo)
     conditionalExpression = "ShardID = :id AND AssignedTo = :assigned_to AND LeaseTimeout = :lease_timeout"
     expressionAttributeValues = map[string]*dynamodb.AttributeValue{
         ":id": {
@ -228,7 +230,7 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er
     if !ok {
         return ErrSequenceIDNotFound
     }
-    log.Debugf("Retrieved Shard Iterator %s", *sequenceID.S)
+    checkpointer.log.Debugf("Retrieved Shard Iterator %s", *sequenceID.S)
     shard.Mux.Lock()
     defer shard.Mux.Unlock()
     shard.Checkpoint = aws.StringValue(sequenceID.S)
@ -244,9 +246,9 @@ func (checkpointer *DynamoCheckpoint) RemoveLeaseInfo(shardID string) error {
     err := checkpointer.removeItem(shardID)
     if err != nil {
-        log.Errorf("Error in removing lease info for shard: %s, Error: %+v", shardID, err)
+        checkpointer.log.Errorf("Error in removing lease info for shard: %s, Error: %+v", shardID, err)
     } else {
-        log.Infof("Lease info for shard: %s has been removed.", shardID)
+        checkpointer.log.Infof("Lease info for shard: %s has been removed.", shardID)
     }
     return err


@ -41,6 +41,7 @@ import (
     "github.com/aws/aws-sdk-go/aws"
     creds "github.com/aws/aws-sdk-go/aws/credentials"
+    "github.com/vmware/vmware-go-kcl/logger"
 )
 const (
@ -256,6 +257,9 @@ type (
         // Worker should skip syncing shards and leases at startup if leases are present
         // This is useful for optimizing deployments to large fleets working on a stable stream.
         SkipShardSyncAtWorkerInitializationIfLeasesExist bool
+        // Logger used to log message.
+        Logger logger.Logger
     }
 )


@ -19,6 +19,7 @@
 package config
 import (
+    "github.com/vmware/vmware-go-kcl/logger"
     "testing"
     "github.com/stretchr/testify/assert"
@ -37,4 +38,8 @@ func TestConfig(t *testing.T) {
     assert.Equal(t, "appName", kclConfig.ApplicationName)
     assert.Equal(t, 500, kclConfig.FailoverTimeMillis)
+    contextLogger := kclConfig.Logger.WithFields(logger.Fields{"key1": "value1"})
+    contextLogger.Debugf("Starting with default logger")
+    contextLogger.Infof("Default logger is awesome")
 }


@ -34,10 +34,12 @@
 package config
 import (
-    "github.com/aws/aws-sdk-go/aws/credentials"
+    "log"
     "time"
+    "github.com/aws/aws-sdk-go/aws/credentials"
     "github.com/vmware/vmware-go-kcl/clientlibrary/utils"
+    "github.com/vmware/vmware-go-kcl/logger"
 )
 // NewKinesisClientLibConfig to create a default KinesisClientLibConfiguration based on the required fields.
@ -92,6 +94,7 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio
         InitialLeaseTableReadCapacity: DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
         InitialLeaseTableWriteCapacity: DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY,
         SkipShardSyncAtWorkerInitializationIfLeasesExist: DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
+        Logger: logger.GetDefaultLogger(),
     }
 }
@ -201,3 +204,11 @@ func (c *KinesisClientLibConfiguration) WithMetricsMaxQueueSize(metricsMaxQueueS
     c.MetricsMaxQueueSize = metricsMaxQueueSize
     return c
 }
+func (c *KinesisClientLibConfiguration) WithLogger(logger logger.Logger) *KinesisClientLibConfiguration {
+    if logger == nil {
+        log.Panic("Logger cannot be null")
+    }
+    c.Logger = logger
+    return c
+}


@ -28,15 +28,16 @@
 package metrics
 import (
-    "github.com/aws/aws-sdk-go/aws/credentials"
     "sync"
     "time"
     "github.com/aws/aws-sdk-go/aws"
+    "github.com/aws/aws-sdk-go/aws/credentials"
     "github.com/aws/aws-sdk-go/aws/session"
     "github.com/aws/aws-sdk-go/service/cloudwatch"
     "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
-    log "github.com/sirupsen/logrus"
+    "github.com/vmware/vmware-go-kcl/logger"
 )
 type CloudWatchMonitoringService struct {
@ -45,6 +46,7 @@ type CloudWatchMonitoringService struct {
     WorkerID string
     Region string
     Credentials *credentials.Credentials
+    Logger logger.Logger
     // control how often to pusblish to CloudWatch
     MetricsBufferTimeMillis int
@ -72,7 +74,7 @@ func (cw *CloudWatchMonitoringService) Init() error {
     cfg.Credentials = cw.Credentials
     s, err := session.NewSession(cfg)
     if err != nil {
-        log.Errorf("Error in creating session for cloudwatch. %+v", err)
+        cw.Logger.Errorf("Error in creating session for cloudwatch. %+v", err)
         return err
     }
     cw.svc = cloudwatch.New(s)
@ -94,10 +96,10 @@ func (cw *CloudWatchMonitoringService) Start() error {
 }
 func (cw *CloudWatchMonitoringService) Shutdown() {
-    log.Info("Shutting down cloudwatch metrics system...")
+    cw.Logger.Infof("Shutting down cloudwatch metrics system...")
     close(*cw.stop)
     cw.waitGroup.Wait()
-    log.Info("Cloudwatch metrics system has been shutdown.")
+    cw.Logger.Infof("Cloudwatch metrics system has been shutdown.")
 }
 // Start daemon to flush metrics periodically
@ -106,14 +108,14 @@ func (cw *CloudWatchMonitoringService) eventloop() {
     for {
         if err := cw.flush(); err != nil {
-            log.Errorf("Error sending metrics to CloudWatch. %+v", err)
+            cw.Logger.Errorf("Error sending metrics to CloudWatch. %+v", err)
         }
         select {
         case <-*cw.stop:
-            log.Info("Shutting down monitoring system")
+            cw.Logger.Infof("Shutting down monitoring system")
             if err := cw.flush(); err != nil {
-                log.Errorf("Error sending metrics to CloudWatch. %+v", err)
+                cw.Logger.Errorf("Error sending metrics to CloudWatch. %+v", err)
             }
             return
         case <-time.After(time.Duration(cw.MetricsBufferTimeMillis) * time.Millisecond):
@ -237,7 +239,7 @@ func (cw *CloudWatchMonitoringService) flushShard(shard string, metric *cloudWat
         metric.getRecordsTime = []float64{}
         metric.processRecordsTime = []float64{}
     } else {
-        log.Errorf("Error in publishing cloudwatch metrics. Error: %+v", err)
+        cw.Logger.Errorf("Error in publishing cloudwatch metrics. Error: %+v", err)
     }
     metric.Unlock()
@ -245,7 +247,7 @@ func (cw *CloudWatchMonitoringService) flushShard(shard string, metric *cloudWat
 }
 func (cw *CloudWatchMonitoringService) flush() error {
-    log.Debugf("Flushing metrics data. Stream: %s, Worker: %s", cw.KinesisStream, cw.WorkerID)
+    cw.Logger.Debugf("Flushing metrics data. Stream: %s, Worker: %s", cw.KinesisStream, cw.WorkerID)
     // publish per shard metrics
     cw.shardMetrics.Range(func(k, v interface{}) bool {
         shard, metric := k.(string), v.(*cloudWatchMetrics)


@ -29,6 +29,7 @@ package metrics
 import (
     "fmt"
+    "github.com/vmware/vmware-go-kcl/logger"
 )
 // MonitoringConfiguration allows you to configure how record processing metrics are exposed
@ -38,6 +39,7 @@ type MonitoringConfiguration struct {
     Prometheus PrometheusMonitoringService
     CloudWatch CloudWatchMonitoringService
     service MonitoringService
+    Logger logger.Logger
 }
 type MonitoringService interface {
@ -60,18 +62,25 @@ func (m *MonitoringConfiguration) Init(nameSpace, streamName string, workerID st
         return nil
     }
+    // Config with default logger if logger is not specified.
+    if m.Logger == nil {
+        m.Logger = logger.GetDefaultLogger()
+    }
     switch m.MonitoringService {
     case "prometheus":
         m.Prometheus.Namespace = nameSpace
         m.Prometheus.KinesisStream = streamName
         m.Prometheus.WorkerID = workerID
         m.Prometheus.Region = m.Region
+        m.Prometheus.Logger = m.Logger
         m.service = &m.Prometheus
     case "cloudwatch":
         m.CloudWatch.Namespace = nameSpace
         m.CloudWatch.KinesisStream = streamName
         m.CloudWatch.WorkerID = workerID
         m.CloudWatch.Region = m.Region
+        m.CloudWatch.Logger = m.Logger
         m.service = &m.CloudWatch
     default:
         return fmt.Errorf("Invalid monitoring service type %s", m.MonitoringService)


@ -32,7 +32,8 @@ import (
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promhttp"
-    log "github.com/sirupsen/logrus"
+    "github.com/vmware/vmware-go-kcl/logger"
 )
 // PrometheusMonitoringService to start Prometheus as metrics system.
@ -41,10 +42,12 @@ import (
 type PrometheusMonitoringService struct {
     ListenAddress string
     Namespace string
     KinesisStream string
     WorkerID string
     Region string
+    Logger logger.Logger
     processedRecords *prometheus.CounterVec
     processedBytes *prometheus.CounterVec
     behindLatestMillis *prometheus.GaugeVec
@ -106,12 +109,12 @@ func (p *PrometheusMonitoringService) Init() error {
 func (p *PrometheusMonitoringService) Start() error {
     http.Handle("/metrics", promhttp.Handler())
     go func() {
-        log.Infof("Starting Prometheus listener on %s", p.ListenAddress)
+        p.Logger.Infof("Starting Prometheus listener on %s", p.ListenAddress)
         err := http.ListenAndServe(p.ListenAddress, nil)
         if err != nil {
-            log.Errorf("Error starting Prometheus metrics endpoint. %+v", err)
+            p.Logger.Errorf("Error starting Prometheus metrics endpoint. %+v", err)
         }
-        log.Info("Stopped metrics server")
+        p.Logger.Infof("Stopped metrics server")
     }()
     return nil


@ -32,8 +32,6 @@ import (
     "sync"
     "time"
-    log "github.com/sirupsen/logrus"
     "github.com/aws/aws-sdk-go/aws"
     "github.com/aws/aws-sdk-go/aws/awserr"
     "github.com/aws/aws-sdk-go/service/kinesis"
@ -86,6 +84,8 @@ type ShardConsumer struct {
 }
 func (sc *ShardConsumer) getShardIterator(shard *par.ShardStatus) (*string, error) {
+    log := sc.kclConfig.Logger
     // Get checkpoint of the shard from dynamoDB
     err := sc.checkpointer.FetchCheckpoint(shard)
     if err != nil && err != chk.ErrSequenceIDNotFound {
@ -128,6 +128,8 @@ func (sc *ShardConsumer) getShardIterator(shard *par.ShardStatus) (*string, erro
 func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error {
     defer sc.releaseLease(shard)
+    log := sc.kclConfig.Logger
     // If the shard is child shard, need to wait until the parent finished.
     if err := sc.waitOnParentShard(shard); err != nil {
         // If parent shard has been deleted by Kinesis system already, just ignore the error.
@ -282,6 +284,7 @@ func (sc *ShardConsumer) waitOnParentShard(shard *par.ShardStatus) error {
 // Cleanup the internal lease cache
 func (sc *ShardConsumer) releaseLease(shard *par.ShardStatus) {
+    log := sc.kclConfig.Logger
     log.Infof("Release lease for shard %s", shard.ID)
     shard.SetLeaseOwner("")


@ -32,8 +32,6 @@ import (
     "sync"
     "time"
-    log "github.com/sirupsen/logrus"
     "github.com/aws/aws-sdk-go/aws"
     "github.com/aws/aws-sdk-go/aws/session"
     "github.com/aws/aws-sdk-go/service/kinesis"
@ -105,19 +103,20 @@ func (w *Worker) WithCheckpointer(checker chk.Checkpointer) *Worker {
 // Run starts consuming data from the stream, and pass it to the application record processors.
 func (w *Worker) Start() error {
+    log := w.kclConfig.Logger
     if err := w.initialize(); err != nil {
         log.Errorf("Failed to initialize Worker: %+v", err)
         return err
     }
     // Start monitoring service
-    log.Info("Starting monitoring service.")
+    log.Infof("Starting monitoring service.")
     if err := w.mService.Start(); err != nil {
         log.Errorf("Failed to start monitoring service: %+v", err)
         return err
     }
-    log.Info("Starting worker event loop.")
+    log.Infof("Starting worker event loop.")
     // entering event loop
     go w.eventLoop()
     return nil
@ -125,7 +124,8 @@ func (w *Worker) Start() error {
 // Shutdown signals worker to shutdown. Worker will try initiating shutdown of all record processors.
 func (w *Worker) Shutdown() {
-    log.Info("Worker shutdown in requested.")
+    log := w.kclConfig.Logger
+    log.Infof("Worker shutdown in requested.")
     if w.done {
         return
@ -136,11 +136,12 @@ func (w *Worker) Shutdown() {
     w.waitGroup.Wait()
     w.mService.Shutdown()
-    log.Info("Worker loop is complete. Exiting from worker.")
+    log.Infof("Worker loop is complete. Exiting from worker.")
 }
 // Publish to write some data into stream. This function is mainly used for testing purpose.
 func (w *Worker) Publish(streamName, partitionKey string, data []byte) error {
+    log := w.kclConfig.Logger
     _, err := w.kc.PutRecord(&kinesis.PutRecordInput{
         Data: data,
         StreamName: aws.String(streamName),
@ -154,12 +155,13 @@ func (w *Worker) Publish(streamName, partitionKey string, data []byte) error {
 // initialize
 func (w *Worker) initialize() error {
-    log.Info("Worker initialization in progress...")
+    log := w.kclConfig.Logger
+    log.Infof("Worker initialization in progress...")
     // Create default Kinesis session
     if w.kc == nil {
         // create session for Kinesis
-        log.Info("Creating Kinesis session")
+        log.Infof("Creating Kinesis session")
         s, err := session.NewSession(&aws.Config{
             Region: aws.String(w.regionName),
@ -173,15 +175,15 @@ func (w *Worker) initialize() error {
         }
         w.kc = kinesis.New(s)
     } else {
-        log.Info("Use custom Kinesis service.")
+        log.Infof("Use custom Kinesis service.")
     }
     // Create default dynamodb based checkpointer implementation
     if w.checkpointer == nil {
-        log.Info("Creating DynamoDB based checkpointer")
+        log.Infof("Creating DynamoDB based checkpointer")
         w.checkpointer = chk.NewDynamoCheckpoint(w.kclConfig)
     } else {
-        log.Info("Use custom checkpointer implementation.")
+        log.Infof("Use custom checkpointer implementation.")
     }
     err := w.metricsConfig.Init(w.kclConfig.ApplicationName, w.streamName, w.workerID)
@ -190,7 +192,7 @@ func (w *Worker) initialize() error {
     }
     w.mService = w.metricsConfig.GetMonitoringService()
-    log.Info("Initializing Checkpointer")
+    log.Infof("Initializing Checkpointer")
     if err := w.checkpointer.Init(); err != nil {
         log.Errorf("Failed to start Checkpointer: %+v", err)
         return err
@ -203,7 +205,7 @@ func (w *Worker) initialize() error {
     w.waitGroup = &sync.WaitGroup{}
-    log.Info("Initialization complete.")
+    log.Infof("Initialization complete.")
     return nil
 }
@ -226,6 +228,8 @@ func (w *Worker) newShardConsumer(shard *par.ShardStatus) *ShardConsumer {
 // eventLoop
 func (w *Worker) eventLoop() {
+    log := w.kclConfig.Logger
     for {
         err := w.syncShard()
         if err != nil {
@ -271,7 +275,7 @@ func (w *Worker) eventLoop() {
         if err != nil {
             // cannot get lease on the shard
             if err.Error() != chk.ErrLeaseNotAquired {
-                log.Error(err)
+                log.Errorf("Cannot get lease: %+v", err)
             }
             continue
         }
@ -284,7 +288,9 @@ func (w *Worker) eventLoop() {
         w.waitGroup.Add(1)
         go func() {
             defer w.waitGroup.Done()
-            sc.getRecords(shard)
+            if err := sc.getRecords(shard); err != nil {
+                log.Errorf("Error in getRecords: %+v", err)
+            }
         }()
         // exit from for loop and not to grab more shard for now.
         break
@ -293,7 +299,7 @@ func (w *Worker) eventLoop() {
     select {
     case <-*w.stop:
-        log.Info("Shutting down...")
+        log.Infof("Shutting down...")
         return
     case <-time.After(time.Duration(w.kclConfig.ShardSyncIntervalMillis) * time.Millisecond):
     }
@ -303,6 +309,7 @@ func (w *Worker) eventLoop() {
 // List all ACTIVE shard and store them into shardStatus table
 // If shard has been removed, need to exclude it from cached shard status.
 func (w *Worker) getShardIDs(startShardID string, shardInfo map[string]bool) error {
+    log := w.kclConfig.Logger
     // The default pagination limit is 100.
     args := &kinesis.DescribeStreamInput{
         StreamName: aws.String(w.streamName),
@ -355,6 +362,7 @@ func (w *Worker) getShardIDs(startShardID string, shardInfo map[string]bool) err
 // syncShard to sync the cached shard info with actual shard info from Kinesis
 func (w *Worker) syncShard() error {
+    log := w.kclConfig.Logger
     shardInfo := make(map[string]bool)
     err := w.getShardIDs("", shardInfo)

go.mod

@ -1,6 +1,7 @@
 module github.com/vmware/vmware-go-kcl
 require (
+    github.com/BurntSushi/toml v0.3.1 // indirect
     github.com/aws/aws-sdk-go v1.19.38
     github.com/google/uuid v1.1.1
     github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
@ -9,7 +10,13 @@ require (
     github.com/prometheus/procfs v0.0.0-20190523193104-a7aeb8df3389 // indirect
     github.com/sirupsen/logrus v1.4.2
     github.com/stretchr/testify v1.3.0
+    go.uber.org/atomic v1.4.0 // indirect
+    go.uber.org/multierr v1.2.0 // indirect
+    go.uber.org/zap v1.11.0
     golang.org/x/net v0.0.0-20190522155817-f3200d17e092 // indirect
     golang.org/x/sys v0.0.0-20190528012530-adf421d2caf4 // indirect
     golang.org/x/text v0.3.2 // indirect
+    gopkg.in/natefinch/lumberjack.v2 v2.0.0
 )
+go 1.13

go.sum

@ -1,3 +1,5 @@
+github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
+github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@ -32,6 +34,7 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0j
 github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
+github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@ -59,6 +62,12 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
+go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/multierr v1.2.0 h1:6I+W7f5VwC5SV9dNrZ3qXrDB9mD0dyGOi/ZJmYw03T4=
+go.uber.org/multierr v1.2.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/zap v1.11.0 h1:gSmpCfs+R47a4yQPAI4xJ0IPDLTRGXskm6UelqNXpqE=
+go.uber.org/zap v1.11.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -78,5 +87,9 @@ golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
+gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
 gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

logger/logger.go (new file)

@ -0,0 +1,115 @@
/*
* Copyright (c) 2019 VMware, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
* associated documentation files (the "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is furnished to do
* so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial
* portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
* NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
// Note: The implementation comes from https://www.mountedthoughts.com/golang-logger-interface/
// https://github.com/amitrai48/logger
package logger
import (
"github.com/sirupsen/logrus"
)
// Fields Type to pass when we want to call WithFields for structured logging
type Fields map[string]interface{}
const (
//Debug has verbose message
Debug = "debug"
//Info is default log level
Info = "info"
//Warn is for logging messages about possible issues
Warn = "warn"
//Error is for logging errors
Error = "error"
//Fatal is for logging fatal messages. The system shuts down after logging the message.
Fatal = "fatal"
)
// Logger is the common interface for logging.
type Logger interface {
Debugf(format string, args ...interface{})
Infof(format string, args ...interface{})
Warnf(format string, args ...interface{})
Errorf(format string, args ...interface{})
Fatalf(format string, args ...interface{})
Panicf(format string, args ...interface{})
WithFields(keyValues Fields) Logger
}
// Configuration stores the config for the logger
// For some loggers there can only be one level across writers; for those, the Console level is used by default
type Configuration struct {
EnableConsole bool
ConsoleJSONFormat bool
ConsoleLevel string
EnableFile bool
FileJSONFormat bool
FileLevel string
// Filename is the file to write logs to. Backup log files will be retained
// in the same directory. It uses <processname>-lumberjack.log in
// os.TempDir() if empty.
Filename string
// MaxSize is the maximum size in megabytes of the log file before it gets
// rotated. It defaults to 100 megabytes.
MaxSizeMB int
// MaxAge is the maximum number of days to retain old log files based on the
// timestamp encoded in their filename. Note that a day is defined as 24
// hours and may not exactly correspond to calendar days due to daylight
// savings, leap seconds, etc. The default is 7 days.
MaxAgeDays int
// MaxBackups is the maximum number of old log files to retain. The default
// is to retain all old log files (though MaxAge may still cause them to get
// deleted.)
MaxBackups int
// LocalTime determines if the time used for formatting the timestamps in
// backup files is the computer's local time. The default is to use UTC
// time.
LocalTime bool
}
// GetDefaultLogger creates a default logger.
func GetDefaultLogger() Logger {
return NewLogrusLogger(logrus.StandardLogger())
}
// normalizeConfig to enforce default value in configuration.
func normalizeConfig(config *Configuration) {
if config.MaxSizeMB <= 0 {
config.MaxSizeMB = 100
}
if config.MaxAgeDays <= 0 {
config.MaxAgeDays = 7
}
if config.MaxBackups < 0 {
config.MaxBackups = 0
}
}

logger/logger_test.go (new file)

@ -0,0 +1,84 @@
/*
* Copyright (c) 2019 VMware, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
* associated documentation files (the "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is furnished to do
* so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial
* portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
* NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
// Note: The implementation comes from https://www.mountedthoughts.com/golang-logger-interface/
package logger
import (
"github.com/stretchr/testify/assert"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
"testing"
)
func TestZapLoggerWithConfig(t *testing.T) {
config := Configuration{
EnableConsole: true,
ConsoleLevel: Debug,
ConsoleJSONFormat: true,
EnableFile: false,
FileLevel: Info,
FileJSONFormat: true,
Filename: "log.log",
}
log := NewZapLoggerWithConfig(config)
contextLogger := log.WithFields(Fields{"key1": "value1"})
contextLogger.Debugf("Starting with zap")
contextLogger.Infof("Zap is awesome")
}
func TestZapLogger(t *testing.T) {
zapLogger, err := zap.NewProduction()
assert.Nil(t, err)
log := NewZapLogger(zapLogger.Sugar())
contextLogger := log.WithFields(Fields{"key1": "value1"})
contextLogger.Debugf("Starting with zap")
contextLogger.Infof("Zap is awesome")
}
func TestLogrusLoggerWithConfig(t *testing.T) {
config := Configuration{
EnableConsole: true,
ConsoleLevel: Debug,
ConsoleJSONFormat: false,
EnableFile: false,
FileLevel: Info,
FileJSONFormat: true,
}
log := NewLogrusLoggerWithConfig(config)
contextLogger := log.WithFields(Fields{"key1": "value1"})
contextLogger.Debugf("Starting with logrus")
contextLogger.Infof("Logrus is awesome")
}
func TestLogrusLogger(t *testing.T) {
// adapts to Logger interface
log := NewLogrusLogger(logrus.StandardLogger())
contextLogger := log.WithFields(Fields{"key1": "value1"})
contextLogger.Debugf("Starting with logrus")
contextLogger.Infof("Logrus is awesome")
}

logger/logrus.go (new file)

@ -0,0 +1,170 @@
/*
* Copyright (c) 2019 VMware, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
* associated documentation files (the "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is furnished to do
* so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial
* portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
* NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
// Note: The implementation comes from https://www.mountedthoughts.com/golang-logger-interface/
// https://github.com/amitrai48/logger
package logger
import (
"io"
"os"
"github.com/sirupsen/logrus"
lumberjack "gopkg.in/natefinch/lumberjack.v2"
)
type LogrusLogEntry struct {
entry *logrus.Entry
}
type LogrusLogger struct {
logger *logrus.Logger
}
// NewLogrusLogger adapts an existing logrus logger to the Logger interface.
// The caller is responsible for configuring the logrus logger appropriately.
func NewLogrusLogger(lLogger *logrus.Logger) Logger {
return &LogrusLogger{
logger: lLogger,
}
}
// NewLogrusLoggerWithConfig creates and configures a Logger instance backed by
// a logrus logger.
func NewLogrusLoggerWithConfig(config Configuration) Logger {
logLevel := config.ConsoleLevel
if logLevel == "" {
logLevel = config.FileLevel
}
level, err := logrus.ParseLevel(logLevel)
if err != nil {
// fallback to InfoLevel
level = logrus.InfoLevel
}
normalizeConfig(&config)
stdOutHandler := os.Stdout
fileHandler := &lumberjack.Logger{
Filename: config.Filename,
MaxSize: config.MaxSizeMB,
Compress: true,
MaxAge: config.MaxAgeDays,
MaxBackups: config.MaxBackups,
LocalTime: config.LocalTime,
}
lLogger := &logrus.Logger{
Out: stdOutHandler,
Formatter: getFormatter(config.ConsoleJSONFormat),
Hooks: make(logrus.LevelHooks),
Level: level,
}
if config.EnableConsole && config.EnableFile {
lLogger.SetOutput(io.MultiWriter(stdOutHandler, fileHandler))
} else {
if config.EnableFile {
lLogger.SetOutput(fileHandler)
lLogger.SetFormatter(getFormatter(config.FileJSONFormat))
}
}
return &LogrusLogger{
logger: lLogger,
}
}
func (l *LogrusLogger) Debugf(format string, args ...interface{}) {
l.logger.Debugf(format, args...)
}
func (l *LogrusLogger) Infof(format string, args ...interface{}) {
l.logger.Infof(format, args...)
}
func (l *LogrusLogger) Warnf(format string, args ...interface{}) {
l.logger.Warnf(format, args...)
}
func (l *LogrusLogger) Errorf(format string, args ...interface{}) {
l.logger.Errorf(format, args...)
}
func (l *LogrusLogger) Fatalf(format string, args ...interface{}) {
l.logger.Fatalf(format, args...)
}
func (l *LogrusLogger) Panicf(format string, args ...interface{}) {
l.logger.Fatalf(format, args...)
}
func (l *LogrusLogger) WithFields(fields Fields) Logger {
return &LogrusLogEntry{
entry: l.logger.WithFields(convertToLogrusFields(fields)),
}
}
func (l *LogrusLogEntry) Debugf(format string, args ...interface{}) {
l.entry.Debugf(format, args...)
}
func (l *LogrusLogEntry) Infof(format string, args ...interface{}) {
l.entry.Infof(format, args...)
}
func (l *LogrusLogEntry) Warnf(format string, args ...interface{}) {
l.entry.Warnf(format, args...)
}
func (l *LogrusLogEntry) Errorf(format string, args ...interface{}) {
l.entry.Errorf(format, args...)
}
func (l *LogrusLogEntry) Fatalf(format string, args ...interface{}) {
l.entry.Fatalf(format, args...)
}
func (l *LogrusLogEntry) Panicf(format string, args ...interface{}) {
l.entry.Fatalf(format, args...)
}
func (l *LogrusLogEntry) WithFields(fields Fields) Logger {
return &LogrusLogEntry{
entry: l.entry.WithFields(convertToLogrusFields(fields)),
}
}
func getFormatter(isJSON bool) logrus.Formatter {
if isJSON {
return &logrus.JSONFormatter{}
}
return &logrus.TextFormatter{
FullTimestamp: true,
DisableLevelTruncation: true,
}
}
func convertToLogrusFields(fields Fields) logrus.Fields {
logrusFields := logrus.Fields{}
for index, val := range fields {
logrusFields[index] = val
}
return logrusFields
}

logger/zap.go (new file)

@ -0,0 +1,151 @@
/*
* Copyright (c) 2019 VMware, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
* associated documentation files (the "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is furnished to do
* so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial
* portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
* NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
// Note: The implementation comes from https://www.mountedthoughts.com/golang-logger-interface/
// https://github.com/amitrai48/logger
package logger
import (
"os"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
lumberjack "gopkg.in/natefinch/lumberjack.v2"
)
type ZapLogger struct {
sugaredLogger *zap.SugaredLogger
}
// NewZapLogger adapts an existing sugared zap logger to the Logger interface.
// The caller is responsible for configuring the sugared zap logger appropriately.
//
// Note: Sugar wraps the Logger to provide a more ergonomic, but slightly slower,
// API. Sugaring a Logger is quite inexpensive, so it's reasonable for a
// single application to use both Loggers and SugaredLoggers, converting
// between them on the boundaries of performance-sensitive code.
//
// A base zap logger can be converted to a SugaredLogger by adding a wrapper:
// sugaredLogger := log.Sugar()
//
func NewZapLogger(logger *zap.SugaredLogger) Logger {
return &ZapLogger{
sugaredLogger: logger,
}
}
// NewZapLoggerWithConfig creates and configures a Logger instance backed by
// a zap SugaredLogger.
func NewZapLoggerWithConfig(config Configuration) Logger {
cores := []zapcore.Core{}
if config.EnableConsole {
level := getZapLevel(config.ConsoleLevel)
writer := zapcore.Lock(os.Stdout)
core := zapcore.NewCore(getEncoder(config.ConsoleJSONFormat), writer, level)
cores = append(cores, core)
}
if config.EnableFile {
level := getZapLevel(config.FileLevel)
writer := zapcore.AddSync(&lumberjack.Logger{
Filename: config.Filename,
MaxSize: config.MaxSizeMB,
Compress: true,
MaxAge: config.MaxAgeDays,
MaxBackups: config.MaxBackups,
LocalTime: config.LocalTime,
})
core := zapcore.NewCore(getEncoder(config.FileJSONFormat), writer, level)
cores = append(cores, core)
}
combinedCore := zapcore.NewTee(cores...)
// AddCallerSkip skips 2 levels of callers; this is important, otherwise the file that gets
// logged will always be this wrapper file, zap.go
logger := zap.New(combinedCore,
zap.AddCallerSkip(2),
zap.AddCaller(),
).Sugar()
return &ZapLogger{
sugaredLogger: logger,
}
}
func (l *ZapLogger) Debugf(format string, args ...interface{}) {
l.sugaredLogger.Debugf(format, args...)
}
func (l *ZapLogger) Infof(format string, args ...interface{}) {
l.sugaredLogger.Infof(format, args...)
}
func (l *ZapLogger) Warnf(format string, args ...interface{}) {
l.sugaredLogger.Warnf(format, args...)
}
func (l *ZapLogger) Errorf(format string, args ...interface{}) {
l.sugaredLogger.Errorf(format, args...)
}
func (l *ZapLogger) Fatalf(format string, args ...interface{}) {
l.sugaredLogger.Fatalf(format, args...)
}
func (l *ZapLogger) Panicf(format string, args ...interface{}) {
l.sugaredLogger.Fatalf(format, args...)
}
func (l *ZapLogger) WithFields(fields Fields) Logger {
var f = make([]interface{}, 0)
for k, v := range fields {
f = append(f, k)
f = append(f, v)
}
newLogger := l.sugaredLogger.With(f...)
return &ZapLogger{newLogger}
}
func getEncoder(isJSON bool) zapcore.Encoder {
encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
if isJSON {
return zapcore.NewJSONEncoder(encoderConfig)
}
return zapcore.NewConsoleEncoder(encoderConfig)
}
func getZapLevel(level string) zapcore.Level {
switch level {
case Info:
return zapcore.InfoLevel
case Warn:
return zapcore.WarnLevel
case Debug:
return zapcore.DebugLevel
case Error:
return zapcore.ErrorLevel
case Fatal:
return zapcore.FatalLevel
default:
return zapcore.InfoLevel
}
}

test/logger_test.go (new file)

@ -0,0 +1,87 @@
/*
* Copyright (c) 2019 VMware, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
* associated documentation files (the "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is furnished to do
* so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial
* portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
* NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
// Note: The implementation comes from https://www.mountedthoughts.com/golang-logger-interface/
package test
import (
"github.com/stretchr/testify/assert"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
"testing"
"github.com/vmware/vmware-go-kcl/logger"
)
func TestZapLoggerWithConfig(t *testing.T) {
config := logger.Configuration{
EnableConsole: true,
ConsoleLevel: logger.Debug,
ConsoleJSONFormat: true,
EnableFile: true,
FileLevel: logger.Info,
FileJSONFormat: true,
Filename: "log.log",
}
log := logger.NewZapLoggerWithConfig(config)
contextLogger := log.WithFields(logger.Fields{"key1": "value1"})
contextLogger.Debugf("Starting with zap")
contextLogger.Infof("Zap is awesome")
}
func TestZapLogger(t *testing.T) {
zapLogger, err := zap.NewProduction()
assert.Nil(t, err)
log := logger.NewZapLogger(zapLogger.Sugar())
contextLogger := log.WithFields(logger.Fields{"key1": "value1"})
contextLogger.Debugf("Starting with zap")
contextLogger.Infof("Zap is awesome")
}
func TestLogrusLoggerWithConfig(t *testing.T) {
config := logger.Configuration{
EnableConsole: true,
ConsoleLevel: logger.Debug,
ConsoleJSONFormat: false,
EnableFile: true,
FileLevel: logger.Info,
FileJSONFormat: true,
Filename: "log.log",
}
log := logger.NewLogrusLoggerWithConfig(config)
contextLogger := log.WithFields(logger.Fields{"key1": "value1"})
contextLogger.Debugf("Starting with logrus")
contextLogger.Infof("Logrus is awesome")
}
func TestLogrusLogger(t *testing.T) {
// adapts to Logger interface
log := logger.NewLogrusLogger(logrus.StandardLogger())
contextLogger := log.WithFields(logger.Fields{"key1": "value1"})
contextLogger.Debugf("Starting with logrus")
contextLogger.Infof("Logrus is awesome")
}


@ -19,9 +19,6 @@
 package test
 import (
-    "github.com/aws/aws-sdk-go/aws/credentials"
-    "github.com/aws/aws-sdk-go/aws/credentials/stscreds"
-    "github.com/aws/aws-sdk-go/aws/session"
     "net/http"
     "os"
     "os/signal"
@ -30,15 +27,17 @@ import (
     "time"
     "github.com/aws/aws-sdk-go/aws"
+    "github.com/aws/aws-sdk-go/aws/credentials"
+    "github.com/aws/aws-sdk-go/aws/credentials/stscreds"
+    "github.com/aws/aws-sdk-go/aws/session"
     "github.com/prometheus/common/expfmt"
-    log "github.com/sirupsen/logrus"
     "github.com/stretchr/testify/assert"
     cfg "github.com/vmware/vmware-go-kcl/clientlibrary/config"
     kc "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces"
     "github.com/vmware/vmware-go-kcl/clientlibrary/metrics"
     "github.com/vmware/vmware-go-kcl/clientlibrary/utils"
     wk "github.com/vmware/vmware-go-kcl/clientlibrary/worker"
+    "github.com/vmware/vmware-go-kcl/logger"
 )
 const (
@ -53,6 +52,22 @@ const metricsSystem = "cloudwatch"
 var shardID string
 func TestWorker(t *testing.T) {
+    // At a minimum, use the standard logrus logger
+    // log := logger.NewLogrusLogger(logrus.StandardLogger())
+    //
+    // In order to have precise control over logging, use logger with config
+    config := logger.Configuration{
+        EnableConsole: true,
+        ConsoleLevel: logger.Debug,
+        ConsoleJSONFormat: false,
+        EnableFile: true,
+        FileLevel: logger.Info,
+        FileJSONFormat: true,
+        Filename: "log.log",
+    }
+    // Use logrus logger
+    log := logger.NewLogrusLoggerWithConfig(config)
     kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID).
         WithInitialPositionInStream(cfg.LATEST).
         WithMaxRecords(10).
@ -60,12 +75,31 @@ func TestWorker(t *testing.T) {
         WithShardSyncIntervalMillis(5000).
         WithFailoverTimeMillis(300000).
         WithMetricsBufferTimeMillis(10000).
-        WithMetricsMaxQueueSize(20)
+        WithMetricsMaxQueueSize(20).
+        WithLogger(log)
     runTest(kclConfig, false, t)
 }
 func TestWorkerWithSigInt(t *testing.T) {
+    // At a minimum, use the standard zap logger
+    //zapLogger, err := zap.NewProduction()
+    //assert.Nil(t, err)
+    //log := logger.NewZapLogger(zapLogger.Sugar())
+    //
+    // In order to have precise control over logging, use logger with config.
+    config := logger.Configuration{
+        EnableConsole: true,
+        ConsoleLevel: logger.Debug,
+        ConsoleJSONFormat: true,
+        EnableFile: true,
+        FileLevel: logger.Info,
+        FileJSONFormat: true,
+        Filename: "log.log",
+    }
+    // use zap logger
+    log := logger.NewZapLoggerWithConfig(config)
     kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID).
         WithInitialPositionInStream(cfg.LATEST).
         WithMaxRecords(10).
@ -73,7 +107,8 @@ func TestWorkerWithSigInt(t *testing.T) {
         WithShardSyncIntervalMillis(5000).
         WithFailoverTimeMillis(300000).
         WithMetricsBufferTimeMillis(10000).
-        WithMetricsMaxQueueSize(20)
+        WithMetricsMaxQueueSize(20).
+        WithLogger(log)
     runTest(kclConfig, true, t)
 }
@ -120,9 +155,6 @@ func TestWorkerAssumeRole(t *testing.T) {
 }
 func runTest(kclConfig *cfg.KinesisClientLibConfiguration, triggersig bool, t *testing.T) {
-    log.SetOutput(os.Stdout)
-    log.SetLevel(log.DebugLevel)
     assert.Equal(t, regionName, kclConfig.RegionName)
     assert.Equal(t, streamName, kclConfig.StreamName)
@ -192,6 +224,7 @@ func getMetricsConfig(kclConfig *cfg.KinesisClientLibConfiguration, service stri
     return &metrics.MonitoringConfiguration{
         MonitoringService: "cloudwatch",
         Region: regionName,
+        Logger: kclConfig.Logger,
         CloudWatch: metrics.CloudWatchMonitoringService{
             Credentials: kclConfig.CloudWatchCredentials,
             // Those value should come from kclConfig
@ -205,6 +238,7 @@ func getMetricsConfig(kclConfig *cfg.KinesisClientLibConfiguration, service stri
     return &metrics.MonitoringConfiguration{
         MonitoringService: "prometheus",
         Region: regionName,
+        Logger: kclConfig.Logger,
         Prometheus: metrics.PrometheusMonitoringService{
             ListenAddress: ":8080",
         },