From df60778d89c4e3a4dfa21a31adf1c0f8e04acd44 Mon Sep 17 00:00:00 2001 From: Tao Jiang Date: Fri, 29 Nov 2019 14:27:05 -0600 Subject: [PATCH] Re-org code for adding jittered delay for syncShard (#63) Minor update for the previous commit by removing duplicated code. No functional change. Signed-off-by: Tao Jiang --- clientlibrary/worker/worker.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index a9e461d..13eb8d9 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -239,12 +239,15 @@ func (w *Worker) eventLoop() { log := w.kclConfig.Logger for { + // Add [-50%, +50%] random jitter to ShardSyncIntervalMillis. When multiple workers + // starts at the same time, this decreases the probability of them calling + // kinesis.DescribeStream at the same time, and hit the hard-limit on aws API calls. + // On average the period remains the same so that doesn't affect behavior. + shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + w.rng.Intn(int(w.kclConfig.ShardSyncIntervalMillis)) + err := w.syncShard() if err != nil { - log.Errorf("Error getting Kinesis shards: %+v", err) - - // Add [-50%, +50%] random jitter to ShardSyncIntervalMillis in case of error. - shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + w.rng.Intn(int(w.kclConfig.ShardSyncIntervalMillis)) + log.Errorf("Error syncing shards: %+v, Retrying in %d ms...", err, shardSyncSleep) time.Sleep(time.Duration(shardSyncSleep) * time.Millisecond) continue } @@ -313,11 +316,7 @@ func (w *Worker) eventLoop() { log.Infof("Shutting down...") return default: - // Add [-50%, +50%] random jitter to ShardSyncIntervalMillis. When multiple workers - // starts at the same time, this decreases the probability of them calling - // kinesis.DescribeStream at the same time, and hit the hard-limit on aws API calls. - // On average the period remains the same so that doesn't affect behavior. - shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + w.rng.Intn(int(w.kclConfig.ShardSyncIntervalMillis)) + log.Debugf("Trying to sync shards in %d ms...", shardSyncSleep) time.Sleep(time.Duration(shardSyncSleep) * time.Millisecond) } }