diff --git a/clientlibrary/worker/shard-consumer.go b/clientlibrary/worker/shard-consumer.go index 139c27d..87b24aa 100644 --- a/clientlibrary/worker/shard-consumer.go +++ b/clientlibrary/worker/shard-consumer.go @@ -28,11 +28,12 @@ package worker import ( - log "github.com/sirupsen/logrus" "math" "sync" "time" + log "github.com/sirupsen/logrus" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/kinesis" @@ -79,7 +80,6 @@ type ShardConsumer struct { recordProcessor kcl.IRecordProcessor kclConfig *config.KinesisClientLibConfiguration stop *chan struct{} - waitGroup *sync.WaitGroup consumerID string mService metrics.MonitoringService state ShardConsumerState @@ -126,7 +126,6 @@ func (sc *ShardConsumer) getShardIterator(shard *par.ShardStatus) (*string, erro // getRecords continously poll one shard for data record // Precondition: it currently has the lease on the shard. func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error { - defer sc.waitGroup.Done() defer sc.releaseLease(shard) // If the shard is child shard, need to wait until the parent finished. diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index 5bed3fa..2a7bd52 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -201,8 +201,7 @@ func (w *Worker) initialize() error { stopChan := make(chan struct{}) w.stop = &stopChan - wg := sync.WaitGroup{} - w.waitGroup = &wg + w.waitGroup = &sync.WaitGroup{} log.Info("Initialization complete.") @@ -220,7 +219,6 @@ func (w *Worker) newShardConsumer(shard *par.ShardStatus) *ShardConsumer { kclConfig: w.kclConfig, consumerID: w.workerID, stop: w.stop, - waitGroup: w.waitGroup, mService: w.mService, state: WAITING_ON_PARENT_SHARDS, } @@ -283,8 +281,11 @@ func (w *Worker) eventLoop() { log.Infof("Start Shard Consumer for shard: %v", shard.ID) sc := w.newShardConsumer(shard) - go sc.getRecords(shard) w.waitGroup.Add(1) + go func() { + defer w.waitGroup.Done() + sc.getRecords(shard) + }() // exit from for loop and not to grab more shard for now. break }