From 6df520b34395e522c8f58c481384af2583a4f437 Mon Sep 17 00:00:00 2001 From: Tao Jiang Date: Mon, 20 May 2019 08:57:32 -0500 Subject: [PATCH] Remove signal handling from event loop (#20) Take signle handling out of event loop. Also, make the worker Shutdown idempotent and update tests. Signed-off-by: Tao Jiang --- clientlibrary/worker/worker.go | 21 ++++++-------- clientlibrary/worker/worker_test.go | 44 ++++++++++++++++++++++++++--- 2 files changed, 48 insertions(+), 17 deletions(-) diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index 8c3bedd..33d5a6d 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -29,10 +29,7 @@ package worker import ( "errors" - "os" - "os/signal" "sync" - "syscall" "time" log "github.com/sirupsen/logrus" @@ -67,7 +64,7 @@ type Worker struct { stop *chan struct{} waitGroup *sync.WaitGroup - sigs *chan os.Signal + done bool shardStatus map[string]*par.ShardStatus @@ -84,6 +81,7 @@ func NewWorker(factory kcl.IRecordProcessorFactory, kclConfig *config.KinesisCli processorFactory: factory, kclConfig: kclConfig, metricsConfig: metricsConfig, + done: false, } // create session for Kinesis @@ -147,7 +145,12 @@ func (w *Worker) Start() error { func (w *Worker) Shutdown() { log.Info("Worker shutdown in requested.") + if w.done { + return + } + close(*w.stop) + w.done = true w.waitGroup.Wait() w.mService.Shutdown() @@ -185,10 +188,6 @@ func (w *Worker) initialize() error { w.shardStatus = make(map[string]*par.ShardStatus) - sigs := make(chan os.Signal, 1) - w.sigs = &sigs - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - stopChan := make(chan struct{}) w.stop = &stopChan @@ -282,12 +281,8 @@ func (w *Worker) eventLoop() { } select { - case sig := <-*w.sigs: - log.Infof("Received signal %s. Exiting", sig) - w.Shutdown() - return case <-*w.stop: - log.Info("Shutting down") + log.Info("Shutting down...") return case <-time.After(time.Duration(w.kclConfig.ShardSyncIntervalMillis) * time.Millisecond): } diff --git a/clientlibrary/worker/worker_test.go b/clientlibrary/worker/worker_test.go index 013561f..57405dd 100644 --- a/clientlibrary/worker/worker_test.go +++ b/clientlibrary/worker/worker_test.go @@ -24,6 +24,8 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "net/http" "os" + "os/signal" + "syscall" "testing" "time" @@ -57,7 +59,20 @@ func TestWorker(t *testing.T) { WithMetricsBufferTimeMillis(10000). WithMetricsMaxQueueSize(20) - runTest(kclConfig, t) + runTest(kclConfig, false, t) +} + +func TestWorkerWithSigInt(t *testing.T) { + kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID). + WithInitialPositionInStream(cfg.LATEST). + WithMaxRecords(10). + WithMaxLeasesForWorker(1). + WithShardSyncIntervalMillis(5000). + WithFailoverTimeMillis(300000). + WithMetricsBufferTimeMillis(10000). + WithMetricsMaxQueueSize(20) + + runTest(kclConfig, true, t) } func TestWorkerStatic(t *testing.T) { @@ -74,7 +89,7 @@ func TestWorkerStatic(t *testing.T) { WithMetricsBufferTimeMillis(10000). WithMetricsMaxQueueSize(20) - runTest(kclConfig, t) + runTest(kclConfig, false, t) } func TestWorkerAssumeRole(t *testing.T) { @@ -98,10 +113,10 @@ func TestWorkerAssumeRole(t *testing.T) { WithMetricsBufferTimeMillis(10000). WithMetricsMaxQueueSize(20) - runTest(kclConfig, t) + runTest(kclConfig, false, t) } -func runTest(kclConfig *cfg.KinesisClientLibConfiguration, t *testing.T) { +func runTest(kclConfig *cfg.KinesisClientLibConfiguration, triggersig bool, t *testing.T) { log.SetOutput(os.Stdout) log.SetLevel(log.DebugLevel) @@ -118,7 +133,20 @@ func runTest(kclConfig *cfg.KinesisClientLibConfiguration, t *testing.T) { err := worker.Start() assert.Nil(t, err) + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + + // Signal processing. + go func() { + sig := <-sigs + t.Logf("Received signal %s. Exiting", sig) + worker.Shutdown() + // some other processing before exit. + //os.Exit(0) + }() + // Put some data into stream. + t.Log("Putting data into stream.") for i := 0; i < 100; i++ { // Use random string as partition key to ensure even distribution across shards err := worker.Publish(streamName, utils.RandStringBytesMaskImpr(10), []byte(specstr)) @@ -126,6 +154,13 @@ func runTest(kclConfig *cfg.KinesisClientLibConfiguration, t *testing.T) { t.Errorf("Errorin Publish. %+v", err) } } + t.Log("Done putting data into stream.") + + if triggersig { + t.Log("Trigger signal SIGINT") + p, _ := os.FindProcess(os.Getpid()) + p.Signal(os.Interrupt) + } // wait a few seconds before shutdown processing time.Sleep(10 * time.Second) @@ -146,6 +181,7 @@ func runTest(kclConfig *cfg.KinesisClientLibConfiguration, t *testing.T) { } + t.Log("Calling normal shutdown at the end of application.") worker.Shutdown() }