wip
This commit is contained in:
parent
f8fd641287
commit
11199be349
5 changed files with 149 additions and 147 deletions
|
|
@ -30,88 +30,6 @@
|
||||||
(some-> (get m k)
|
(some-> (get m k)
|
||||||
bytes->string))
|
bytes->string))
|
||||||
|
|
||||||
(def callbacks
|
|
||||||
(atom {}))
|
|
||||||
|
|
||||||
(defn processor [pod]
|
|
||||||
(let [stdout (:stdout pod)
|
|
||||||
format (:format pod)
|
|
||||||
chans (:chans pod)
|
|
||||||
out-stream (:out pod)
|
|
||||||
err-stream (:err pod)
|
|
||||||
readers (:readers pod)
|
|
||||||
read-fn (case format
|
|
||||||
:edn #(edn/read-string {:readers readers} %)
|
|
||||||
:json #(cheshire/parse-string-strict % true))]
|
|
||||||
(try
|
|
||||||
(loop []
|
|
||||||
(let [reply (try (read stdout)
|
|
||||||
(catch java.io.EOFException _
|
|
||||||
::EOF))]
|
|
||||||
(when-not (identical? ::EOF reply)
|
|
||||||
(let [id (get reply "id")
|
|
||||||
id (bytes->string id)]
|
|
||||||
(if-let [cb (get @callbacks id)]
|
|
||||||
(do (swap! callbacks dissoc id)
|
|
||||||
;; callbacks run in their own threads to not block the
|
|
||||||
;; processor
|
|
||||||
(future (cb reply)))
|
|
||||||
(let [value* (find reply "value")
|
|
||||||
value (some-> value*
|
|
||||||
second
|
|
||||||
bytes->string
|
|
||||||
read-fn)
|
|
||||||
status (get reply "status")
|
|
||||||
status (set (map (comp keyword bytes->string) status))
|
|
||||||
error? (contains? status :error)
|
|
||||||
done? (or error? (contains? status :done))
|
|
||||||
[ex-message ex-data]
|
|
||||||
(when error?
|
|
||||||
[(or (some-> (get reply "ex-message")
|
|
||||||
bytes->string)
|
|
||||||
"")
|
|
||||||
(or (some-> (get reply "ex-data")
|
|
||||||
bytes->string
|
|
||||||
read-fn)
|
|
||||||
{})])
|
|
||||||
chan (get @chans id)
|
|
||||||
promise? (instance? clojure.lang.IPending chan)
|
|
||||||
exception (when (and promise? error?)
|
|
||||||
(ex-info ex-message ex-data))
|
|
||||||
;; NOTE: if we need more fine-grained handlers, we will add
|
|
||||||
;; a :raw handler that will just get the bencode message's raw
|
|
||||||
;; data
|
|
||||||
{error-handler :error
|
|
||||||
done-handler :done
|
|
||||||
success-handler :success} (when (map? chan)
|
|
||||||
chan)
|
|
||||||
out (some-> (get reply "out")
|
|
||||||
bytes->string)
|
|
||||||
err (some-> (get reply "err")
|
|
||||||
bytes->string)]
|
|
||||||
(when (or value* error?)
|
|
||||||
(cond promise?
|
|
||||||
(deliver chan (if error? exception value))
|
|
||||||
(and (not error?) success-handler)
|
|
||||||
(success-handler value)
|
|
||||||
(and error? error-handler)
|
|
||||||
(error-handler {:ex-message ex-message
|
|
||||||
:ex-data ex-data})))
|
|
||||||
(when (and done? (not error?))
|
|
||||||
(when promise?
|
|
||||||
(deliver chan nil))
|
|
||||||
(when done-handler
|
|
||||||
(done-handler)))
|
|
||||||
(when out
|
|
||||||
(binding [*out* out-stream]
|
|
||||||
(println out)))
|
|
||||||
(when err (binding [*out* err-stream]
|
|
||||||
(println err))))))
|
|
||||||
(recur))))
|
|
||||||
(catch Exception e
|
|
||||||
(binding [*out* *err* #_err-stream]
|
|
||||||
(prn e))))))
|
|
||||||
|
|
||||||
(defn next-id []
|
(defn next-id []
|
||||||
(str (java.util.UUID/randomUUID)))
|
(str (java.util.UUID/randomUUID)))
|
||||||
|
|
||||||
|
|
@ -137,6 +55,103 @@
|
||||||
(throw v)
|
(throw v)
|
||||||
v)))))
|
v)))))
|
||||||
|
|
||||||
|
(defn bencode->vars [pod ns-name-str vars]
|
||||||
|
(mapv
|
||||||
|
(fn [var]
|
||||||
|
(let [name (get-string var "name")
|
||||||
|
async? (some-> (get var "async")
|
||||||
|
bytes->string
|
||||||
|
#(Boolean/parseBoolean %))
|
||||||
|
name-sym (symbol name)
|
||||||
|
sym (symbol ns-name-str name)
|
||||||
|
code (get-maybe-string var "code")]
|
||||||
|
[name-sym
|
||||||
|
(or code
|
||||||
|
(fn [& args]
|
||||||
|
(let [res (invoke pod sym args {:async async?})]
|
||||||
|
res)))]))
|
||||||
|
vars))
|
||||||
|
|
||||||
|
(defn processor [pod]
|
||||||
|
(let [stdout (:stdout pod)
|
||||||
|
format (:format pod)
|
||||||
|
chans (:chans pod)
|
||||||
|
out-stream (:out pod)
|
||||||
|
err-stream (:err pod)
|
||||||
|
readers (:readers pod)
|
||||||
|
read-fn (case format
|
||||||
|
:edn #(edn/read-string {:readers readers} %)
|
||||||
|
:json #(cheshire/parse-string-strict % true))]
|
||||||
|
(try
|
||||||
|
(loop []
|
||||||
|
(let [reply (try (read stdout)
|
||||||
|
(catch java.io.EOFException _
|
||||||
|
::EOF))]
|
||||||
|
(when-not (identical? ::EOF reply)
|
||||||
|
(let [id (get reply "id")
|
||||||
|
id (bytes->string id)
|
||||||
|
value* (find reply "value")
|
||||||
|
value (some-> value*
|
||||||
|
second
|
||||||
|
bytes->string
|
||||||
|
read-fn)
|
||||||
|
status (get reply "status")
|
||||||
|
status (set (map (comp keyword bytes->string) status))
|
||||||
|
error? (contains? status :error)
|
||||||
|
done? (or error? (contains? status :done))
|
||||||
|
[ex-message ex-data]
|
||||||
|
(when error?
|
||||||
|
[(or (some-> (get reply "ex-message")
|
||||||
|
bytes->string)
|
||||||
|
"")
|
||||||
|
(or (some-> (get reply "ex-data")
|
||||||
|
bytes->string
|
||||||
|
read-fn)
|
||||||
|
{})])
|
||||||
|
namespace (when-let [v (get reply "vars")]
|
||||||
|
(let [name (-> (get reply "name")
|
||||||
|
bytes->string)]
|
||||||
|
{:name name :vars (bencode->vars pod name v)}))
|
||||||
|
chan (get @chans id)
|
||||||
|
promise? (instance? clojure.lang.IPending chan)
|
||||||
|
exception (when (and promise? error?)
|
||||||
|
(ex-info ex-message ex-data))
|
||||||
|
;; NOTE: if we need more fine-grained handlers, we will add
|
||||||
|
;; a :raw handler that will just get the bencode message's raw
|
||||||
|
;; data
|
||||||
|
{error-handler :error
|
||||||
|
done-handler :done
|
||||||
|
success-handler :success} (when (map? chan)
|
||||||
|
chan)
|
||||||
|
out (some-> (get reply "out")
|
||||||
|
bytes->string)
|
||||||
|
err (some-> (get reply "err")
|
||||||
|
bytes->string)]
|
||||||
|
(when (or value* error? namespace)
|
||||||
|
(cond promise?
|
||||||
|
(deliver chan (cond error? exception
|
||||||
|
value value
|
||||||
|
namespace namespace))
|
||||||
|
(and (not error?) success-handler)
|
||||||
|
(success-handler value)
|
||||||
|
(and error? error-handler)
|
||||||
|
(error-handler {:ex-message ex-message
|
||||||
|
:ex-data ex-data})))
|
||||||
|
(when (and done? (not error?))
|
||||||
|
(when promise?
|
||||||
|
(deliver chan nil))
|
||||||
|
(when done-handler
|
||||||
|
(done-handler)))
|
||||||
|
(when out
|
||||||
|
(binding [*out* out-stream]
|
||||||
|
(println out)))
|
||||||
|
(when err (binding [*out* err-stream]
|
||||||
|
(println err))))
|
||||||
|
(recur))))
|
||||||
|
(catch Exception e
|
||||||
|
(binding [*out* *err* #_err-stream]
|
||||||
|
(prn e))))))
|
||||||
|
|
||||||
(def pods (atom {}))
|
(def pods (atom {}))
|
||||||
|
|
||||||
(defn lookup-pod [pod-id]
|
(defn lookup-pod [pod-id]
|
||||||
|
|
@ -169,29 +184,13 @@
|
||||||
dict-vals (map (comp resolve-fn bytes->symbol) (vals dict))]
|
dict-vals (map (comp resolve-fn bytes->symbol) (vals dict))]
|
||||||
(zipmap dict-keys dict-vals))))
|
(zipmap dict-keys dict-vals))))
|
||||||
|
|
||||||
(defn bencode->vars [pod ns-name-str vars]
|
|
||||||
(mapv
|
|
||||||
(fn [var]
|
|
||||||
(let [name (get-string var "name")
|
|
||||||
async? (some-> (get var "async")
|
|
||||||
bytes->string
|
|
||||||
#(Boolean/parseBoolean %))
|
|
||||||
name-sym (symbol name)
|
|
||||||
sym (symbol ns-name-str name)
|
|
||||||
code (get-maybe-string var "code")]
|
|
||||||
[name-sym
|
|
||||||
(or code
|
|
||||||
(fn [& args]
|
|
||||||
(let [res (invoke pod sym args {:async async?})]
|
|
||||||
res)))]))
|
|
||||||
vars))
|
|
||||||
|
|
||||||
(defn bencode->namespace [pod namespace]
|
(defn bencode->namespace [pod namespace]
|
||||||
(let [name-str (-> namespace (get "name") bytes->string)
|
(let [name-str (-> namespace (get "name") bytes->string)
|
||||||
name-sym (symbol name-str)
|
name-sym (symbol name-str)
|
||||||
vars (get namespace "vars")
|
vars (get namespace "vars")
|
||||||
vars (bencode->vars pod name-str vars)]
|
vars (bencode->vars pod name-str vars)
|
||||||
[name-sym vars]))
|
lazy? (some-> namespace (get-maybe-string "lazy") (= "true"))]
|
||||||
|
[name-sym vars lazy?]))
|
||||||
|
|
||||||
(defn load-pod
|
(defn load-pod
|
||||||
([pod-spec] (load-pod pod-spec nil))
|
([pod-spec] (load-pod pod-spec nil))
|
||||||
|
|
@ -235,16 +234,11 @@
|
||||||
(swap! pods assoc pod-id pod)
|
(swap! pods assoc pod-id pod)
|
||||||
pod)))
|
pod)))
|
||||||
|
|
||||||
(defn load-ns [pod namespace callback]
|
(defn load-ns [pod namespace]
|
||||||
(let [id (next-id)
|
(let [prom (promise)
|
||||||
prom (promise)
|
chans (:chans pod)
|
||||||
callback (fn [reply]
|
id (next-id)
|
||||||
(try (let [[name-sym vars] (bencode->namespace pod reply)]
|
_ (swap! chans assoc id prom)]
|
||||||
(callback {:name name-sym :vars vars :done prom}))
|
|
||||||
(catch Throwable e
|
|
||||||
(binding [*out* *err*]
|
|
||||||
(prn e)))))]
|
|
||||||
(swap! callbacks assoc id callback)
|
|
||||||
(write (:stdin pod)
|
(write (:stdin pod)
|
||||||
{"op" "load-ns"
|
{"op" "load-ns"
|
||||||
"ns" (str namespace)
|
"ns" (str namespace)
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@
|
||||||
(replace \/ \. )
|
(replace \/ \. )
|
||||||
(replace \_ \-))))
|
(replace \_ \-))))
|
||||||
|
|
||||||
(defn- process-namespace [{:keys [:name :vars :done]}]
|
(defn- process-namespace [{:keys [:name :vars]}]
|
||||||
(binding [*ns* (load-string (format "(ns %s) *ns*" name))]
|
(binding [*ns* (load-string (format "(ns %s) *ns*" name))]
|
||||||
(doseq [[var-sym v] vars]
|
(doseq [[var-sym v] vars]
|
||||||
(cond
|
(cond
|
||||||
|
|
@ -18,8 +18,7 @@
|
||||||
(ns-unmap *ns* var-sym)
|
(ns-unmap *ns* var-sym)
|
||||||
(intern name var-sym v))
|
(intern name var-sym v))
|
||||||
(string? v)
|
(string? v)
|
||||||
(load-string v))))
|
(load-string v)))))
|
||||||
(when done (deliver done :ok)))
|
|
||||||
|
|
||||||
(let [core-load clojure.core/load]
|
(let [core-load clojure.core/load]
|
||||||
(intern 'clojure.core 'load
|
(intern 'clojure.core 'load
|
||||||
|
|
@ -28,9 +27,8 @@
|
||||||
(doseq [path paths]
|
(doseq [path paths]
|
||||||
(let [lib (unroot-resource path)]
|
(let [lib (unroot-resource path)]
|
||||||
(if-let [pod (get nss lib)]
|
(if-let [pod (get nss lib)]
|
||||||
(impl/load-ns
|
(let [ns (impl/load-ns pod lib)]
|
||||||
pod lib (fn [namespace]
|
(process-namespace ns))
|
||||||
(process-namespace namespace)))
|
|
||||||
(core-load path))))))))
|
(core-load path))))))))
|
||||||
|
|
||||||
(defn load-pod
|
(defn load-pod
|
||||||
|
|
@ -50,13 +48,12 @@
|
||||||
(swap! namespaces-to-load
|
(swap! namespaces-to-load
|
||||||
merge
|
merge
|
||||||
(into {}
|
(into {}
|
||||||
(keep (fn [[ns-name vars]]
|
(keep (fn [[ns-name _ lazy?]]
|
||||||
(when (empty? vars)
|
(when lazy?
|
||||||
[ns-name pod]))
|
[ns-name pod]))
|
||||||
namespaces))))
|
namespaces))))
|
||||||
(doseq [[ns-sym vars] namespaces
|
(doseq [[ns-sym vars lazy?] namespaces
|
||||||
:when (or (not load)
|
:when (not lazy?)]
|
||||||
(seq vars))]
|
|
||||||
(process-namespace {:name ns-sym :vars vars}))
|
(process-namespace {:name ns-sym :vars vars}))
|
||||||
(future (impl/processor pod))
|
(future (impl/processor pod))
|
||||||
{:pod/id (:pod-id pod)})))
|
{:pod/id (:pod-id pod)})))
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,7 @@
|
||||||
(:require [babashka.pods.impl :as impl]
|
(:require [babashka.pods.impl :as impl]
|
||||||
[sci.core :as sci]))
|
[sci.core :as sci]))
|
||||||
|
|
||||||
(defn- process-namespace [ctx {:keys [:name :vars :done]}]
|
(defn- process-namespace [ctx {:keys [:name :vars]}]
|
||||||
(let [env (:env ctx)
|
(let [env (:env ctx)
|
||||||
ns-name name
|
ns-name name
|
||||||
sci-ns (sci/create-ns ns-name)]
|
sci-ns (sci/create-ns ns-name)]
|
||||||
|
|
@ -13,8 +13,7 @@
|
||||||
(sci/new-var
|
(sci/new-var
|
||||||
(symbol (str ns-name) (str var-name)) var-value))
|
(symbol (str ns-name) (str var-name)) var-value))
|
||||||
(string? var-value)
|
(string? var-value)
|
||||||
(sci/eval-string* ctx var-value)))))
|
(sci/eval-string* ctx var-value))))))
|
||||||
(when done (deliver done :ok)))
|
|
||||||
|
|
||||||
(def load-pod
|
(def load-pod
|
||||||
(with-meta
|
(with-meta
|
||||||
|
|
@ -43,26 +42,25 @@
|
||||||
namespaces (:namespaces pod)
|
namespaces (:namespaces pod)
|
||||||
load? (contains? (:ops pod) :load-ns)
|
load? (contains? (:ops pod) :load-ns)
|
||||||
namespaces-to-load (when load?
|
namespaces-to-load (when load?
|
||||||
(set (keep (fn [[ns-name vars]]
|
(set (keep (fn [[ns-name _ lazy?]]
|
||||||
(when (empty? vars)
|
(when lazy?
|
||||||
ns-name))
|
ns-name))
|
||||||
namespaces)))]
|
namespaces)))]
|
||||||
(when (seq namespaces-to-load)
|
(when (seq namespaces-to-load)
|
||||||
(let [load-fn (fn load-fn [{:keys [:namespace]}]
|
(let [load-fn (fn load-fn [{:keys [:namespace]}]
|
||||||
(when (contains? namespaces-to-load namespace)
|
(when (contains? namespaces-to-load namespace)
|
||||||
(impl/load-ns
|
(let [ns (impl/load-ns pod namespace)]
|
||||||
pod namespace (fn [namespace]
|
(process-namespace ctx ns))
|
||||||
(process-namespace ctx namespace)))
|
{:file nil
|
||||||
""))
|
:source ""}))
|
||||||
prev-load-fn (:load-fn @env)
|
prev-load-fn (:load-fn @env)
|
||||||
new-load-fn (fn [m]
|
new-load-fn (fn [m]
|
||||||
(or (load-fn m)
|
(or (load-fn m)
|
||||||
(when prev-load-fn
|
(when prev-load-fn
|
||||||
(prev-load-fn m))))]
|
(prev-load-fn m))))]
|
||||||
(swap! env assoc :load-fn new-load-fn)))
|
(swap! env assoc :load-fn new-load-fn)))
|
||||||
(doseq [[ns-name vars] namespaces
|
(doseq [[ns-name vars lazy?] namespaces
|
||||||
:when (or (not load)
|
:when (not lazy?)]
|
||||||
(seq vars))]
|
|
||||||
(process-namespace ctx {:name ns-name :vars vars}))
|
(process-namespace ctx {:name ns-name :vars vars}))
|
||||||
(sci/future (impl/processor pod))
|
(sci/future (impl/processor pod))
|
||||||
{:pod/id (:pod-id pod)})))
|
{:pod/id (:pod-id pod)})))
|
||||||
|
|
|
||||||
|
|
@ -81,7 +81,10 @@
|
||||||
{"name" "read-other-tag"
|
{"name" "read-other-tag"
|
||||||
"code" "(defn read-other-tag [x] [x x])"}]
|
"code" "(defn read-other-tag [x] [x x])"}]
|
||||||
dependents)}
|
dependents)}
|
||||||
{"name" "pod.test-pod.loaded"}]
|
{"name" "pod.test-pod.loaded"
|
||||||
|
"lazy" "true"}
|
||||||
|
{"name" "pod.test-pod.loaded2"
|
||||||
|
"lazy" "true"}]
|
||||||
"ops" {"shutdown" {}
|
"ops" {"shutdown" {}
|
||||||
"load-ns" {}}})
|
"load-ns" {}}})
|
||||||
(recur))
|
(recur))
|
||||||
|
|
@ -162,16 +165,26 @@
|
||||||
:load-ns (let [ns (-> (get message "ns")
|
:load-ns (let [ns (-> (get message "ns")
|
||||||
read-string
|
read-string
|
||||||
symbol)
|
symbol)
|
||||||
id (-> (get message "id")
|
id (-> (get message "id")
|
||||||
read-string)]
|
read-string)]
|
||||||
(case ns
|
(case ns
|
||||||
pod.test-pod.loaded
|
pod.test-pod.loaded
|
||||||
(write
|
(write
|
||||||
{"status" ["done"]
|
{"status" ["done"]
|
||||||
"id" id
|
"id" id
|
||||||
"name" "pod.test-pod.loaded"
|
"name" "pod.test-pod.loaded"
|
||||||
"vars" [{"name" "loaded"
|
"vars" [{"name" "loaded"
|
||||||
"code" "(defn loaded [x] (inc x))"}]}))))))))
|
"code" "(defn loaded [x] (inc x))"}]})
|
||||||
|
pod.test-pod.loaded2
|
||||||
|
(write
|
||||||
|
{"status" ["done"]
|
||||||
|
"id" id
|
||||||
|
"name" "pod.test-pod.loaded2"
|
||||||
|
"vars" [{"name" "x"
|
||||||
|
"code" "(require '[pod.test-pod.loaded :as loaded])"}
|
||||||
|
{"name" "loaded"
|
||||||
|
"code" "(defn loaded [x] (loaded/loaded x))"}]}))
|
||||||
|
(recur)))))))
|
||||||
(catch Exception e
|
(catch Exception e
|
||||||
(binding [*out* *err*]
|
(binding [*out* *err*]
|
||||||
(prn e))))))
|
(prn e))))))
|
||||||
|
|
|
||||||
|
|
@ -46,8 +46,8 @@
|
||||||
(def tagged (pod/reader-tag))
|
(def tagged (pod/reader-tag))
|
||||||
(def other-tagged (pod/other-tag))
|
(def other-tagged (pod/other-tag))
|
||||||
|
|
||||||
(require '[pod.test-pod.loaded])
|
(require '[pod.test-pod.loaded2 :as loaded2])
|
||||||
(def loaded (pod.test-pod.loaded/loaded 1))
|
(def loaded (loaded2/loaded 1))
|
||||||
|
|
||||||
(pods/unload-pod pod-id)
|
(pods/unload-pod pod-id)
|
||||||
(def successfully-removed (nil? (find-ns 'pod.test-pod)))
|
(def successfully-removed (nil? (find-ns 'pod.test-pod)))
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue