diff --git a/src/babashka/pods/impl.clj b/src/babashka/pods/impl.clj index 3f4e838..07d2a0e 100644 --- a/src/babashka/pods/impl.clj +++ b/src/babashka/pods/impl.clj @@ -38,41 +38,44 @@ :json #(cheshire/parse-string-strict % true))] (try (loop [] - (let [reply (read stdout) - id (get reply "id") - id (bytes->string id) - value* (find reply "value") - value (some-> value* - second - bytes->string - read-fn) - status (get reply "status") - status (set (map (comp keyword bytes->string) status)) - done? (contains? status :done) - error? (contains? status :error) - value (if error? - (let [message (or (some-> (get reply "ex-message") - bytes->string) - "") - data (or (some-> (get reply "ex-data") - bytes->string - read-fn) - {})] - (ex-info message data)) - value) - chan (get @chans id) - out (some-> (get reply "out") - bytes->string) - err (some-> (get reply "err") - bytes->string)] - (when (or value* error?) (async/put! chan value)) - (when (or done? error?) (async/close! chan)) - (when out - (binding [*out* out-stream] - (println out))) - (when err (binding [*out* err-stream] - (println err)))) - (recur)) + (let [reply (try (read stdout) + (catch java.io.EOFException _ + ::EOF))] + (when-not (identical? ::EOF reply) + (let [id (get reply "id") + id (bytes->string id) + value* (find reply "value") + value (some-> value* + second + bytes->string + read-fn) + status (get reply "status") + status (set (map (comp keyword bytes->string) status)) + done? (contains? status :done) + error? (contains? status :error) + value (if error? + (let [message (or (some-> (get reply "ex-message") + bytes->string) + "") + data (or (some-> (get reply "ex-data") + bytes->string + read-fn) + {})] + (ex-info message data)) + value) + chan (get @chans id) + out (some-> (get reply "out") + bytes->string) + err (some-> (get reply "err") + bytes->string)] + (when (or value* error?) (async/put! chan value)) + (when (or done? error?) (async/close! chan)) + (when out + (binding [*out* out-stream] + (println out))) + (when err (binding [*out* err-stream] + (println err)))) + (recur)))) (catch Exception e (binding [*out* err-stream] (prn e))))))