From 43a936cab3fd3bf9c9ad7ec0ea9ed41de15c792f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= <476650+arl@users.noreply.github.com> Date: Fri, 29 Nov 2019 19:59:35 +0100 Subject: [PATCH] Issue 61/add shard sync jitter (#62) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add a random number generator to Worker Signed-off-by: Aurélien Rainone * Add random jitter to the worker shard sync sleep Signed-off-by: Aurélien Rainone * Add random jitter in case syncShard fails Fixes #61 Signed-off-by: Aurélien Rainone --- clientlibrary/worker/worker.go | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index f14eec0..a9e461d 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -29,6 +29,7 @@ package worker import ( "errors" + "math/rand" "sync" "time" @@ -64,6 +65,8 @@ type Worker struct { waitGroup *sync.WaitGroup done bool + rng *rand.Rand + shardStatus map[string]*par.ShardStatus } @@ -75,6 +78,9 @@ func NewWorker(factory kcl.IRecordProcessorFactory, kclConfig *config.KinesisCli mService = metrics.NoopMonitoringService{} } + // Create a pseudo-random number generator and seed it. + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + return &Worker{ streamName: kclConfig.StreamName, regionName: kclConfig.RegionName, @@ -83,6 +89,7 @@ func NewWorker(factory kcl.IRecordProcessorFactory, kclConfig *config.KinesisCli kclConfig: kclConfig, mService: mService, done: false, + rng: rng, } } @@ -235,7 +242,10 @@ func (w *Worker) eventLoop() { err := w.syncShard() if err != nil { log.Errorf("Error getting Kinesis shards: %+v", err) - time.Sleep(time.Duration(w.kclConfig.ShardSyncIntervalMillis) * time.Millisecond) + + // Add [-50%, +50%] random jitter to ShardSyncIntervalMillis in case of error. 
+ shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + w.rng.Intn(int(w.kclConfig.ShardSyncIntervalMillis)) + time.Sleep(time.Duration(shardSyncSleep) * time.Millisecond) continue } @@ -302,7 +312,13 @@ func (w *Worker) eventLoop() { case <-*w.stop: log.Infof("Shutting down...") return - case <-time.After(time.Duration(w.kclConfig.ShardSyncIntervalMillis) * time.Millisecond): + default: + // Add [-50%, +50%] random jitter to ShardSyncIntervalMillis. When multiple workers + // start at the same time, this decreases the probability of them calling + // kinesis.DescribeStream at the same time and hitting the hard limit on AWS API calls. + // On average the period remains the same, so this doesn't affect behavior. + shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + w.rng.Intn(int(w.kclConfig.ShardSyncIntervalMillis)) + time.Sleep(time.Duration(shardSyncSleep) * time.Millisecond) } } }