From 3e5637b33fa22aa4a50c22d078e2031b34e8bd5a Mon Sep 17 00:00:00 2001 From: Michiel Borkent Date: Wed, 20 May 2020 20:11:46 +0200 Subject: [PATCH] [#3] unload-pod --- src/babashka/pods.clj | 4 + src/babashka/pods/impl.clj | 39 ++++--- src/babashka/pods/jvm.clj | 9 +- src/babashka/pods/sci.clj | 18 ++- test-pod/pod/test_pod.clj | 180 +++++++++++++++-------------- test-resources/test_program.clj | 21 +++- test/babashka/pods/sci_test.clj | 3 +- test/babashka/pods/test_common.clj | 9 +- 8 files changed, 165 insertions(+), 118 deletions(-) diff --git a/src/babashka/pods.clj b/src/babashka/pods.clj index a99b928..2450acf 100644 --- a/src/babashka/pods.clj +++ b/src/babashka/pods.clj @@ -5,5 +5,9 @@ ([pod-spec] (load-pod pod-spec nil)) ([pod-spec opts] (jvm/load-pod pod-spec opts))) +(defn unload-pod + ([pod-id] (unload-pod pod-id {})) + ([pod-id opts] (jvm/unload-pod pod-id opts))) + (defn invoke [pod-id sym args opts] (jvm/invoke pod-id sym args opts)) diff --git a/src/babashka/pods/impl.clj b/src/babashka/pods/impl.clj index 34e93f6..57944a5 100644 --- a/src/babashka/pods/impl.clj +++ b/src/babashka/pods/impl.clj @@ -72,7 +72,7 @@ {error-handler :error done-handler :done success-handler :success} (when (map? chan) - chan) + chan) out (some-> (get reply "out") bytes->string) err (some-> (get reply "err") @@ -128,9 +128,24 @@ (def pods (atom {})) +(defn lookup-pod [pod-id] + (get @pods pod-id)) + +(defn destroy [pod-id] + (when-let [pod (lookup-pod pod-id)] + (if (contains? (:ops pod) :shutdown) + (do (write (:stdin pod) + {"op" "shutdown" + "id" (next-id)}) + (.waitFor ^Process (:process pod))) + (.destroy ^Process (:process pod))) + (when-let [rns (:remove-ns pod)] + (doseq [[ns-name _] (:namespaces pod)] + (rns ns-name))))) + (defn load-pod ([pod-spec] (load-pod pod-spec nil)) - ([pod-spec _opts] + ([pod-spec {:keys [:remove-ns]}] (let [pod-spec (if (string? pod-spec) [pod-spec] pod-spec) pb (ProcessBuilder. ^java.util.List pod-spec) _ (.redirectError pb java.lang.ProcessBuilder$Redirect/INHERIT) @@ -154,14 +169,9 @@ :format format :ops ops :out *out* - :err *err*} - _ (add-shutdown-hook! - (fn [] - (if (contains? ops :shutdown) - (do (write stdin {"op" "shutdown" - "id" (next-id)}) - (.waitFor p)) - (.destroy p)))) + :err *err* + :remove-ns remove-ns} + _ (add-shutdown-hook! #(destroy pod)) pod-namespaces (get reply "namespaces") pod-id (or pod-id (when-let [ns (first pod-namespaces)] (get-string ns "name"))) @@ -195,9 +205,10 @@ (swap! pods assoc pod-id pod) pod))) -(defn lookup-pod [pod-id] - (get @pods pod-id)) - (defn invoke-public [pod-id fn-sym args opts] (let [pod (lookup-pod pod-id)] - {:result (invoke pod fn-sym args opts)})) + (invoke pod fn-sym args opts) + nil)) + +(defn unload-pod [pod-id] + (destroy pod-id)) diff --git a/src/babashka/pods/jvm.clj b/src/babashka/pods/jvm.clj index e37ad6a..d9bb443 100644 --- a/src/babashka/pods/jvm.clj +++ b/src/babashka/pods/jvm.clj @@ -4,7 +4,7 @@ (defn load-pod ([pod-spec] (load-pod pod-spec nil)) ([pod-spec _opts] - (let [pod (impl/load-pod pod-spec _opts) + (let [pod (impl/load-pod pod-spec {:remove-ns remove-ns}) namespaces (:namespaces pod)] (doseq [[ns-sym v] namespaces] (binding [*ns* (load-string (format "(ns %s) *ns*" ns-sym))] @@ -17,7 +17,12 @@ (string? v) (load-string v))))) (future (impl/processor pod)) - nil))) + (:pod-id pod)))) + +(defn unload-pod + ([pod-id] (unload-pod pod-id {})) + ([pod-id _opts] + (impl/unload-pod pod-id))) (defn invoke [pod-id sym args opts] (impl/invoke-public pod-id sym args opts)) diff --git a/src/babashka/pods/sci.clj b/src/babashka/pods/sci.clj index 91fad18..3f7dced 100644 --- a/src/babashka/pods/sci.clj +++ b/src/babashka/pods/sci.clj @@ -7,11 +7,14 @@ (fn ([ctx pod-spec] (load-pod ctx pod-spec nil)) ([ctx pod-spec _opts] - (let [pod (binding [*out* @sci/out + (let [env (:env ctx) + pod (binding [*out* @sci/out *err* @sci/err] - (impl/load-pod pod-spec _opts)) - namespaces (:namespaces pod) - env (:env ctx)] + (impl/load-pod pod-spec + {:remove-ns + (fn [sym] + (swap! env update :namespaces dissoc sym))})) + namespaces (:namespaces pod)] (doseq [[ns-name vars] namespaces :let [sci-ns (sci/create-ns ns-name)]] (sci/binding [sci/ns sci-ns] @@ -21,8 +24,13 @@ (string? var-value) (sci/eval-string* ctx var-value))))) (sci/future (impl/processor pod)) - nil))) + (:pod-id pod)))) {:sci.impl/op :needs-ctx})) +(defn unload-pod + ([pod-id] (unload-pod pod-id {})) + ([pod-id _opts] + (impl/unload-pod pod-id))) + (defn invoke [pod-id sym args opts] (impl/invoke-public pod-id sym args opts)) diff --git a/test-pod/pod/test_pod.clj b/test-pod/pod/test_pod.clj index 3913fb6..5bbb93f 100644 --- a/test-pod/pod/test_pod.clj +++ b/test-pod/pod/test_pod.clj @@ -36,97 +36,101 @@ read-fn (if (identical? :json format) #(cheshire/parse-string % true) edn/read-string)] - (loop [] - (let [message (try (read) - (catch java.io.EOFException _ - ::EOF))] - (when-not (identical? ::EOF message) - (let [op (get message "op") - op (read-string op) - op (keyword op)] - (case op - :describe - (do (write {"format" (if (= format :json) - "json" - "edn") - "namespaces" - [{"name" "pod.test-pod" - "vars" [{"name" "add-sync"} - {"name" "range-stream" - "async" "true"} - {"name" "assoc"} - {"name" "error"} - {"name" "print"} - {"name" "print-err"} - {"name" "return-nil"} - {"name" "do-twice" - "code" "(defmacro do-twice [x] `(do ~x ~x))"}]}] - "ops" {"shutdown" {}}}) - (recur)) - :invoke (let [var (-> (get message "var") - read-string - symbol) - _ (debug "var" var) - id (-> (get message "id") - read-string) - args (get message "args") - args (read-string args) - args (read-fn args)] - (case var - pod.test-pod/add-sync - (try (let [ret (apply + args)] - (write - {"value" (write-fn ret) - "id" id - "status" ["done"]})) - (catch Exception e - (write - {"ex-data" (write-fn {:args args}) - "ex-message" (.getMessage e) - "status" ["done" "error"] - "id" id}))) - pod.test-pod/range-stream - (let [rng (apply range args)] - (doseq [v rng] + (try + (loop [] + (let [message (try (read) + (catch java.io.EOFException _ + ::EOF))] + (when-not (identical? ::EOF message) + (let [op (get message "op") + op (read-string op) + op (keyword op)] + (case op + :describe + (do (write {"format" (if (= format :json) + "json" + "edn") + "namespaces" + [{"name" "pod.test-pod" + "vars" [{"name" "add-sync"} + {"name" "range-stream" + "async" "true"} + {"name" "assoc"} + {"name" "error"} + {"name" "print"} + {"name" "print-err"} + {"name" "return-nil"} + {"name" "do-twice" + "code" "(defmacro do-twice [x] `(do ~x ~x))"}]}] + "ops" {"shutdown" {}}}) + (recur)) + :invoke (let [var (-> (get message "var") + read-string + symbol) + _ (debug "var" var) + id (-> (get message "id") + read-string) + args (get message "args") + args (read-string args) + args (read-fn args)] + (case var + pod.test-pod/add-sync + (try (let [ret (apply + args)] + (write + {"value" (write-fn ret) + "id" id + "status" ["done"]})) + (catch Exception e + (write + {"ex-data" (write-fn {:args args}) + "ex-message" (.getMessage e) + "status" ["done" "error"] + "id" id}))) + pod.test-pod/range-stream + (let [rng (apply range args)] + (doseq [v rng] + (write + {"value" (write-fn v) + "id" id}) + (Thread/sleep 100)) (write - {"value" (write-fn v) - "id" id}) - (Thread/sleep 100)) + {"status" ["done"] + "id" id})) + pod.test-pod/assoc + (write + {"value" (write-fn (apply assoc args)) + "status" ["done"] + "id" id}) + pod.test-pod/error + (write + {"ex-data" (write-fn {:args args}) + "ex-message" (str "Illegal arguments") + "status" ["done" "error"] + "id" id}) + pod.test-pod/print + (do (write + {"out" (pr-str args) + "id" id}) + (write + {"status" ["done"] + "id" id})) + pod.test-pod/print-err + (do (write + {"err" (pr-str args) + "id" id}) + (write + {"status" ["done"] + "id" id})) + pod.test-pod/return-nil (write {"status" ["done"] - "id" id})) - pod.test-pod/assoc - (write - {"value" (write-fn (apply assoc args)) - "status" ["done"] - "id" id}) - pod.test-pod/error - (write - {"ex-data" (write-fn {:args args}) - "ex-message" (str "Illegal arguments") - "status" ["done" "error"] - "id" id}) - pod.test-pod/print - (do (write - {"out" (pr-str args) - "id" id}) - (write - {"status" ["done"] - "id" id})) - pod.test-pod/print-err - (do (write - {"err" (pr-str args) - "id" id}) - (write - {"status" ["done"] - "id" id})) - pod.test-pod/return-nil - (write - {"status" ["done"] - "id" id - "value" "nil"})) - (recur)) - :shutdown (System/exit 0)))))))) + "id" id + "value" "nil"})) + (recur)) + :shutdown (System/exit 0)))))) + (catch Exception e + (binding [*out* *err*] + (prn e)))))) (defn -main [& args] (when (= "true" (System/getenv "BABASHKA_POD")) diff --git a/test-resources/test_program.clj b/test-resources/test_program.clj index c6ac5e6..3468501 100644 --- a/test-resources/test_program.clj +++ b/test-resources/test_program.clj @@ -1,6 +1,7 @@ (require '[babashka.pods :as pods]) -(prn (pods/load-pod ["clojure" "-A:test-pod"])) ;; should return nil +(def pod-id (pods/load-pod ["clojure" "-A:test-pod"])) (require '[pod.test-pod :as pod]) +(def pod-ns-name (ns-name (find-ns 'pod.test-pod))) (def stream-results (atom [])) (def done-prom (promise)) @@ -33,11 +34,21 @@ {:error (fn [m] (deliver error-result m))}}) -[(pod/assoc {:a 1} :b 2) - (pod.test-pod/add-sync 1 2 3) +(def assoc-result (pod/assoc {:a 1} :b 2)) +(def add-result (pod.test-pod/add-sync 1 2 3)) +(def nil-result (pod.test-pod/return-nil)) + +(pods/unload-pod pod-id) +(def successfully-removed (nil? (find-ns 'pod.test-pod))) + +[pod-id + pod-ns-name + assoc-result + add-result @stream-results ex-result - (pod.test-pod/return-nil) + nil-result @callback-result (:ex-message @error-result) - (:ex-data @error-result)] + (:ex-data @error-result) + successfully-removed] diff --git a/test/babashka/pods/sci_test.clj b/test/babashka/pods/sci_test.clj index f5ee07a..e7c4826 100644 --- a/test/babashka/pods/sci_test.clj +++ b/test/babashka/pods/sci_test.clj @@ -13,5 +13,6 @@ test-program {:namespaces {'babashka.pods {'load-pod pods/load-pod - 'invoke pods/invoke}}}))] + 'invoke pods/invoke + 'unload-pod pods/unload-pod}}}))] (assertions out err ret))) diff --git a/test/babashka/pods/test_common.clj b/test/babashka/pods/test_common.clj index a02ce26..1a9c928 100644 --- a/test/babashka/pods/test_common.clj +++ b/test/babashka/pods/test_common.clj @@ -5,13 +5,16 @@ (def test-program (slurp (io/file "test-resources" "test_program.clj"))) (defn assertions [out err ret] - (is (= '[{:a 1, :b 2} + (is (= '["pod.test-pod" + pod.test-pod + {:a 1, :b 2} 6 [1 2 3 4 5 6 7 8 9] "Illegal arguments / {:args (1 2 3)}" nil 3 "java.lang.String cannot be cast to java.lang.Number" - {:args ["1" 2]}] ret)) - (is (= "nil\n(\"hello\" \"print\" \"this\" \"debugging\" \"message\")\n:foo\n:foo\n" (str out))) + {:args ["1" 2]} + true] ret)) + (is (= "(\"hello\" \"print\" \"this\" \"debugging\" \"message\")\n:foo\n:foo\n" (str out))) (is (= "(\"hello\" \"print\" \"this\" \"error\")\n" (str err))))