[#2] Experimental socket support

This commit is contained in:
Michiel Borkent 2020-10-13 22:34:01 +02:00
parent 7f804baca8
commit 225f9239d8
3 changed files with 49 additions and 42 deletions

View file

@ -219,26 +219,38 @@
[^Socket socket] [^Socket socket]
(.close socket)) (.close socket))
(defn gobbler [^java.io.InputStream is] (defn read-port [pid]
(future 1888 #_(loop []
(loop [] (let [f (io/file (str ".babashka/pods/" pid ".port"))]
(let [v (.read is)] (if (.exists f)
(when-not (= -1 v) (edn/read-string (slurp f))
(print (char v)) (recur)))))
(recur))))))
(defn load-pod (defn load-pod
([pod-spec] (load-pod pod-spec nil)) ([pod-spec] (load-pod pod-spec nil))
([pod-spec {:keys [:remove-ns :resolve]}] ([pod-spec {:keys [:remove-ns :resolve :socket :inherit-io]}]
(let [pod-spec (if (string? pod-spec) [pod-spec] pod-spec) (let [pod-spec (if (string? pod-spec) [pod-spec] pod-spec)
pb (ProcessBuilder. ^java.util.List pod-spec) pb (ProcessBuilder. ^java.util.List pod-spec)
_ (.redirectError pb java.lang.ProcessBuilder$Redirect/INHERIT) _ (if inherit-io
(.inheritIO pb)
(.redirectError pb java.lang.ProcessBuilder$Redirect/INHERIT))
_ (doto (.environment pb) _ (doto (.environment pb)
(.put "BABASHKA_POD" "true")) (.put "BABASHKA_POD" "true"))
p (.start pb) p (.start pb)
stdin (.getOutputStream p) pid (.pid p)
stdout (.getInputStream p) socket-port (when socket (read-port pid))
stdout (java.io.PushbackInputStream. stdout) [stdin stdout]
(if socket
(let [^Socket socket
(loop []
(if-let [sock (try (create-socket "localhost" socket-port)
(catch java.net.ConnectException _
nil))]
sock
(recur)))]
[(.getOutputStream socket)
(PushbackInputStream. (.getInputStream socket))])
[(.getOutputStream p) (java.io.PushbackInputStream. (.getInputStream p))])
_ (write stdin {"op" "describe" _ (write stdin {"op" "describe"
"id" (next-id)}) "id" (next-id)})
reply (read stdout) reply (read stdout)
@ -246,14 +258,6 @@
ops (some->> (get reply "ops") keys (map keyword) set) ops (some->> (get reply "ops") keys (map keyword) set)
readers (when (identical? :edn format) readers (when (identical? :edn format)
(read-readers reply resolve)) (read-readers reply resolve))
socket-port (get reply "port")
[stdin stdout _gobbler]
(if socket-port
(let [socket (create-socket "localhost" socket-port)]
[(.getOutputStream socket)
(PushbackInputStream. (.getInputStream socket))
(gobbler stdout)])
[stdin stdout])
pod {:process p pod {:process p
:pod-spec pod-spec :pod-spec pod-spec
:stdin stdin :stdin stdin

View file

@ -33,15 +33,16 @@
(defn load-pod (defn load-pod
([pod-spec] (load-pod pod-spec nil)) ([pod-spec] (load-pod pod-spec nil))
([pod-spec _opts] ([pod-spec opts]
(let [pod (impl/load-pod (let [pod (impl/load-pod
pod-spec pod-spec
{:remove-ns remove-ns (merge {:remove-ns remove-ns
:resolve (fn [sym] :resolve (fn [sym]
(or (resolve sym) (or (resolve sym)
(intern (intern
(create-ns (symbol (namespace sym))) (create-ns (symbol (namespace sym)))
(symbol (name sym)))))}) (symbol (name sym)))))}
opts))
namespaces (:namespaces pod)] namespaces (:namespaces pod)]
(swap! namespaces-to-load (swap! namespaces-to-load
merge merge

View file

@ -23,12 +23,13 @@
(with-meta (with-meta
(fn (fn
([ctx pod-spec] (load-pod ctx pod-spec nil)) ([ctx pod-spec] (load-pod ctx pod-spec nil))
([ctx pod-spec _opts] ([ctx pod-spec opts]
(let [env (:env ctx) (let [env (:env ctx)
pod (binding [*out* @sci/out pod (binding [*out* @sci/out
*err* @sci/err] *err* @sci/err]
(impl/load-pod (impl/load-pod
pod-spec pod-spec
(merge
{:remove-ns {:remove-ns
(fn [sym] (fn [sym]
(swap! env update :namespaces dissoc sym)) (swap! env update :namespaces dissoc sym))
@ -42,7 +43,8 @@
(let [v (sci/new-var sym {:predefined true})] (let [v (sci/new-var sym {:predefined true})]
(swap! env assoc-in [:namespaces sym-ns sym-name] (swap! env assoc-in [:namespaces sym-ns sym-name]
v) v)
v))))})) v))))}
opts)))
namespaces (:namespaces pod) namespaces (:namespaces pod)
namespaces-to-load (set (keep (fn [[ns-name _ defer?]] namespaces-to-load (set (keep (fn [[ns-name _ defer?]]
(when defer? (when defer?