From 11199be3490a518f2330bd9dab5c8e86aadc8b32 Mon Sep 17 00:00:00 2001 From: Michiel Borkent Date: Tue, 26 May 2020 11:49:55 +0200 Subject: [PATCH] wip --- src/babashka/pods/impl.clj | 216 ++++++++++++++++---------------- src/babashka/pods/jvm.clj | 19 ++- src/babashka/pods/sci.clj | 22 ++-- test-pod/pod/test_pod.clj | 35 ++++-- test-resources/test_program.clj | 4 +- 5 files changed, 149 insertions(+), 147 deletions(-) diff --git a/src/babashka/pods/impl.clj b/src/babashka/pods/impl.clj index 7d1b7da..d63911e 100644 --- a/src/babashka/pods/impl.clj +++ b/src/babashka/pods/impl.clj @@ -30,88 +30,6 @@ (some-> (get m k) bytes->string)) -(def callbacks - (atom {})) - -(defn processor [pod] - (let [stdout (:stdout pod) - format (:format pod) - chans (:chans pod) - out-stream (:out pod) - err-stream (:err pod) - readers (:readers pod) - read-fn (case format - :edn #(edn/read-string {:readers readers} %) - :json #(cheshire/parse-string-strict % true))] - (try - (loop [] - (let [reply (try (read stdout) - (catch java.io.EOFException _ - ::EOF))] - (when-not (identical? ::EOF reply) - (let [id (get reply "id") - id (bytes->string id)] - (if-let [cb (get @callbacks id)] - (do (swap! callbacks dissoc id) - ;; callbacks run in their own threads to not block the - ;; processor - (future (cb reply))) - (let [value* (find reply "value") - value (some-> value* - second - bytes->string - read-fn) - status (get reply "status") - status (set (map (comp keyword bytes->string) status)) - error? (contains? status :error) - done? (or error? (contains? status :done)) - [ex-message ex-data] - (when error? - [(or (some-> (get reply "ex-message") - bytes->string) - "") - (or (some-> (get reply "ex-data") - bytes->string - read-fn) - {})]) - chan (get @chans id) - promise? (instance? clojure.lang.IPending chan) - exception (when (and promise? error?) - (ex-info ex-message ex-data)) - ;; NOTE: if we need more fine-grained handlers, we will add - ;; a :raw handler that will just get the bencode message's raw - ;; data - {error-handler :error - done-handler :done - success-handler :success} (when (map? chan) - chan) - out (some-> (get reply "out") - bytes->string) - err (some-> (get reply "err") - bytes->string)] - (when (or value* error?) - (cond promise? - (deliver chan (if error? exception value)) - (and (not error?) success-handler) - (success-handler value) - (and error? error-handler) - (error-handler {:ex-message ex-message - :ex-data ex-data}))) - (when (and done? (not error?)) - (when promise? - (deliver chan nil)) - (when done-handler - (done-handler))) - (when out - (binding [*out* out-stream] - (println out))) - (when err (binding [*out* err-stream] - (println err)))))) - (recur)))) - (catch Exception e - (binding [*out* *err* #_err-stream] - (prn e)))))) - (defn next-id [] (str (java.util.UUID/randomUUID))) @@ -137,6 +55,103 @@ (throw v) v))))) +(defn bencode->vars [pod ns-name-str vars] + (mapv + (fn [var] + (let [name (get-string var "name") + async? (some-> (get var "async") + bytes->string + #(Boolean/parseBoolean %)) + name-sym (symbol name) + sym (symbol ns-name-str name) + code (get-maybe-string var "code")] + [name-sym + (or code + (fn [& args] + (let [res (invoke pod sym args {:async async?})] + res)))])) + vars)) + +(defn processor [pod] + (let [stdout (:stdout pod) + format (:format pod) + chans (:chans pod) + out-stream (:out pod) + err-stream (:err pod) + readers (:readers pod) + read-fn (case format + :edn #(edn/read-string {:readers readers} %) + :json #(cheshire/parse-string-strict % true))] + (try + (loop [] + (let [reply (try (read stdout) + (catch java.io.EOFException _ + ::EOF))] + (when-not (identical? ::EOF reply) + (let [id (get reply "id") + id (bytes->string id) + value* (find reply "value") + value (some-> value* + second + bytes->string + read-fn) + status (get reply "status") + status (set (map (comp keyword bytes->string) status)) + error? (contains? status :error) + done? (or error? (contains? status :done)) + [ex-message ex-data] + (when error? + [(or (some-> (get reply "ex-message") + bytes->string) + "") + (or (some-> (get reply "ex-data") + bytes->string + read-fn) + {})]) + namespace (when-let [v (get reply "vars")] + (let [name (-> (get reply "name") + bytes->string)] + {:name name :vars (bencode->vars pod name v)})) + chan (get @chans id) + promise? (instance? clojure.lang.IPending chan) + exception (when (and promise? error?) + (ex-info ex-message ex-data)) + ;; NOTE: if we need more fine-grained handlers, we will add + ;; a :raw handler that will just get the bencode message's raw + ;; data + {error-handler :error + done-handler :done + success-handler :success} (when (map? chan) + chan) + out (some-> (get reply "out") + bytes->string) + err (some-> (get reply "err") + bytes->string)] + (when (or value* error? namespace) + (cond promise? + (deliver chan (cond error? exception + value value + namespace namespace)) + (and (not error?) success-handler) + (success-handler value) + (and error? error-handler) + (error-handler {:ex-message ex-message + :ex-data ex-data}))) + (when (and done? (not error?)) + (when promise? + (deliver chan nil)) + (when done-handler + (done-handler))) + (when out + (binding [*out* out-stream] + (println out))) + (when err (binding [*out* err-stream] + (println err)))) + (recur)))) + (catch Exception e + (binding [*out* *err* #_err-stream] + (prn e)))))) + (def pods (atom {})) (defn lookup-pod [pod-id] @@ -169,29 +184,13 @@ dict-vals (map (comp resolve-fn bytes->symbol) (vals dict))] (zipmap dict-keys dict-vals)))) -(defn bencode->vars [pod ns-name-str vars] - (mapv - (fn [var] - (let [name (get-string var "name") - async? (some-> (get var "async") - bytes->string - #(Boolean/parseBoolean %)) - name-sym (symbol name) - sym (symbol ns-name-str name) - code (get-maybe-string var "code")] - [name-sym - (or code - (fn [& args] - (let [res (invoke pod sym args {:async async?})] - res)))])) - vars)) - (defn bencode->namespace [pod namespace] (let [name-str (-> namespace (get "name") bytes->string) name-sym (symbol name-str) vars (get namespace "vars") - vars (bencode->vars pod name-str vars)] - [name-sym vars])) + vars (bencode->vars pod name-str vars) + lazy? (some-> namespace (get-maybe-string "lazy") (= "true"))] + [name-sym vars lazy?])) (defn load-pod ([pod-spec] (load-pod pod-spec nil)) @@ -235,16 +234,11 @@ (swap! pods assoc pod-id pod) pod))) -(defn load-ns [pod namespace callback] - (let [id (next-id) - prom (promise) - callback (fn [reply] - (try (let [[name-sym vars] (bencode->namespace pod reply)] - (callback {:name name-sym :vars vars :done prom})) - (catch Throwable e - (binding [*out* *err*] - (prn e)))))] - (swap! callbacks assoc id callback) +(defn load-ns [pod namespace] + (let [prom (promise) + chans (:chans pod) + id (next-id) + _ (swap! chans assoc id prom)] (write (:stdin pod) {"op" "load-ns" "ns" (str namespace) diff --git a/src/babashka/pods/jvm.clj b/src/babashka/pods/jvm.clj index 97e8c0e..9d524a0 100644 --- a/src/babashka/pods/jvm.clj +++ b/src/babashka/pods/jvm.clj @@ -9,7 +9,7 @@ (replace \/ \. ) (replace \_ \-)))) -(defn- process-namespace [{:keys [:name :vars :done]}] +(defn- process-namespace [{:keys [:name :vars]}] (binding [*ns* (load-string (format "(ns %s) *ns*" name))] (doseq [[var-sym v] vars] (cond @@ -18,8 +18,7 @@ (ns-unmap *ns* var-sym) (intern name var-sym v)) (string? v) - (load-string v)))) - (when done (deliver done :ok))) + (load-string v))))) (let [core-load clojure.core/load] (intern 'clojure.core 'load @@ -28,9 +27,8 @@ (doseq [path paths] (let [lib (unroot-resource path)] (if-let [pod (get nss lib)] - (impl/load-ns - pod lib (fn [namespace] - (process-namespace namespace))) + (let [ns (impl/load-ns pod lib)] + (process-namespace ns)) (core-load path)))))))) (defn load-pod @@ -50,13 +48,12 @@ (swap! namespaces-to-load merge (into {} - (keep (fn [[ns-name vars]] - (when (empty? vars) + (keep (fn [[ns-name _ lazy?]] + (when lazy? [ns-name pod])) namespaces)))) - (doseq [[ns-sym vars] namespaces - :when (or (not load) - (seq vars))] + (doseq [[ns-sym vars lazy?] namespaces + :when (not lazy?)] (process-namespace {:name ns-sym :vars vars})) (future (impl/processor pod)) {:pod/id (:pod-id pod)}))) diff --git a/src/babashka/pods/sci.clj b/src/babashka/pods/sci.clj index 555ff6e..1e88675 100644 --- a/src/babashka/pods/sci.clj +++ b/src/babashka/pods/sci.clj @@ -2,7 +2,7 @@ (:require [babashka.pods.impl :as impl] [sci.core :as sci])) -(defn- process-namespace [ctx {:keys [:name :vars :done]}] +(defn- process-namespace [ctx {:keys [:name :vars]}] (let [env (:env ctx) ns-name name sci-ns (sci/create-ns ns-name)] @@ -13,8 +13,7 @@ (sci/new-var (symbol (str ns-name) (str var-name)) var-value)) (string? var-value) - (sci/eval-string* ctx var-value))))) - (when done (deliver done :ok))) + (sci/eval-string* ctx var-value)))))) (def load-pod (with-meta @@ -43,26 +42,25 @@ namespaces (:namespaces pod) load? (contains? (:ops pod) :load-ns) namespaces-to-load (when load? - (set (keep (fn [[ns-name vars]] - (when (empty? vars) + (set (keep (fn [[ns-name _ lazy?]] + (when lazy? ns-name)) namespaces)))] (when (seq namespaces-to-load) (let [load-fn (fn load-fn [{:keys [:namespace]}] (when (contains? namespaces-to-load namespace) - (impl/load-ns - pod namespace (fn [namespace] - (process-namespace ctx namespace))) - "")) + (let [ns (impl/load-ns pod namespace)] + (process-namespace ctx ns)) + {:file nil + :source ""})) prev-load-fn (:load-fn @env) new-load-fn (fn [m] (or (load-fn m) (when prev-load-fn (prev-load-fn m))))] (swap! env assoc :load-fn new-load-fn))) - (doseq [[ns-name vars] namespaces - :when (or (not load) - (seq vars))] + (doseq [[ns-name vars lazy?] namespaces + :when (not lazy?)] (process-namespace ctx {:name ns-name :vars vars})) (sci/future (impl/processor pod)) {:pod/id (:pod-id pod)}))) diff --git a/test-pod/pod/test_pod.clj b/test-pod/pod/test_pod.clj index 717ee0f..9b0ab17 100644 --- a/test-pod/pod/test_pod.clj +++ b/test-pod/pod/test_pod.clj @@ -81,7 +81,10 @@ {"name" "read-other-tag" "code" "(defn read-other-tag [x] [x x])"}] dependents)} - {"name" "pod.test-pod.loaded"}] + {"name" "pod.test-pod.loaded" + "lazy" "true"} + {"name" "pod.test-pod.loaded2" + "lazy" "true"}] "ops" {"shutdown" {} "load-ns" {}}}) (recur)) @@ -162,16 +165,26 @@ :load-ns (let [ns (-> (get message "ns") read-string symbol) - id (-> (get message "id") - read-string)] - (case ns - pod.test-pod.loaded - (write - {"status" ["done"] - "id" id - "name" "pod.test-pod.loaded" - "vars" [{"name" "loaded" - "code" "(defn loaded [x] (inc x))"}]})))))))) + id (-> (get message "id") + read-string)] + (case ns + pod.test-pod.loaded + (write + {"status" ["done"] + "id" id + "name" "pod.test-pod.loaded" + "vars" [{"name" "loaded" + "code" "(defn loaded [x] (inc x))"}]}) + pod.test-pod.loaded2 + (write + {"status" ["done"] + "id" id + "name" "pod.test-pod.loaded2" + "vars" [{"name" "x" + "code" "(require '[pod.test-pod.loaded :as loaded])"} + {"name" "loaded" + "code" "(defn loaded [x] (loaded/loaded x))"}]})) + (recur))))))) (catch Exception e (binding [*out* *err*] (prn e)))))) diff --git a/test-resources/test_program.clj b/test-resources/test_program.clj index 906d0e3..d1f785c 100644 --- a/test-resources/test_program.clj +++ b/test-resources/test_program.clj @@ -46,8 +46,8 @@ (def tagged (pod/reader-tag)) (def other-tagged (pod/other-tag)) -(require '[pod.test-pod.loaded]) -(def loaded (pod.test-pod.loaded/loaded 1)) +(require '[pod.test-pod.loaded2 :as loaded2]) +(def loaded (loaded2/loaded 1)) (pods/unload-pod pod-id) (def successfully-removed (nil? (find-ns 'pod.test-pod)))