Remove signal handling from event loop (#20)

Take signal handling out of the event loop. Also make the worker's
Shutdown idempotent and update the tests.

Signed-off-by: Tao Jiang <taoj@vmware.com>
Tao Jiang 2019-05-20 08:57:32 -05:00
parent 2ca82c25ca
commit 6df520b343
2 changed files with 48 additions and 17 deletions
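
With this change the worker no longer traps SIGINT/SIGTERM itself, so the application is expected to install its own signal handler and call Worker.Shutdown(); the updated test below does exactly that. Here is a minimal caller-side sketch, assuming only the Start() error / Shutdown() surface visible in the diff; the lifecycle interface and run helper are illustrative names, not part of the library:

package app

import (
	"log"
	"os"
	"os/signal"
	"syscall"
)

// lifecycle is the slice of the worker API this sketch relies on:
// Start begins processing, Shutdown stops it (idempotent after this commit).
type lifecycle interface {
	Start() error
	Shutdown()
}

// run starts the worker, blocks until SIGINT or SIGTERM arrives, then
// shuts the worker down. The worker no longer watches for signals itself.
func run(w lifecycle) error {
	if err := w.Start(); err != nil {
		return err
	}

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	sig := <-sigs
	log.Printf("Received signal %s. Exiting", sig)
	w.Shutdown() // a later Shutdown call (e.g. in a defer) is now a no-op
	return nil
}

The new TestWorkerWithSigInt test exercises this path by sending SIGINT to its own process and letting the test's signal-handling goroutine call Shutdown.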

View file

@@ -29,10 +29,7 @@ package worker
 import (
 	"errors"
-	"os"
-	"os/signal"
 	"sync"
-	"syscall"
 	"time"
 
 	log "github.com/sirupsen/logrus"
@@ -67,7 +64,7 @@ type Worker struct {
 	stop      *chan struct{}
 	waitGroup *sync.WaitGroup
-	sigs      *chan os.Signal
+	done      bool
 
 	shardStatus map[string]*par.ShardStatus
@@ -84,6 +81,7 @@ func NewWorker(factory kcl.IRecordProcessorFactory, kclConfig *config.KinesisCli
 		processorFactory: factory,
 		kclConfig:        kclConfig,
 		metricsConfig:    metricsConfig,
+		done:             false,
 	}
 
 	// create session for Kinesis
@@ -147,7 +145,12 @@ func (w *Worker) Start() error {
 func (w *Worker) Shutdown() {
 	log.Info("Worker shutdown in requested.")
 
+	if w.done {
+		return
+	}
+
 	close(*w.stop)
+	w.done = true
 	w.waitGroup.Wait()
 
 	w.mService.Shutdown()
@@ -185,10 +188,6 @@ func (w *Worker) initialize() error {
 	w.shardStatus = make(map[string]*par.ShardStatus)
 
-	sigs := make(chan os.Signal, 1)
-	w.sigs = &sigs
-	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
-
 	stopChan := make(chan struct{})
 	w.stop = &stopChan
@@ -282,12 +281,8 @@ func (w *Worker) eventLoop() {
 		}
 
 		select {
-		case sig := <-*w.sigs:
-			log.Infof("Received signal %s. Exiting", sig)
-			w.Shutdown()
-			return
 		case <-*w.stop:
-			log.Info("Shutting down")
+			log.Info("Shutting down...")
			return
 		case <-time.After(time.Duration(w.kclConfig.ShardSyncIntervalMillis) * time.Millisecond):
 		}
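
The Shutdown change above guards against the stop channel being closed twice, since Shutdown can now be reached both from the caller's signal handler and from the normal end-of-run path. A stripped-down sketch of the same guard, using illustrative names rather than the library's types:

package app

import "sync"

// stopper mirrors the guard added to Worker.Shutdown: the first call closes
// the stop channel and waits for in-flight work, later calls return at once.
type stopper struct {
	stop chan struct{}
	wg   sync.WaitGroup
	done bool
}

func (s *stopper) Shutdown() {
	if s.done {
		return // already shut down; closing s.stop again would panic
	}
	close(s.stop)
	s.done = true
	s.wg.Wait()
}

As in the diff, the flag is a plain bool, so this protects against repeated sequential calls (signal handler, then the test's final Shutdown) rather than truly concurrent ones; a mutex or sync.Once would be the usual choice if concurrent callers were expected.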

View file

@@ -24,6 +24,8 @@ import (
 	"github.com/aws/aws-sdk-go/aws/session"
 	"net/http"
 	"os"
+	"os/signal"
+	"syscall"
 	"testing"
 	"time"
@@ -57,7 +59,20 @@ func TestWorker(t *testing.T) {
 		WithMetricsBufferTimeMillis(10000).
 		WithMetricsMaxQueueSize(20)
 
-	runTest(kclConfig, t)
+	runTest(kclConfig, false, t)
+}
+
+func TestWorkerWithSigInt(t *testing.T) {
+	kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID).
+		WithInitialPositionInStream(cfg.LATEST).
+		WithMaxRecords(10).
+		WithMaxLeasesForWorker(1).
+		WithShardSyncIntervalMillis(5000).
+		WithFailoverTimeMillis(300000).
+		WithMetricsBufferTimeMillis(10000).
+		WithMetricsMaxQueueSize(20)
+
+	runTest(kclConfig, true, t)
 }
 
 func TestWorkerStatic(t *testing.T) {
@@ -74,7 +89,7 @@ func TestWorkerStatic(t *testing.T) {
 		WithMetricsBufferTimeMillis(10000).
 		WithMetricsMaxQueueSize(20)
 
-	runTest(kclConfig, t)
+	runTest(kclConfig, false, t)
 }
 
 func TestWorkerAssumeRole(t *testing.T) {
@@ -98,10 +113,10 @@ func TestWorkerAssumeRole(t *testing.T) {
 		WithMetricsBufferTimeMillis(10000).
 		WithMetricsMaxQueueSize(20)
 
-	runTest(kclConfig, t)
+	runTest(kclConfig, false, t)
 }
 
-func runTest(kclConfig *cfg.KinesisClientLibConfiguration, t *testing.T) {
+func runTest(kclConfig *cfg.KinesisClientLibConfiguration, triggersig bool, t *testing.T) {
 	log.SetOutput(os.Stdout)
 	log.SetLevel(log.DebugLevel)
@@ -118,7 +133,20 @@ func runTest(kclConfig *cfg.KinesisClientLibConfiguration, t *testing.T) {
 	err := worker.Start()
 	assert.Nil(t, err)
 
+	sigs := make(chan os.Signal, 1)
+	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+
+	// Signal processing.
+	go func() {
+		sig := <-sigs
+		t.Logf("Received signal %s. Exiting", sig)
+		worker.Shutdown()
+		// some other processing before exit.
+		//os.Exit(0)
+	}()
+
 	// Put some data into stream.
+	t.Log("Putting data into stream.")
 	for i := 0; i < 100; i++ {
 		// Use random string as partition key to ensure even distribution across shards
 		err := worker.Publish(streamName, utils.RandStringBytesMaskImpr(10), []byte(specstr))
@@ -126,6 +154,13 @@ func runTest(kclConfig *cfg.KinesisClientLibConfiguration, t *testing.T) {
 			t.Errorf("Errorin Publish. %+v", err)
 		}
 	}
+	t.Log("Done putting data into stream.")
+
+	if triggersig {
+		t.Log("Trigger signal SIGINT")
+		p, _ := os.FindProcess(os.Getpid())
+		p.Signal(os.Interrupt)
+	}
 
 	// wait a few seconds before shutdown processing
 	time.Sleep(10 * time.Second)
@@ -146,6 +181,7 @@ func runTest(kclConfig *cfg.KinesisClientLibConfiguration, t *testing.T) {
 	}
 
+	t.Log("Calling normal shutdown at the end of application.")
 	worker.Shutdown()
 }