Let healthchecks happen after initialization is complete

This commit is contained in:
Aravinda Kidambi Srinivasan 2024-04-29 21:32:26 -07:00
parent 651bf4d28c
commit bde5ae9dac

View file

@ -174,8 +174,7 @@ public class ShardConsumer {
if (isShutdownRequested()) { if (isShutdownRequested()) {
stateChangeFuture = shutdownComplete(); stateChangeFuture = shutdownComplete();
} else if (needsInitialization) { } else if (needsInitialization) {
if (stateChangeFuture != null) { if (stateChangeFuture != null && stateChangeFuture.get()) {
if (stateChangeFuture.get()) {
// Task rejection during the subscribe() call will not be propagated back as it not executed // Task rejection during the subscribe() call will not be propagated back as it not executed
// in the context of the Scheduler thread. Hence we should not assume the subscription will // in the context of the Scheduler thread. Hence we should not assume the subscription will
// always be successful. // always be successful.
@ -185,16 +184,15 @@ public class ShardConsumer {
// is complete // is complete
subscribe(); subscribe();
needsInitialization = false; needsInitialization = false;
// Initialization is complete, return now, because we dont need to do // Initialization is complete, we don't need to do initializeComplete anymore.
// initializeComplete anymore. ShardConsumer is in ProcessingState now and any further activity // ShardConsumer is already in ProcessingState and any further activity
// will be driven by publisher pushing data to subscriber which invokes handleInput // will be driven by publisher pushing data to subscriber which invokes handleInput
// and that triggers ProcessTask. Scheduler is only meant to do health-checks // and that triggers ProcessTask. Scheduler is only meant to do health-checks
// to ensure the consumer is not stuck for any reason and to do shutdown handling. // to ensure the consumer is not stuck for any reason and to do shutdown handling.
return; } else {
}
}
stateChangeFuture = initializeComplete(); stateChangeFuture = initializeComplete();
} }
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
// //
// Ignored should be handled by scheduler // Ignored should be handled by scheduler