From 6214f061467beeb7801b1060dcf1d6c9fa066d9c Mon Sep 17 00:00:00 2001 From: Michiel Borkent Date: Wed, 19 May 2021 20:34:38 +0200 Subject: [PATCH] Implicit pod-id --- src/babashka/pods.clj | 8 +- src/babashka/pods/impl.clj | 174 +++++++++++++++++++------------------ src/babashka/pods/jvm.clj | 15 ++-- src/babashka/pods/sci.clj | 15 ++-- test-pod/pod/test_pod.clj | 4 +- 5 files changed, 110 insertions(+), 106 deletions(-) diff --git a/src/babashka/pods.clj b/src/babashka/pods.clj index 38f1ea3..77a9ae2 100644 --- a/src/babashka/pods.clj +++ b/src/babashka/pods.clj @@ -15,8 +15,8 @@ ([pod-id-or-pod sym args] (invoke pod-id-or-pod sym args {})) ([pod-id-or-pod sym args opts] (jvm/invoke pod-id-or-pod sym args opts))) -(defn add-transit-read-handler [pod-id tag fn] - (jvm/add-transit-read-handler pod-id tag fn)) +(defn add-transit-read-handler [tag fn] + (jvm/add-transit-read-handler tag fn)) -(defn add-transit-write-handler [pod-id tag fn classes] - (jvm/add-transit-write-handler pod-id tag fn classes)) +(defn add-transit-write-handler [tag fn classes] + (jvm/add-transit-write-handler tag fn classes)) diff --git a/src/babashka/pods/impl.clj b/src/babashka/pods/impl.clj index e4fbc42..f535ad7 100644 --- a/src/babashka/pods/impl.clj +++ b/src/babashka/pods/impl.clj @@ -39,12 +39,14 @@ (defn next-id [] (str (java.util.UUID/randomUUID))) +(def ^:dynamic *pod-id* nil) + (defonce transit-read-handlers (atom {})) (defonce transit-read-handler-maps (atom {})) -(defn update-transit-read-handler-map [pod-id] - (swap! transit-read-handler-maps assoc pod-id - (transit/read-handler-map (get @transit-read-handlers pod-id)))) +(defn update-transit-read-handler-map [] + (swap! transit-read-handler-maps assoc *pod-id* + (transit/read-handler-map (get @transit-read-handlers *pod-id*)))) (defn transit-json-read [pod-id ^String s] (with-open [bais (java.io.ByteArrayInputStream. (.getBytes s "UTF-8"))] @@ -52,29 +54,28 @@ (transit/read r)))) ;; https://www.cognitect.com/blog/2015/9/10/extending-transit -(defn add-transit-read-handler [pod-id tag fn] +(defn add-transit-read-handler [tag fn] (let [rh (transit/read-handler fn)] - (swap! transit-read-handlers assoc-in [pod-id tag] rh) - (update-transit-read-handler-map pod-id) + (swap! transit-read-handlers assoc-in [*pod-id* tag] rh) + (update-transit-read-handler-map) nil)) (defonce transit-write-handlers (atom {})) (defonce transit-write-handler-maps (atom {})) -(defn update-transit-write-handler-map [pod-id] - (swap! transit-write-handler-maps assoc pod-id - (transit/write-handler-map (get @transit-write-handlers pod-id)))) +(defn update-transit-write-handler-map [] + (swap! transit-write-handler-maps assoc *pod-id* + (transit/write-handler-map (get @transit-write-handlers *pod-id*)))) ;; https://www.cognitect.com/blog/2015/9/10/extending-transit -(defn add-transit-write-handler [pod-id tag fn classes] +(defn add-transit-write-handler [tag fn classes] (let [rh (transit/write-handler tag fn)] (doseq [class classes] - (swap! transit-write-handlers assoc-in [pod-id class] rh))) - (update-transit-write-handler-map pod-id) + (swap! transit-write-handlers assoc-in [*pod-id* class] rh))) + (update-transit-write-handler-map) nil) (defn transit-json-write [pod-id ^String s] - ;; (.println System/err (:pod-id pod)) (with-open [baos (java.io.ByteArrayOutputStream. 4096)] (let [w (transit/writer baos :json {:handlers (get @transit-write-handler-maps pod-id)})] (transit/write w s) @@ -147,79 +148,80 @@ (binding [*out* *err*] (println "Cannot read Transit JSON: " (pr-str s)) (throw e))))))] - (try - (loop [] - (let [reply (try (read stdout) - (catch java.io.EOFException _ - ::EOF))] - (when-not (identical? ::EOF reply) - (let [id (get reply "id") - id (bytes->string id) - value* (find reply "value") - value (some-> value* - second - bytes->string - read-fn) - status (get reply "status") - status (set (map (comp keyword bytes->string) status)) - error? (contains? status :error) - done? (or error? (contains? status :done)) - [ex-message ex-data] - (when error? - [(or (some-> (get reply "ex-message") - bytes->string) - "") - (or (some-> (get reply "ex-data") - bytes->string - read-fn) - {})]) - namespace (when-let [v (get reply "vars")] - (let [name-str (-> (get reply "name") - bytes->string) - name (symbol name-str)] - {:name name - :vars (bencode->vars pod name-str v)})) - chan (get @chans id) - promise? (instance? clojure.lang.IPending chan) - exception (when (and promise? error?) - (ex-info ex-message ex-data)) - ;; NOTE: if we need more fine-grained handlers, we will add - ;; a :raw handler that will just get the bencode message's raw - ;; data - {error-handler :error - done-handler :done - success-handler :success} (when (map? chan) - chan) - out (some-> (get reply "out") - bytes->string) - err (some-> (get reply "err") - bytes->string)] - ;; NOTE: write to out and err before delivering promise for making - ;; listening to output synchronous. - (when out - (binding [*out* out-stream] - (println out))) - (when err (binding [*out* err-stream] - (println err))) - (when (or value* error? namespace) - (cond promise? - (deliver chan (cond error? exception - value value - namespace namespace)) - (and (not error?) success-handler) - (success-handler value) - (and error? error-handler) - (error-handler {:ex-message ex-message - :ex-data ex-data}))) - (when (and done? (not error?)) - (when promise? - (deliver chan nil)) - (when done-handler - (done-handler)))) - (recur)))) - (catch Exception e - (binding [*out* *err* #_err-stream] - (prn e)))))) + (binding [*pod-id* (:pod-id pod)] + (try + (loop [] + (let [reply (try (read stdout) + (catch java.io.EOFException _ + ::EOF))] + (when-not (identical? ::EOF reply) + (let [id (get reply "id") + id (bytes->string id) + value* (find reply "value") + value (some-> value* + second + bytes->string + read-fn) + status (get reply "status") + status (set (map (comp keyword bytes->string) status)) + error? (contains? status :error) + done? (or error? (contains? status :done)) + [ex-message ex-data] + (when error? + [(or (some-> (get reply "ex-message") + bytes->string) + "") + (or (some-> (get reply "ex-data") + bytes->string + read-fn) + {})]) + namespace (when-let [v (get reply "vars")] + (let [name-str (-> (get reply "name") + bytes->string) + name (symbol name-str)] + {:name name + :vars (bencode->vars pod name-str v)})) + chan (get @chans id) + promise? (instance? clojure.lang.IPending chan) + exception (when (and promise? error?) + (ex-info ex-message ex-data)) + ;; NOTE: if we need more fine-grained handlers, we will add + ;; a :raw handler that will just get the bencode message's raw + ;; data + {error-handler :error + done-handler :done + success-handler :success} (when (map? chan) + chan) + out (some-> (get reply "out") + bytes->string) + err (some-> (get reply "err") + bytes->string)] + ;; NOTE: write to out and err before delivering promise for making + ;; listening to output synchronous. + (when out + (binding [*out* out-stream] + (println out))) + (when err (binding [*out* err-stream] + (println err))) + (when (or value* error? namespace) + (cond promise? + (deliver chan (cond error? exception + value value + namespace namespace)) + (and (not error?) success-handler) + (success-handler value) + (and error? error-handler) + (error-handler {:ex-message ex-message + :ex-data ex-data}))) + (when (and done? (not error?)) + (when promise? + (deliver chan nil)) + (when done-handler + (done-handler)))) + (recur)))) + (catch Exception e + (binding [*out* *err* #_err-stream] + (prn e))))))) (def pods (atom {})) diff --git a/src/babashka/pods/jvm.clj b/src/babashka/pods/jvm.clj index f8f7017..9c4a023 100644 --- a/src/babashka/pods/jvm.clj +++ b/src/babashka/pods/jvm.clj @@ -55,9 +55,10 @@ (when defer? [ns-name pod])) namespaces))) - (doseq [[ns-sym vars lazy?] namespaces - :when (not lazy?)] - (process-namespace {:name ns-sym :vars vars})) + (binding [impl/*pod-id* (:pod-id pod)] + (doseq [[ns-sym vars lazy?] namespaces + :when (not lazy?)] + (process-namespace {:name ns-sym :vars vars}))) (future (impl/processor pod)) {:pod/id (:pod-id pod)}))) @@ -70,8 +71,8 @@ ([pod-id sym args] (invoke pod-id sym args {})) ([pod-id sym args opts] (impl/invoke-public pod-id sym args opts))) -(defn add-transit-read-handler [pod-id tag fn] - (impl/add-transit-read-handler pod-id tag fn)) +(defn add-transit-read-handler [tag fn] + (impl/add-transit-read-handler tag fn)) -(defn add-transit-write-handler [pod-id tag fn classes] - (impl/add-transit-write-handler pod-id tag fn classes)) +(defn add-transit-write-handler [tag fn classes] + (impl/add-transit-write-handler tag fn classes)) diff --git a/src/babashka/pods/sci.clj b/src/babashka/pods/sci.clj index d4d23a1..7e85f63 100644 --- a/src/babashka/pods/sci.clj +++ b/src/babashka/pods/sci.clj @@ -65,9 +65,10 @@ (when prev-load-fn (prev-load-fn m))))] (swap! env assoc :load-fn new-load-fn))) - (doseq [[ns-name vars lazy?] namespaces - :when (not lazy?)] - (process-namespace ctx {:name ns-name :vars vars})) + (binding [impl/*pod-id* (:pod-id pod)] + (doseq [[ns-name vars lazy?] namespaces + :when (not lazy?)] + (process-namespace ctx {:name ns-name :vars vars}))) (sci/future (impl/processor pod)) {:pod/id (:pod-id pod)}))) @@ -80,8 +81,8 @@ ([pod-id sym args] (invoke pod-id sym args {})) ([pod-id sym args opts] (impl/invoke-public pod-id sym args opts))) -(defn add-transit-read-handler [pod-id tag fn] - (impl/add-transit-read-handler pod-id tag fn)) +(defn add-transit-read-handler [tag fn] + (impl/add-transit-read-handler tag fn)) -(defn add-transit-write-handler [pod-id tag fn classes] - (impl/add-transit-write-handler pod-id tag fn classes)) +(defn add-transit-write-handler [tag fn classes] + (impl/add-transit-write-handler tag fn classes)) diff --git a/test-pod/pod/test_pod.clj b/test-pod/pod/test_pod.clj index a529468..48cc3c9 100644 --- a/test-pod/pod/test_pod.clj +++ b/test-pod/pod/test_pod.clj @@ -123,9 +123,9 @@ {"name" "-local-date-time"} {"name" "local-date-time" "code" " -(babashka.pods/add-transit-read-handler \"pod.test-pod\" \"local-date-time\" +(babashka.pods/add-transit-read-handler \"local-date-time\" (fn [s] (java.time.LocalDateTime/parse s))) -(babashka.pods/add-transit-write-handler \"pod.test-pod\" \"local-date-time\" +(babashka.pods/add-transit-write-handler \"local-date-time\" str #{java.time.LocalDateTime}) (defn local-date-time [x] (-local-date-time x))"}]