From c8a5aa1891bf239b232b7bf0a58ba0112fbaec97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= <476650+arl@users.noreply.github.com> Date: Sun, 27 Oct 2019 16:43:21 +0100 Subject: [PATCH] Fix possible deadlock with getRecords in eventLoop (#42) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A waitgroup should always be incremented before the creation of the goroutine which decrements it (through Done) or there is the potential for deadlock. That was not the case since the wg.Add was performed after the `go getRecords() ` line. Also, since there's only one path leading to the wg.Done in getRecords, I moved wg.Done out of the getRecords function and placed it alongside the goroutine creation, thus totally removing the need to pass the waitgroup pointer to the sc instance; this led to the removal of the `waitGroup` field from the `ShardConsumer` struct. This has been tested in production and didn't create any problem. Signed-off-by: Aurélien Rainone --- clientlibrary/worker/shard-consumer.go | 5 ++--- clientlibrary/worker/worker.go | 9 +++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/clientlibrary/worker/shard-consumer.go b/clientlibrary/worker/shard-consumer.go index 139c27d..87b24aa 100644 --- a/clientlibrary/worker/shard-consumer.go +++ b/clientlibrary/worker/shard-consumer.go @@ -28,11 +28,12 @@ package worker import ( - log "github.com/sirupsen/logrus" "math" "sync" "time" + log "github.com/sirupsen/logrus" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/kinesis" @@ -79,7 +80,6 @@ type ShardConsumer struct { recordProcessor kcl.IRecordProcessor kclConfig *config.KinesisClientLibConfiguration stop *chan struct{} - waitGroup *sync.WaitGroup consumerID string mService metrics.MonitoringService state ShardConsumerState @@ -126,7 +126,6 @@ func (sc *ShardConsumer) getShardIterator(shard *par.ShardStatus) (*string, erro // 
getRecords continously poll one shard for data record // Precondition: it currently has the lease on the shard. func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error { - defer sc.waitGroup.Done() defer sc.releaseLease(shard) // If the shard is child shard, need to wait until the parent finished. diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index 5bed3fa..2a7bd52 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -201,8 +201,7 @@ func (w *Worker) initialize() error { stopChan := make(chan struct{}) w.stop = &stopChan - wg := sync.WaitGroup{} - w.waitGroup = &wg + w.waitGroup = &sync.WaitGroup{} log.Info("Initialization complete.") @@ -220,7 +219,6 @@ func (w *Worker) newShardConsumer(shard *par.ShardStatus) *ShardConsumer { kclConfig: w.kclConfig, consumerID: w.workerID, stop: w.stop, - waitGroup: w.waitGroup, mService: w.mService, state: WAITING_ON_PARENT_SHARDS, } @@ -283,8 +281,11 @@ func (w *Worker) eventLoop() { log.Infof("Start Shard Consumer for shard: %v", shard.ID) sc := w.newShardConsumer(shard) - go sc.getRecords(shard) w.waitGroup.Add(1) + go func() { + defer w.waitGroup.Done() + sc.getRecords(shard) + }() // exit from for loop and not to grab more shard for now. break }