[#3] unload-pod
This commit is contained in:
parent
990d804199
commit
3e5637b33f
8 changed files with 165 additions and 118 deletions
|
|
@ -5,5 +5,9 @@
|
|||
([pod-spec] (load-pod pod-spec nil))
|
||||
([pod-spec opts] (jvm/load-pod pod-spec opts)))
|
||||
|
||||
(defn unload-pod
|
||||
([pod-id] (unload-pod pod-id {}))
|
||||
([pod-id opts] (jvm/unload-pod pod-id opts)))
|
||||
|
||||
(defn invoke [pod-id sym args opts]
|
||||
(jvm/invoke pod-id sym args opts))
|
||||
|
|
|
|||
|
|
@ -72,7 +72,7 @@
|
|||
{error-handler :error
|
||||
done-handler :done
|
||||
success-handler :success} (when (map? chan)
|
||||
chan)
|
||||
chan)
|
||||
out (some-> (get reply "out")
|
||||
bytes->string)
|
||||
err (some-> (get reply "err")
|
||||
|
|
@ -128,9 +128,24 @@
|
|||
|
||||
(def pods (atom {}))
|
||||
|
||||
(defn lookup-pod [pod-id]
|
||||
(get @pods pod-id))
|
||||
|
||||
(defn destroy [pod-id]
|
||||
(when-let [pod (lookup-pod pod-id)]
|
||||
(if (contains? (:ops pod) :shutdown)
|
||||
(do (write (:stdin pod)
|
||||
{"op" "shutdown"
|
||||
"id" (next-id)})
|
||||
(.waitFor ^Process (:process pod)))
|
||||
(.destroy ^Process (:process pod)))
|
||||
(when-let [rns (:remove-ns pod)]
|
||||
(doseq [[ns-name _] (:namespaces pod)]
|
||||
(rns ns-name)))))
|
||||
|
||||
(defn load-pod
|
||||
([pod-spec] (load-pod pod-spec nil))
|
||||
([pod-spec _opts]
|
||||
([pod-spec {:keys [:remove-ns]}]
|
||||
(let [pod-spec (if (string? pod-spec) [pod-spec] pod-spec)
|
||||
pb (ProcessBuilder. ^java.util.List pod-spec)
|
||||
_ (.redirectError pb java.lang.ProcessBuilder$Redirect/INHERIT)
|
||||
|
|
@ -154,14 +169,9 @@
|
|||
:format format
|
||||
:ops ops
|
||||
:out *out*
|
||||
:err *err*}
|
||||
_ (add-shutdown-hook!
|
||||
(fn []
|
||||
(if (contains? ops :shutdown)
|
||||
(do (write stdin {"op" "shutdown"
|
||||
"id" (next-id)})
|
||||
(.waitFor p))
|
||||
(.destroy p))))
|
||||
:err *err*
|
||||
:remove-ns remove-ns}
|
||||
_ (add-shutdown-hook! #(destroy pod))
|
||||
pod-namespaces (get reply "namespaces")
|
||||
pod-id (or pod-id (when-let [ns (first pod-namespaces)]
|
||||
(get-string ns "name")))
|
||||
|
|
@ -195,9 +205,10 @@
|
|||
(swap! pods assoc pod-id pod)
|
||||
pod)))
|
||||
|
||||
(defn lookup-pod [pod-id]
|
||||
(get @pods pod-id))
|
||||
|
||||
(defn invoke-public [pod-id fn-sym args opts]
|
||||
(let [pod (lookup-pod pod-id)]
|
||||
{:result (invoke pod fn-sym args opts)}))
|
||||
(invoke pod fn-sym args opts)
|
||||
nil))
|
||||
|
||||
(defn unload-pod [pod-id]
|
||||
(destroy pod-id))
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
(defn load-pod
|
||||
([pod-spec] (load-pod pod-spec nil))
|
||||
([pod-spec _opts]
|
||||
(let [pod (impl/load-pod pod-spec _opts)
|
||||
(let [pod (impl/load-pod pod-spec {:remove-ns remove-ns})
|
||||
namespaces (:namespaces pod)]
|
||||
(doseq [[ns-sym v] namespaces]
|
||||
(binding [*ns* (load-string (format "(ns %s) *ns*" ns-sym))]
|
||||
|
|
@ -17,7 +17,12 @@
|
|||
(string? v)
|
||||
(load-string v)))))
|
||||
(future (impl/processor pod))
|
||||
nil)))
|
||||
(:pod-id pod))))
|
||||
|
||||
(defn unload-pod
|
||||
([pod-id] (unload-pod pod-id {}))
|
||||
([pod-id _opts]
|
||||
(impl/unload-pod pod-id)))
|
||||
|
||||
(defn invoke [pod-id sym args opts]
|
||||
(impl/invoke-public pod-id sym args opts))
|
||||
|
|
|
|||
|
|
@ -7,11 +7,14 @@
|
|||
(fn
|
||||
([ctx pod-spec] (load-pod ctx pod-spec nil))
|
||||
([ctx pod-spec _opts]
|
||||
(let [pod (binding [*out* @sci/out
|
||||
(let [env (:env ctx)
|
||||
pod (binding [*out* @sci/out
|
||||
*err* @sci/err]
|
||||
(impl/load-pod pod-spec _opts))
|
||||
namespaces (:namespaces pod)
|
||||
env (:env ctx)]
|
||||
(impl/load-pod pod-spec
|
||||
{:remove-ns
|
||||
(fn [sym]
|
||||
(swap! env update :namespaces dissoc sym))}))
|
||||
namespaces (:namespaces pod)]
|
||||
(doseq [[ns-name vars] namespaces
|
||||
:let [sci-ns (sci/create-ns ns-name)]]
|
||||
(sci/binding [sci/ns sci-ns]
|
||||
|
|
@ -21,8 +24,13 @@
|
|||
(string? var-value)
|
||||
(sci/eval-string* ctx var-value)))))
|
||||
(sci/future (impl/processor pod))
|
||||
nil)))
|
||||
(:pod-id pod))))
|
||||
{:sci.impl/op :needs-ctx}))
|
||||
|
||||
(defn unload-pod
|
||||
([pod-id] (unload-pod pod-id {}))
|
||||
([pod-id _opts]
|
||||
(impl/unload-pod pod-id)))
|
||||
|
||||
(defn invoke [pod-id sym args opts]
|
||||
(impl/invoke-public pod-id sym args opts))
|
||||
|
|
|
|||
|
|
@ -36,97 +36,101 @@
|
|||
read-fn (if (identical? :json format)
|
||||
#(cheshire/parse-string % true)
|
||||
edn/read-string)]
|
||||
(loop []
|
||||
(let [message (try (read)
|
||||
(catch java.io.EOFException _
|
||||
::EOF))]
|
||||
(when-not (identical? ::EOF message)
|
||||
(let [op (get message "op")
|
||||
op (read-string op)
|
||||
op (keyword op)]
|
||||
(case op
|
||||
:describe
|
||||
(do (write {"format" (if (= format :json)
|
||||
"json"
|
||||
"edn")
|
||||
"namespaces"
|
||||
[{"name" "pod.test-pod"
|
||||
"vars" [{"name" "add-sync"}
|
||||
{"name" "range-stream"
|
||||
"async" "true"}
|
||||
{"name" "assoc"}
|
||||
{"name" "error"}
|
||||
{"name" "print"}
|
||||
{"name" "print-err"}
|
||||
{"name" "return-nil"}
|
||||
{"name" "do-twice"
|
||||
"code" "(defmacro do-twice [x] `(do ~x ~x))"}]}]
|
||||
"ops" {"shutdown" {}}})
|
||||
(recur))
|
||||
:invoke (let [var (-> (get message "var")
|
||||
read-string
|
||||
symbol)
|
||||
_ (debug "var" var)
|
||||
id (-> (get message "id")
|
||||
read-string)
|
||||
args (get message "args")
|
||||
args (read-string args)
|
||||
args (read-fn args)]
|
||||
(case var
|
||||
pod.test-pod/add-sync
|
||||
(try (let [ret (apply + args)]
|
||||
(write
|
||||
{"value" (write-fn ret)
|
||||
"id" id
|
||||
"status" ["done"]}))
|
||||
(catch Exception e
|
||||
(write
|
||||
{"ex-data" (write-fn {:args args})
|
||||
"ex-message" (.getMessage e)
|
||||
"status" ["done" "error"]
|
||||
"id" id})))
|
||||
pod.test-pod/range-stream
|
||||
(let [rng (apply range args)]
|
||||
(doseq [v rng]
|
||||
(try
|
||||
(loop []
|
||||
(let [message (try (read)
|
||||
(catch java.io.EOFException _
|
||||
::EOF))]
|
||||
(when-not (identical? ::EOF message)
|
||||
(let [op (get message "op")
|
||||
op (read-string op)
|
||||
op (keyword op)]
|
||||
(case op
|
||||
:describe
|
||||
(do (write {"format" (if (= format :json)
|
||||
"json"
|
||||
"edn")
|
||||
"namespaces"
|
||||
[{"name" "pod.test-pod"
|
||||
"vars" [{"name" "add-sync"}
|
||||
{"name" "range-stream"
|
||||
"async" "true"}
|
||||
{"name" "assoc"}
|
||||
{"name" "error"}
|
||||
{"name" "print"}
|
||||
{"name" "print-err"}
|
||||
{"name" "return-nil"}
|
||||
{"name" "do-twice"
|
||||
"code" "(defmacro do-twice [x] `(do ~x ~x))"}]}]
|
||||
"ops" {"shutdown" {}}})
|
||||
(recur))
|
||||
:invoke (let [var (-> (get message "var")
|
||||
read-string
|
||||
symbol)
|
||||
_ (debug "var" var)
|
||||
id (-> (get message "id")
|
||||
read-string)
|
||||
args (get message "args")
|
||||
args (read-string args)
|
||||
args (read-fn args)]
|
||||
(case var
|
||||
pod.test-pod/add-sync
|
||||
(try (let [ret (apply + args)]
|
||||
(write
|
||||
{"value" (write-fn ret)
|
||||
"id" id
|
||||
"status" ["done"]}))
|
||||
(catch Exception e
|
||||
(write
|
||||
{"ex-data" (write-fn {:args args})
|
||||
"ex-message" (.getMessage e)
|
||||
"status" ["done" "error"]
|
||||
"id" id})))
|
||||
pod.test-pod/range-stream
|
||||
(let [rng (apply range args)]
|
||||
(doseq [v rng]
|
||||
(write
|
||||
{"value" (write-fn v)
|
||||
"id" id})
|
||||
(Thread/sleep 100))
|
||||
(write
|
||||
{"value" (write-fn v)
|
||||
"id" id})
|
||||
(Thread/sleep 100))
|
||||
{"status" ["done"]
|
||||
"id" id}))
|
||||
pod.test-pod/assoc
|
||||
(write
|
||||
{"value" (write-fn (apply assoc args))
|
||||
"status" ["done"]
|
||||
"id" id})
|
||||
pod.test-pod/error
|
||||
(write
|
||||
{"ex-data" (write-fn {:args args})
|
||||
"ex-message" (str "Illegal arguments")
|
||||
"status" ["done" "error"]
|
||||
"id" id})
|
||||
pod.test-pod/print
|
||||
(do (write
|
||||
{"out" (pr-str args)
|
||||
"id" id})
|
||||
(write
|
||||
{"status" ["done"]
|
||||
"id" id}))
|
||||
pod.test-pod/print-err
|
||||
(do (write
|
||||
{"err" (pr-str args)
|
||||
"id" id})
|
||||
(write
|
||||
{"status" ["done"]
|
||||
"id" id}))
|
||||
pod.test-pod/return-nil
|
||||
(write
|
||||
{"status" ["done"]
|
||||
"id" id}))
|
||||
pod.test-pod/assoc
|
||||
(write
|
||||
{"value" (write-fn (apply assoc args))
|
||||
"status" ["done"]
|
||||
"id" id})
|
||||
pod.test-pod/error
|
||||
(write
|
||||
{"ex-data" (write-fn {:args args})
|
||||
"ex-message" (str "Illegal arguments")
|
||||
"status" ["done" "error"]
|
||||
"id" id})
|
||||
pod.test-pod/print
|
||||
(do (write
|
||||
{"out" (pr-str args)
|
||||
"id" id})
|
||||
(write
|
||||
{"status" ["done"]
|
||||
"id" id}))
|
||||
pod.test-pod/print-err
|
||||
(do (write
|
||||
{"err" (pr-str args)
|
||||
"id" id})
|
||||
(write
|
||||
{"status" ["done"]
|
||||
"id" id}))
|
||||
pod.test-pod/return-nil
|
||||
(write
|
||||
{"status" ["done"]
|
||||
"id" id
|
||||
"value" "nil"}))
|
||||
(recur))
|
||||
:shutdown (System/exit 0))))))))
|
||||
"id" id
|
||||
"value" "nil"}))
|
||||
(recur))
|
||||
:shutdown (System/exit 0))))))
|
||||
(catch Exception e
|
||||
(binding [*out* *err*]
|
||||
(prn e))))))
|
||||
|
||||
(defn -main [& args]
|
||||
(when (= "true" (System/getenv "BABASHKA_POD"))
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
(require '[babashka.pods :as pods])
|
||||
(prn (pods/load-pod ["clojure" "-A:test-pod"])) ;; should return nil
|
||||
(def pod-id (pods/load-pod ["clojure" "-A:test-pod"]))
|
||||
(require '[pod.test-pod :as pod])
|
||||
(def pod-ns-name (ns-name (find-ns 'pod.test-pod)))
|
||||
|
||||
(def stream-results (atom []))
|
||||
(def done-prom (promise))
|
||||
|
|
@ -33,11 +34,21 @@
|
|||
{:error (fn [m]
|
||||
(deliver error-result m))}})
|
||||
|
||||
[(pod/assoc {:a 1} :b 2)
|
||||
(pod.test-pod/add-sync 1 2 3)
|
||||
(def assoc-result (pod/assoc {:a 1} :b 2))
|
||||
(def add-result (pod.test-pod/add-sync 1 2 3))
|
||||
(def nil-result (pod.test-pod/return-nil))
|
||||
|
||||
(pods/unload-pod pod-id)
|
||||
(def successfully-removed (nil? (find-ns 'pod.test-pod)))
|
||||
|
||||
[pod-id
|
||||
pod-ns-name
|
||||
assoc-result
|
||||
add-result
|
||||
@stream-results
|
||||
ex-result
|
||||
(pod.test-pod/return-nil)
|
||||
nil-result
|
||||
@callback-result
|
||||
(:ex-message @error-result)
|
||||
(:ex-data @error-result)]
|
||||
(:ex-data @error-result)
|
||||
successfully-removed]
|
||||
|
|
|
|||
|
|
@ -13,5 +13,6 @@
|
|||
test-program
|
||||
{:namespaces {'babashka.pods
|
||||
{'load-pod pods/load-pod
|
||||
'invoke pods/invoke}}}))]
|
||||
'invoke pods/invoke
|
||||
'unload-pod pods/unload-pod}}}))]
|
||||
(assertions out err ret)))
|
||||
|
|
|
|||
|
|
@ -5,13 +5,16 @@
|
|||
(def test-program (slurp (io/file "test-resources" "test_program.clj")))
|
||||
|
||||
(defn assertions [out err ret]
|
||||
(is (= '[{:a 1, :b 2}
|
||||
(is (= '["pod.test-pod"
|
||||
pod.test-pod
|
||||
{:a 1, :b 2}
|
||||
6
|
||||
[1 2 3 4 5 6 7 8 9]
|
||||
"Illegal arguments / {:args (1 2 3)}"
|
||||
nil
|
||||
3
|
||||
"java.lang.String cannot be cast to java.lang.Number"
|
||||
{:args ["1" 2]}] ret))
|
||||
(is (= "nil\n(\"hello\" \"print\" \"this\" \"debugging\" \"message\")\n:foo\n:foo\n" (str out)))
|
||||
{:args ["1" 2]}
|
||||
true] ret))
|
||||
(is (= "(\"hello\" \"print\" \"this\" \"debugging\" \"message\")\n:foo\n:foo\n" (str out)))
|
||||
(is (= "(\"hello\" \"print\" \"this\" \"error\")\n" (str err))))
|
||||
|
|
|
|||
Loading…
Reference in a new issue