refactor(Worker): add retry logic on runProcessLoop. Throws exception after MAX_RETRIES

This commit is contained in:
glarwood 2018-12-11 19:46:40 -08:00
parent a116817710
commit 7417524649

View file

@ -66,6 +66,7 @@ public class Worker implements Runnable {
private static final Log LOG = LogFactory.getLog(Worker.class); private static final Log LOG = LogFactory.getLog(Worker.class);
private static final int MAX_INITIALIZATION_ATTEMPTS = 20; private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
private static final int MAX_RETRIES = 4;
private WorkerLog wlog = new WorkerLog(); private WorkerLog wlog = new WorkerLog();
@ -370,38 +371,42 @@ public class Worker implements Runnable {
@VisibleForTesting @VisibleForTesting
void runProcessLoop() { void runProcessLoop() {
try { for (int i = 0; ; i++) {
boolean foundCompletedShard = false;
Set<ShardInfo> assignedShards = new HashSet<>();
for (ShardInfo shardInfo : getShardInfoForAssignments()) {
ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory);
if (shardConsumer.isShutdown() && shardConsumer.getShutdownReason().equals(ShutdownReason.TERMINATE)) {
foundCompletedShard = true;
} else {
shardConsumer.consumeShard();
}
assignedShards.add(shardInfo);
}
if (foundCompletedShard) {
controlServer.syncShardAndLeaseInfo(null);
}
// clean up shard consumers for unassigned shards
cleanupShardConsumers(assignedShards);
wlog.info("Sleeping ...");
Thread.sleep(idleTimeInMilliseconds);
} catch (Exception e) {
LOG.error(String.format("Worker.run caught exception, sleeping for %s milli seconds!",
String.valueOf(idleTimeInMilliseconds)), e);
try { try {
boolean foundCompletedShard = false;
Set<ShardInfo> assignedShards = new HashSet<>();
for (ShardInfo shardInfo : getShardInfoForAssignments()) {
ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory);
if (shardConsumer.isShutdown() && shardConsumer.getShutdownReason().equals(ShutdownReason.TERMINATE)) {
foundCompletedShard = true;
} else {
shardConsumer.consumeShard();
}
assignedShards.add(shardInfo);
}
if (foundCompletedShard) {
controlServer.syncShardAndLeaseInfo(null);
}
// clean up shard consumers for unassigned shards
cleanupShardConsumers(assignedShards);
wlog.info("Sleeping ...");
Thread.sleep(idleTimeInMilliseconds); Thread.sleep(idleTimeInMilliseconds);
} catch (InterruptedException ex) { } catch (Exception e) {
LOG.info("Worker: sleep interrupted after catching exception ", ex); if (i > MAX_RETRIES) throw new RuntimeException("Giving up after " + i + " retries", e);
LOG.error(String.format("Worker.run caught exception, sleeping for %s milli seconds!",
String.valueOf(idleTimeInMilliseconds)), e);
try {
Thread.sleep(idleTimeInMilliseconds);
} catch (InterruptedException ex) {
LOG.info("Worker: sleep interrupted after catching exception ", ex);
}
} }
wlog.resetInfoLogging();
} }
wlog.resetInfoLogging();
} }
private void initialize() { private void initialize() {