Implicit pod-id

This commit is contained in:
Michiel Borkent 2021-05-19 20:34:38 +02:00
parent 05ecf97127
commit 6214f06146
5 changed files with 110 additions and 106 deletions

View file

@ -15,8 +15,8 @@
([pod-id-or-pod sym args] (invoke pod-id-or-pod sym args {})) ([pod-id-or-pod sym args] (invoke pod-id-or-pod sym args {}))
([pod-id-or-pod sym args opts] (jvm/invoke pod-id-or-pod sym args opts))) ([pod-id-or-pod sym args opts] (jvm/invoke pod-id-or-pod sym args opts)))
(defn add-transit-read-handler [pod-id tag fn] (defn add-transit-read-handler [tag fn]
(jvm/add-transit-read-handler pod-id tag fn)) (jvm/add-transit-read-handler tag fn))
(defn add-transit-write-handler [pod-id tag fn classes] (defn add-transit-write-handler [tag fn classes]
(jvm/add-transit-write-handler pod-id tag fn classes)) (jvm/add-transit-write-handler tag fn classes))

View file

@ -39,12 +39,14 @@
(defn next-id [] (defn next-id []
(str (java.util.UUID/randomUUID))) (str (java.util.UUID/randomUUID)))
(def ^:dynamic *pod-id* nil)
(defonce transit-read-handlers (atom {})) (defonce transit-read-handlers (atom {}))
(defonce transit-read-handler-maps (atom {})) (defonce transit-read-handler-maps (atom {}))
(defn update-transit-read-handler-map [pod-id] (defn update-transit-read-handler-map []
(swap! transit-read-handler-maps assoc pod-id (swap! transit-read-handler-maps assoc *pod-id*
(transit/read-handler-map (get @transit-read-handlers pod-id)))) (transit/read-handler-map (get @transit-read-handlers *pod-id*))))
(defn transit-json-read [pod-id ^String s] (defn transit-json-read [pod-id ^String s]
(with-open [bais (java.io.ByteArrayInputStream. (.getBytes s "UTF-8"))] (with-open [bais (java.io.ByteArrayInputStream. (.getBytes s "UTF-8"))]
@ -52,29 +54,28 @@
(transit/read r)))) (transit/read r))))
;; https://www.cognitect.com/blog/2015/9/10/extending-transit ;; https://www.cognitect.com/blog/2015/9/10/extending-transit
(defn add-transit-read-handler [pod-id tag fn] (defn add-transit-read-handler [tag fn]
(let [rh (transit/read-handler fn)] (let [rh (transit/read-handler fn)]
(swap! transit-read-handlers assoc-in [pod-id tag] rh) (swap! transit-read-handlers assoc-in [*pod-id* tag] rh)
(update-transit-read-handler-map pod-id) (update-transit-read-handler-map)
nil)) nil))
(defonce transit-write-handlers (atom {})) (defonce transit-write-handlers (atom {}))
(defonce transit-write-handler-maps (atom {})) (defonce transit-write-handler-maps (atom {}))
(defn update-transit-write-handler-map [pod-id] (defn update-transit-write-handler-map []
(swap! transit-write-handler-maps assoc pod-id (swap! transit-write-handler-maps assoc *pod-id*
(transit/write-handler-map (get @transit-write-handlers pod-id)))) (transit/write-handler-map (get @transit-write-handlers *pod-id*))))
;; https://www.cognitect.com/blog/2015/9/10/extending-transit ;; https://www.cognitect.com/blog/2015/9/10/extending-transit
(defn add-transit-write-handler [pod-id tag fn classes] (defn add-transit-write-handler [tag fn classes]
(let [rh (transit/write-handler tag fn)] (let [rh (transit/write-handler tag fn)]
(doseq [class classes] (doseq [class classes]
(swap! transit-write-handlers assoc-in [pod-id class] rh))) (swap! transit-write-handlers assoc-in [*pod-id* class] rh)))
(update-transit-write-handler-map pod-id) (update-transit-write-handler-map)
nil) nil)
(defn transit-json-write [pod-id ^String s] (defn transit-json-write [pod-id ^String s]
;; (.println System/err (:pod-id pod))
(with-open [baos (java.io.ByteArrayOutputStream. 4096)] (with-open [baos (java.io.ByteArrayOutputStream. 4096)]
(let [w (transit/writer baos :json {:handlers (get @transit-write-handler-maps pod-id)})] (let [w (transit/writer baos :json {:handlers (get @transit-write-handler-maps pod-id)})]
(transit/write w s) (transit/write w s)
@ -147,79 +148,80 @@
(binding [*out* *err*] (binding [*out* *err*]
(println "Cannot read Transit JSON: " (pr-str s)) (println "Cannot read Transit JSON: " (pr-str s))
(throw e))))))] (throw e))))))]
(try (binding [*pod-id* (:pod-id pod)]
(loop [] (try
(let [reply (try (read stdout) (loop []
(catch java.io.EOFException _ (let [reply (try (read stdout)
::EOF))] (catch java.io.EOFException _
(when-not (identical? ::EOF reply) ::EOF))]
(let [id (get reply "id") (when-not (identical? ::EOF reply)
id (bytes->string id) (let [id (get reply "id")
value* (find reply "value") id (bytes->string id)
value (some-> value* value* (find reply "value")
second value (some-> value*
bytes->string second
read-fn) bytes->string
status (get reply "status") read-fn)
status (set (map (comp keyword bytes->string) status)) status (get reply "status")
error? (contains? status :error) status (set (map (comp keyword bytes->string) status))
done? (or error? (contains? status :done)) error? (contains? status :error)
[ex-message ex-data] done? (or error? (contains? status :done))
(when error? [ex-message ex-data]
[(or (some-> (get reply "ex-message") (when error?
bytes->string) [(or (some-> (get reply "ex-message")
"") bytes->string)
(or (some-> (get reply "ex-data") "")
bytes->string (or (some-> (get reply "ex-data")
read-fn) bytes->string
{})]) read-fn)
namespace (when-let [v (get reply "vars")] {})])
(let [name-str (-> (get reply "name") namespace (when-let [v (get reply "vars")]
bytes->string) (let [name-str (-> (get reply "name")
name (symbol name-str)] bytes->string)
{:name name name (symbol name-str)]
:vars (bencode->vars pod name-str v)})) {:name name
chan (get @chans id) :vars (bencode->vars pod name-str v)}))
promise? (instance? clojure.lang.IPending chan) chan (get @chans id)
exception (when (and promise? error?) promise? (instance? clojure.lang.IPending chan)
(ex-info ex-message ex-data)) exception (when (and promise? error?)
;; NOTE: if we need more fine-grained handlers, we will add (ex-info ex-message ex-data))
;; a :raw handler that will just get the bencode message's raw ;; NOTE: if we need more fine-grained handlers, we will add
;; data ;; a :raw handler that will just get the bencode message's raw
{error-handler :error ;; data
done-handler :done {error-handler :error
success-handler :success} (when (map? chan) done-handler :done
chan) success-handler :success} (when (map? chan)
out (some-> (get reply "out") chan)
bytes->string) out (some-> (get reply "out")
err (some-> (get reply "err") bytes->string)
bytes->string)] err (some-> (get reply "err")
;; NOTE: write to out and err before delivering promise for making bytes->string)]
;; listening to output synchronous. ;; NOTE: write to out and err before delivering promise for making
(when out ;; listening to output synchronous.
(binding [*out* out-stream] (when out
(println out))) (binding [*out* out-stream]
(when err (binding [*out* err-stream] (println out)))
(println err))) (when err (binding [*out* err-stream]
(when (or value* error? namespace) (println err)))
(cond promise? (when (or value* error? namespace)
(deliver chan (cond error? exception (cond promise?
value value (deliver chan (cond error? exception
namespace namespace)) value value
(and (not error?) success-handler) namespace namespace))
(success-handler value) (and (not error?) success-handler)
(and error? error-handler) (success-handler value)
(error-handler {:ex-message ex-message (and error? error-handler)
:ex-data ex-data}))) (error-handler {:ex-message ex-message
(when (and done? (not error?)) :ex-data ex-data})))
(when promise? (when (and done? (not error?))
(deliver chan nil)) (when promise?
(when done-handler (deliver chan nil))
(done-handler)))) (when done-handler
(recur)))) (done-handler))))
(catch Exception e (recur))))
(binding [*out* *err* #_err-stream] (catch Exception e
(prn e)))))) (binding [*out* *err* #_err-stream]
(prn e)))))))
(def pods (atom {})) (def pods (atom {}))

View file

@ -55,9 +55,10 @@
(when defer? (when defer?
[ns-name pod])) [ns-name pod]))
namespaces))) namespaces)))
(doseq [[ns-sym vars lazy?] namespaces (binding [impl/*pod-id* (:pod-id pod)]
:when (not lazy?)] (doseq [[ns-sym vars lazy?] namespaces
(process-namespace {:name ns-sym :vars vars})) :when (not lazy?)]
(process-namespace {:name ns-sym :vars vars})))
(future (impl/processor pod)) (future (impl/processor pod))
{:pod/id (:pod-id pod)}))) {:pod/id (:pod-id pod)})))
@ -70,8 +71,8 @@
([pod-id sym args] (invoke pod-id sym args {})) ([pod-id sym args] (invoke pod-id sym args {}))
([pod-id sym args opts] (impl/invoke-public pod-id sym args opts))) ([pod-id sym args opts] (impl/invoke-public pod-id sym args opts)))
(defn add-transit-read-handler [pod-id tag fn] (defn add-transit-read-handler [tag fn]
(impl/add-transit-read-handler pod-id tag fn)) (impl/add-transit-read-handler tag fn))
(defn add-transit-write-handler [pod-id tag fn classes] (defn add-transit-write-handler [tag fn classes]
(impl/add-transit-write-handler pod-id tag fn classes)) (impl/add-transit-write-handler tag fn classes))

View file

@ -65,9 +65,10 @@
(when prev-load-fn (when prev-load-fn
(prev-load-fn m))))] (prev-load-fn m))))]
(swap! env assoc :load-fn new-load-fn))) (swap! env assoc :load-fn new-load-fn)))
(doseq [[ns-name vars lazy?] namespaces (binding [impl/*pod-id* (:pod-id pod)]
:when (not lazy?)] (doseq [[ns-name vars lazy?] namespaces
(process-namespace ctx {:name ns-name :vars vars})) :when (not lazy?)]
(process-namespace ctx {:name ns-name :vars vars})))
(sci/future (impl/processor pod)) (sci/future (impl/processor pod))
{:pod/id (:pod-id pod)}))) {:pod/id (:pod-id pod)})))
@ -80,8 +81,8 @@
([pod-id sym args] (invoke pod-id sym args {})) ([pod-id sym args] (invoke pod-id sym args {}))
([pod-id sym args opts] (impl/invoke-public pod-id sym args opts))) ([pod-id sym args opts] (impl/invoke-public pod-id sym args opts)))
(defn add-transit-read-handler [pod-id tag fn] (defn add-transit-read-handler [tag fn]
(impl/add-transit-read-handler pod-id tag fn)) (impl/add-transit-read-handler tag fn))
(defn add-transit-write-handler [pod-id tag fn classes] (defn add-transit-write-handler [tag fn classes]
(impl/add-transit-write-handler pod-id tag fn classes)) (impl/add-transit-write-handler tag fn classes))

View file

@ -123,9 +123,9 @@
{"name" "-local-date-time"} {"name" "-local-date-time"}
{"name" "local-date-time" {"name" "local-date-time"
"code" " "code" "
(babashka.pods/add-transit-read-handler \"pod.test-pod\" \"local-date-time\" (babashka.pods/add-transit-read-handler \"local-date-time\"
(fn [s] (java.time.LocalDateTime/parse s))) (fn [s] (java.time.LocalDateTime/parse s)))
(babashka.pods/add-transit-write-handler \"pod.test-pod\" \"local-date-time\" (babashka.pods/add-transit-write-handler \"local-date-time\"
str #{java.time.LocalDateTime}) str #{java.time.LocalDateTime})
(defn local-date-time [x] (defn local-date-time [x]
(-local-date-time x))"}] (-local-date-time x))"}]