Issue 61/add shard sync jitter (#62)

* Add a random number generator to Worker

Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>

* Add random jitter to the worker shard sync sleep

Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>

* Add random jitter in case syncShard fails

Fixes #61

Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
This commit is contained in:
Aurélien Rainone 2019-11-29 19:59:35 +01:00 committed by Tao Jiang
parent a35f4960a8
commit 43a936cab3

View file

@ -29,6 +29,7 @@ package worker
import ( import (
"errors" "errors"
"math/rand"
"sync" "sync"
"time" "time"
@ -64,6 +65,8 @@ type Worker struct {
waitGroup *sync.WaitGroup waitGroup *sync.WaitGroup
done bool done bool
rng *rand.Rand
shardStatus map[string]*par.ShardStatus shardStatus map[string]*par.ShardStatus
} }
@ -75,6 +78,9 @@ func NewWorker(factory kcl.IRecordProcessorFactory, kclConfig *config.KinesisCli
mService = metrics.NoopMonitoringService{} mService = metrics.NoopMonitoringService{}
} }
// Create a pseudo-random number generator and seed it.
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
return &Worker{ return &Worker{
streamName: kclConfig.StreamName, streamName: kclConfig.StreamName,
regionName: kclConfig.RegionName, regionName: kclConfig.RegionName,
@ -83,6 +89,7 @@ func NewWorker(factory kcl.IRecordProcessorFactory, kclConfig *config.KinesisCli
kclConfig: kclConfig, kclConfig: kclConfig,
mService: mService, mService: mService,
done: false, done: false,
rng: rng,
} }
} }
@ -235,7 +242,10 @@ func (w *Worker) eventLoop() {
err := w.syncShard() err := w.syncShard()
if err != nil { if err != nil {
log.Errorf("Error getting Kinesis shards: %+v", err) log.Errorf("Error getting Kinesis shards: %+v", err)
time.Sleep(time.Duration(w.kclConfig.ShardSyncIntervalMillis) * time.Millisecond)
// Add [-50%, +50%] random jitter to ShardSyncIntervalMillis in case of error.
shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + w.rng.Intn(int(w.kclConfig.ShardSyncIntervalMillis))
time.Sleep(time.Duration(shardSyncSleep) * time.Millisecond)
continue continue
} }
@ -302,7 +312,13 @@ func (w *Worker) eventLoop() {
case <-*w.stop: case <-*w.stop:
log.Infof("Shutting down...") log.Infof("Shutting down...")
return return
case <-time.After(time.Duration(w.kclConfig.ShardSyncIntervalMillis) * time.Millisecond): default:
// Add [-50%, +50%] random jitter to ShardSyncIntervalMillis. When multiple workers
// starts at the same time, this decreases the probability of them calling
// kinesis.DescribeStream at the same time, and hit the hard-limit on aws API calls.
// On average the period remains the same so that doesn't affect behavior.
shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + w.rng.Intn(int(w.kclConfig.ShardSyncIntervalMillis))
time.Sleep(time.Duration(shardSyncSleep) * time.Millisecond)
} }
} }
} }