[#6] refactor handlers

This commit is contained in:
Michiel Borkent 2020-05-20 13:28:04 +02:00
parent a673ab4c8a
commit 57b86f301d
3 changed files with 50 additions and 43 deletions

View file

@ -331,14 +331,16 @@ as client side code. An example from the
(defn watch (defn watch
([path cb] (watch path cb {})) ([path cb] (watch path cb {}))
([path cb opts] ([path cb opts]
(babashka.pods/invoke "pod.babashka.filewatcher" (babashka.pods/invoke
'pod.babashka.filewatcher/watch* "pod.babashka.filewatcher"
[path opts] 'pod.babashka.filewatcher/watch*
{:on-success (fn [{:keys [:value :done]}] (cb value)) [path opts]
:on-error (fn [{:keys [:ex-message :ex-data]}] {:handler {:success (fn [{:keys [:value]}] (cb value))
(binding [*out* *err*] :error (fn [{:keys [:ex-message :ex-data]}]
(println "ERROR:" ex-message)))}) (binding [*out* *err*]
nil)) (println "ERROR:" ex-message)))
:done (fn [_])}})
nil))
``` ```
The wrapper function will then invoke `babashka.pods/invoke`, a lower level The wrapper function will then invoke `babashka.pods/invoke`, a lower level
@ -350,19 +352,17 @@ The arguments to `babashka.pods/invoke` are:
derived from the first described namespace. derived from the first described namespace.
- the symbol of the var to invoke - the symbol of the var to invoke
- the arguments to the var - the arguments to the var
- an opts map containing `:on-success` and `:on-error` callbacks. - an opts map containing `:handler` containing callback functions: `:success`, `:error` and `:done`
The return value of `babashka.pods/invoke` is a map containing `:result`. When The return value of `babashka.pods/invoke` is a map containing `:result`. When
not using callbacks, this is the return value from the pod var invocation. When not using callbacks, this is the return value from the pod var invocation. When
using callbacks, this value is undefined. using callbacks, this value is undefined.
The callback `:on-success` is called with a map containing: The callback `:success` is called with a map containing:
- `:value`: a return value from the pod var - `:value`: a return value from the pod var
- `:done`: a boolean indicating if the var invocation is done (`true`). If
`false` then more values can be expected.
The callback `:on-error` is called with a map containing: The callback `:error` is called with a map containing:
- `:ex-message`: an error message - `:ex-message`: an error message
- `:ex-data`: an arbitrary additional error data map. Typically it will contain - `:ex-data`: an arbitrary additional error data map. Typically it will contain
@ -371,6 +371,10 @@ The callback `:on-error` is called with a map containing:
If desired, `:ex-message` and `:ex-data` can be reified into a If desired, `:ex-message` and `:ex-data` can be reified into a
`java.lang.Exception` using `ex-info`. `java.lang.Exception` using `ex-info`.
The callback `:done` is called with one argument which is currently
undefined. This callback can be used to determine if the pod is done sending
values.
In the above example the wrapper function calls the pod identified by In the above example the wrapper function calls the pod identified by
`"pod.babashka.filewatcher"`. It calls the var `"pod.babashka.filewatcher"`. It calls the var
`pod.babashka.filewatcher/watch*`. In `:on-success` it pulls out received `pod.babashka.filewatcher/watch*`. In `:on-success` it pulls out received

View file

@ -55,8 +55,8 @@
read-fn) read-fn)
status (get reply "status") status (get reply "status")
status (set (map (comp keyword bytes->string) status)) status (set (map (comp keyword bytes->string) status))
done? (contains? status :done)
error? (contains? status :error) error? (contains? status :error)
done? (or error? (contains? status :done))
[ex-message ex-data] [ex-message ex-data]
(when error? (when error?
[(or (some-> (get reply "ex-message") [(or (some-> (get reply "ex-message")
@ -70,8 +70,10 @@
promise? (instance? clojure.lang.IPending chan) promise? (instance? clojure.lang.IPending chan)
exception (when (and promise? error?) exception (when (and promise? error?)
(ex-info ex-message ex-data)) (ex-info ex-message ex-data))
on-success (:on-success chan) {error-handler :error
on-error (:on-error chan) done-handler :done
success-handler :success} (when (map? chan)
chan)
out (some-> (get reply "out") out (some-> (get reply "out")
bytes->string) bytes->string)
err (some-> (get reply "err") err (some-> (get reply "err")
@ -79,14 +81,16 @@
(when (or value* error?) (when (or value* error?)
(cond promise? (cond promise?
(deliver chan (if error? exception value)) (deliver chan (if error? exception value))
(and (not error?) on-success) (and (not error?) success-handler)
(on-success {:value value (success-handler {:value value})
:done done?}) (and error? error-handler)
(and error? on-error) (error-handler {:ex-message ex-message
(on-error {:ex-message ex-message :ex-data ex-data})))
:ex-data ex-data}))) (when done?
(when (and (or done? error?) promise?) (when promise?
(deliver chan nil)) (deliver chan nil))
(when done-handler
(done-handler {})))
(when out (when out
(binding [*out* out-stream] (binding [*out* out-stream]
(println out))) (println out)))
@ -100,30 +104,24 @@
(defn next-id [] (defn next-id []
(str (java.util.UUID/randomUUID))) (str (java.util.UUID/randomUUID)))
(defn invoke [pod pod-var args (defn invoke [pod pod-var args opts]
{:keys [:on-success (let [handlers (:handlers opts)
:on-error stream (:stdin pod)
:async]}]
(let [stream (:stdin pod)
format (:format pod) format (:format pod)
chans (:chans pod) chans (:chans pod)
write-fn (case format write-fn (case format
:edn pr-str :edn pr-str
:json cheshire/generate-string) :json cheshire/generate-string)
id (next-id) id (next-id)
chan (cond async (async/chan) chan (if handlers handlers
(or on-success (promise))
on-error) {:on-success on-success
:on-error on-error}
:else (promise))
_ (swap! chans assoc id chan) _ (swap! chans assoc id chan)
_ (write stream {"id" id _ (write stream {"id" id
"op" "invoke" "op" "invoke"
"var" (str pod-var) "var" (str pod-var)
"args" (write-fn args)})] "args" (write-fn args)})]
;; see: https://blog.jakubholy.net/2019/core-async-error-handling/ ;; see: https://blog.jakubholy.net/2019/core-async-error-handling/
(cond async chan (cond handlers handlers
(or on-success on-error) nil
:else (let [v @chan] :else (let [v @chan]
(if (instance? Throwable v) (if (instance? Throwable v)
(throw v) (throw v)

View file

@ -3,10 +3,13 @@
(require '[pod.test-pod :as pod]) (require '[pod.test-pod :as pod])
(def stream-results (atom [])) (def stream-results (atom []))
(def done-prom (promise))
(pods/invoke "pod.test-pod" 'pod.test-pod/range-stream [1 10] (pods/invoke "pod.test-pod" 'pod.test-pod/range-stream [1 10]
{:on-success (fn [{:keys [:value]}] {:handlers {:success (fn [{:keys [:value]}]
(swap! stream-results conj value))}) (swap! stream-results conj value))
(while (< (count @stream-results) 9)) :done (fn [_]
(deliver done-prom :ok))}})
@done-prom
(def ex-result (def ex-result
(try (pod.test-pod/error 1 2 3) (try (pod.test-pod/error 1 2 3)
@ -20,13 +23,15 @@
(def callback-result (promise)) (def callback-result (promise))
(pods/invoke "pod.test-pod" 'pod.test-pod/add-sync [1 2] (pods/invoke "pod.test-pod" 'pod.test-pod/add-sync [1 2]
{:on-success (fn [{:keys [:value :done]}] {:handlers {:success
(when done (deliver callback-result value)))}) (fn [{:keys [:value]}]
(deliver callback-result value))}})
(def error-result (promise)) (def error-result (promise))
(pods/invoke "pod.test-pod" 'pod.test-pod/add-sync ["1" 2] (pods/invoke "pod.test-pod" 'pod.test-pod/add-sync ["1" 2]
{:on-error (fn [m] {:handlers
(deliver error-result m))}) {:error (fn [m]
(deliver error-result m))}})
[(pod/assoc {:a 1} :b 2) [(pod/assoc {:a 1} :b 2)
(pod.test-pod/add-sync 1 2 3) (pod.test-pod/add-sync 1 2 3)