diff --git a/README.md b/README.md index c3a7648..17dece7 100644 --- a/README.md +++ b/README.md @@ -331,14 +331,16 @@ as client side code. An example from the (defn watch ([path cb] (watch path cb {})) ([path cb opts] - (babashka.pods/invoke "pod.babashka.filewatcher" - 'pod.babashka.filewatcher/watch* - [path opts] - {:on-success (fn [{:keys [:value :done]}] (cb value)) - :on-error (fn [{:keys [:ex-message :ex-data]}] - (binding [*out* *err*] - (println "ERROR:" ex-message)))}) - nil)) + (babashka.pods/invoke + "pod.babashka.filewatcher" + 'pod.babashka.filewatcher/watch* + [path opts] + {:handler {:success (fn [{:keys [:value]}] (cb value)) + :error (fn [{:keys [:ex-message :ex-data]}] + (binding [*out* *err*] + (println "ERROR:" ex-message))) + :done (fn [_])}}) + nil)) ``` The wrapper function will then invoke `babashka.pods/invoke`, a lower level @@ -350,19 +352,17 @@ The arguments to `babashka.pods/invoke` are: derived from the first described namespace. - the symbol of the var to invoke - the arguments to the var -- an opts map containing `:on-success` and `:on-error` callbacks. +- an opts map containing `:handler` containing callback functions: `:success`, `:error` and `:done` The return value of `babashka.pods/invoke` is a map containing `:result`. When not using callbacks, this is the return value from the pod var invocation. When using callbacks, this value is undefined. -The callback `:on-success` is called with a map containing: +The callback `:success` is called with a map containing: - `:value`: a return value from the pod var -- `:done`: a boolean indicating if the var invocation is done (`true`). If - `false` then more values can be expected. -The callback `:on-error` is called with a map containing: +The callback `:error` is called with a map containing: - `:ex-message`: an error message - `:ex-data`: an arbitrary additional error data map. Typically it will contain @@ -371,6 +371,10 @@ The callback `:on-error` is called with a map containing: If desired, `:ex-message` and `:ex-data` can be reified into a `java.lang.Exception` using `ex-info`. +The callback `:done` is called with one argument which is currently +undefined. This callback can be used to determine if the pod is done sending +values. + In the above example the wrapper function calls the pod identified by `"pod.babashka.filewatcher"`. It calls the var `pod.babashka.filewatcher/watch*`. In `:on-success` it pulls out received diff --git a/src/babashka/pods/impl.clj b/src/babashka/pods/impl.clj index b7d24ea..6f94ace 100644 --- a/src/babashka/pods/impl.clj +++ b/src/babashka/pods/impl.clj @@ -55,8 +55,8 @@ read-fn) status (get reply "status") status (set (map (comp keyword bytes->string) status)) - done? (contains? status :done) error? (contains? status :error) + done? (or error? (contains? status :done)) [ex-message ex-data] (when error? [(or (some-> (get reply "ex-message") @@ -70,8 +70,10 @@ promise? (instance? clojure.lang.IPending chan) exception (when (and promise? error?) (ex-info ex-message ex-data)) - on-success (:on-success chan) - on-error (:on-error chan) + {error-handler :error + done-handler :done + success-handler :success} (when (map? chan) + chan) out (some-> (get reply "out") bytes->string) err (some-> (get reply "err") @@ -79,14 +81,16 @@ (when (or value* error?) (cond promise? (deliver chan (if error? exception value)) - (and (not error?) on-success) - (on-success {:value value - :done done?}) - (and error? on-error) - (on-error {:ex-message ex-message - :ex-data ex-data}))) - (when (and (or done? error?) promise?) - (deliver chan nil)) + (and (not error?) success-handler) + (success-handler {:value value}) + (and error? error-handler) + (error-handler {:ex-message ex-message + :ex-data ex-data}))) + (when done? + (when promise? + (deliver chan nil)) + (when done-handler + (done-handler {}))) (when out (binding [*out* out-stream] (println out))) @@ -100,30 +104,24 @@ (defn next-id [] (str (java.util.UUID/randomUUID))) -(defn invoke [pod pod-var args - {:keys [:on-success - :on-error - :async]}] - (let [stream (:stdin pod) +(defn invoke [pod pod-var args opts] + (let [handlers (:handlers opts) + stream (:stdin pod) format (:format pod) chans (:chans pod) write-fn (case format :edn pr-str :json cheshire/generate-string) id (next-id) - chan (cond async (async/chan) - (or on-success - on-error) {:on-success on-success - :on-error on-error} - :else (promise)) + chan (if handlers handlers + (promise)) _ (swap! chans assoc id chan) _ (write stream {"id" id "op" "invoke" "var" (str pod-var) "args" (write-fn args)})] ;; see: https://blog.jakubholy.net/2019/core-async-error-handling/ - (cond async chan - (or on-success on-error) nil + (cond handlers handlers :else (let [v @chan] (if (instance? Throwable v) (throw v) diff --git a/test-resources/test_program.clj b/test-resources/test_program.clj index c7bc4b2..d5812bc 100644 --- a/test-resources/test_program.clj +++ b/test-resources/test_program.clj @@ -3,10 +3,13 @@ (require '[pod.test-pod :as pod]) (def stream-results (atom [])) +(def done-prom (promise)) (pods/invoke "pod.test-pod" 'pod.test-pod/range-stream [1 10] - {:on-success (fn [{:keys [:value]}] - (swap! stream-results conj value))}) -(while (< (count @stream-results) 9)) + {:handlers {:success (fn [{:keys [:value]}] + (swap! stream-results conj value)) + :done (fn [_] + (deliver done-prom :ok))}}) +@done-prom (def ex-result (try (pod.test-pod/error 1 2 3) @@ -20,13 +23,15 @@ (def callback-result (promise)) (pods/invoke "pod.test-pod" 'pod.test-pod/add-sync [1 2] - {:on-success (fn [{:keys [:value :done]}] - (when done (deliver callback-result value)))}) + {:handlers {:success + (fn [{:keys [:value]}] + (deliver callback-result value))}}) (def error-result (promise)) (pods/invoke "pod.test-pod" 'pod.test-pod/add-sync ["1" 2] - {:on-error (fn [m] - (deliver error-result m))}) + {:handlers + {:error (fn [m] + (deliver error-result m))}}) [(pod/assoc {:a 1} :b 2) (pod.test-pod/add-sync 1 2 3)