refactor(Worker): shutdown when shardInfoShardConsumerMap is empty
This commit is contained in:
parent
6fd2276556
commit
14fb8e2703
1 changed files with 10 additions and 2 deletions
|
|
@ -482,9 +482,10 @@ public class Worker implements Runnable {
|
||||||
shutdown();
|
shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
while (!shouldShutdown()) {
|
do {
|
||||||
|
// use do while to initialize the shardInfoShardConsumerMap, which is checked in shouldShutdown()
|
||||||
runProcessLoop();
|
runProcessLoop();
|
||||||
}
|
} while (!shouldShutdown());
|
||||||
|
|
||||||
finalShutdown();
|
finalShutdown();
|
||||||
LOG.info("Worker loop is complete. Exiting from worker.");
|
LOG.info("Worker loop is complete. Exiting from worker.");
|
||||||
|
|
@ -526,6 +527,9 @@ public class Worker implements Runnable {
|
||||||
LOG.info("Worker: sleep interrupted after catching exception ", ex);
|
LOG.info("Worker: sleep interrupted after catching exception ", ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// reset retries
|
||||||
|
retries.set(0);
|
||||||
wlog.resetInfoLogging();
|
wlog.resetInfoLogging();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -868,6 +872,10 @@ public class Worker implements Runnable {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (shardInfoShardConsumerMap.isEmpty()) {
|
||||||
|
LOG.info("Nothing to consume");
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue