Fix possible deadlock with getRecords in eventLoop (#42)

A WaitGroup should always be incremented (wg.Add) before the creation of
the goroutine that decrements it (wg.Done); otherwise there is a
potential for deadlock.
That was not the case here, since wg.Add was performed after the
`go getRecords()` line.
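
For illustration, a minimal standalone sketch of the ordering rule (this is not code from this repository, just a self-contained example):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup

	// Correct: increment the counter before starting the goroutine
	// that will decrement it.
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(10 * time.Millisecond) // stand-in for getRecords
	}()

	// The buggy ordering would be:
	//     go func() { defer wg.Done(); ... }()
	//     wg.Add(1)
	// If Done happens to run before Add, either the counter goes negative
	// and panics, or (when other goroutines keep the counter positive)
	// Add re-arms the counter for a goroutine that has already exited,
	// and a later Wait blocks forever.

	wg.Wait()
	fmt.Println("all consumers finished")
}
```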

Also, since there is only one path leading to wg.Done in getRecords,
I moved wg.Done out of the getRecords function and placed it
alongside the goroutine creation. This removes the need to pass the
WaitGroup pointer to the `ShardConsumer` instance, which in turn led to
the removal of the `waitGroup` field from the `ShardConsumer` struct.
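
A minimal sketch of the resulting pattern, with a placeholder consumer type standing in for `ShardConsumer` (the names below are illustrative only, not the library's actual API):

```go
package main

import (
	"fmt"
	"sync"
)

// consumer is a stand-in for ShardConsumer; it no longer needs any
// knowledge of the WaitGroup.
type consumer struct{ id string }

func (c *consumer) getRecords() error {
	fmt.Println("consuming", c.id)
	return nil
}

func main() {
	var wg sync.WaitGroup
	for _, id := range []string{"shard-0000", "shard-0001"} {
		sc := &consumer{id: id}
		wg.Add(1)
		go func() {
			defer wg.Done() // Done lives next to Add, in the caller
			sc.getRecords()
		}()
	}
	wg.Wait()
}
```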

This has been tested in production and did not cause any problems.

Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
Aurélien Rainone 2019-10-27 16:43:21 +01:00 committed by Tao Jiang
parent 4f79203f44
commit c8a5aa1891
2 changed files with 7 additions and 7 deletions


@@ -28,11 +28,12 @@
 package worker
 
 import (
-    log "github.com/sirupsen/logrus"
     "math"
     "sync"
     "time"
 
+    log "github.com/sirupsen/logrus"
+
     "github.com/aws/aws-sdk-go/aws"
     "github.com/aws/aws-sdk-go/aws/awserr"
     "github.com/aws/aws-sdk-go/service/kinesis"
@@ -79,7 +80,6 @@ type ShardConsumer struct {
     recordProcessor kcl.IRecordProcessor
     kclConfig       *config.KinesisClientLibConfiguration
     stop            *chan struct{}
-    waitGroup       *sync.WaitGroup
     consumerID      string
     mService        metrics.MonitoringService
     state           ShardConsumerState
@@ -126,7 +126,6 @@ func (sc *ShardConsumer) getShardIterator(shard *par.ShardStatus) (*string, erro
 // getRecords continously poll one shard for data record
 // Precondition: it currently has the lease on the shard.
 func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error {
-    defer sc.waitGroup.Done()
     defer sc.releaseLease(shard)
 
     // If the shard is child shard, need to wait until the parent finished.


@@ -201,8 +201,7 @@ func (w *Worker) initialize() error {
     stopChan := make(chan struct{})
     w.stop = &stopChan
 
-    wg := sync.WaitGroup{}
-    w.waitGroup = &wg
+    w.waitGroup = &sync.WaitGroup{}
 
     log.Info("Initialization complete.")
@@ -220,7 +219,6 @@ func (w *Worker) newShardConsumer(shard *par.ShardStatus) *ShardConsumer {
        kclConfig:  w.kclConfig,
        consumerID: w.workerID,
        stop:       w.stop,
-       waitGroup:  w.waitGroup,
        mService:   w.mService,
        state:      WAITING_ON_PARENT_SHARDS,
    }
@@ -283,8 +281,11 @@ func (w *Worker) eventLoop() {
            log.Infof("Start Shard Consumer for shard: %v", shard.ID)
            sc := w.newShardConsumer(shard)
-           go sc.getRecords(shard)
            w.waitGroup.Add(1)
+           go func() {
+               defer w.waitGroup.Done()
+               sc.getRecords(shard)
+           }()
 
            // exit from for loop and not to grab more shard for now.
            break
        }