Re-org code for adding jittered delay for syncShard (#63)

Minor update for the previous commit by removing duplicated code.
No functional change.

Signed-off-by: Tao Jiang <taoj@vmware.com>
This commit is contained in:
Tao Jiang 2019-11-29 14:27:05 -06:00
parent 43a936cab3
commit df60778d89

View file

@ -239,12 +239,15 @@ func (w *Worker) eventLoop() {
log := w.kclConfig.Logger
for {
// Add [-50%, +50%] random jitter to ShardSyncIntervalMillis. When multiple workers
// starts at the same time, this decreases the probability of them calling
// kinesis.DescribeStream at the same time, and hit the hard-limit on aws API calls.
// On average the period remains the same so that doesn't affect behavior.
shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + w.rng.Intn(int(w.kclConfig.ShardSyncIntervalMillis))
err := w.syncShard()
if err != nil {
log.Errorf("Error getting Kinesis shards: %+v", err)
// Add [-50%, +50%] random jitter to ShardSyncIntervalMillis in case of error.
shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + w.rng.Intn(int(w.kclConfig.ShardSyncIntervalMillis))
log.Errorf("Error syncing shards: %+v, Retrying in %d ms...", err, shardSyncSleep)
time.Sleep(time.Duration(shardSyncSleep) * time.Millisecond)
continue
}
@ -313,11 +316,7 @@ func (w *Worker) eventLoop() {
log.Infof("Shutting down...")
return
default:
// Add [-50%, +50%] random jitter to ShardSyncIntervalMillis. When multiple workers
// starts at the same time, this decreases the probability of them calling
// kinesis.DescribeStream at the same time, and hit the hard-limit on aws API calls.
// On average the period remains the same so that doesn't affect behavior.
shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + w.rng.Intn(int(w.kclConfig.ShardSyncIntervalMillis))
log.Debugf("Trying to sync shards in %d ms...", shardSyncSleep)
time.Sleep(time.Duration(shardSyncSleep) * time.Millisecond)
}
}