Support declarative pods loaded on require
This commit is contained in:
parent
8d1d19d331
commit
f707705cf4
3 changed files with 127 additions and 50 deletions
|
|
@ -248,15 +248,18 @@
|
|||
(defn lookup-pod [pod-id]
|
||||
(get @pods pod-id))
|
||||
|
||||
(defn destroy* [{:keys [:stdin :process :ops]}]
|
||||
(if (contains? ops :shutdown)
|
||||
(do (write stdin
|
||||
{"op" "shutdown"
|
||||
"id" (next-id)})
|
||||
(.waitFor ^Process process))
|
||||
(.destroy ^Process process)))
|
||||
|
||||
(defn destroy [pod-id-or-pod]
|
||||
(let [pod-id (get-pod-id pod-id-or-pod)]
|
||||
(when-let [pod (lookup-pod pod-id)]
|
||||
(if (contains? (:ops pod) :shutdown)
|
||||
(do (write (:stdin pod)
|
||||
{"op" "shutdown"
|
||||
"id" (next-id)})
|
||||
(.waitFor ^Process (:process pod)))
|
||||
(.destroy ^Process (:process pod)))
|
||||
(destroy* pod)
|
||||
(when-let [rns (:remove-ns pod)]
|
||||
(doseq [[ns-name _] (:namespaces pod)]
|
||||
(rns ns-name))))
|
||||
|
|
@ -311,52 +314,86 @@
|
|||
(binding [*out* *err*]
|
||||
(println (str/join " " (map pr-str strs)))))
|
||||
|
||||
(defn resolve-pod [pod-spec {:keys [:version :force] :as opts}]
|
||||
(let [resolved (when (qualified-symbol? pod-spec)
|
||||
(resolver/resolve pod-spec version force))
|
||||
opts (if resolved
|
||||
(if-let [extra-opts (:options resolved)]
|
||||
(merge opts extra-opts)
|
||||
opts)
|
||||
opts)
|
||||
pod-spec (cond
|
||||
resolved [(:executable resolved)]
|
||||
(string? pod-spec) [pod-spec]
|
||||
:else pod-spec)]
|
||||
{:pod-spec pod-spec, :opts opts}))
|
||||
|
||||
(defn run-pod [pod-spec {:keys [:transport] :as _opts}]
|
||||
(let [pb (ProcessBuilder. ^java.util.List pod-spec)
|
||||
socket? (identical? :socket transport)
|
||||
_ (if socket?
|
||||
(.inheritIO pb)
|
||||
(.redirectError pb java.lang.ProcessBuilder$Redirect/INHERIT))
|
||||
_ (cond-> (doto (.environment pb)
|
||||
(.put "BABASHKA_POD" "true"))
|
||||
socket? (.put "BABASHKA_POD_TRANSPORT" "socket"))
|
||||
p (.start pb)
|
||||
port-file (when socket? (port-file (.pid p)))
|
||||
socket-port (when socket? (read-port port-file))
|
||||
[socket stdin stdout]
|
||||
(if socket?
|
||||
(let [^Socket socket
|
||||
(loop []
|
||||
(if-let [sock (try (create-socket "localhost" socket-port)
|
||||
(catch java.net.ConnectException _
|
||||
nil))]
|
||||
sock
|
||||
(recur)))]
|
||||
[socket
|
||||
(.getOutputStream socket)
|
||||
(PushbackInputStream. (.getInputStream socket))])
|
||||
[nil (.getOutputStream p) (java.io.PushbackInputStream. (.getInputStream p))])]
|
||||
{:process p
|
||||
:socket socket
|
||||
:stdin stdin
|
||||
:stdout stdout}))
|
||||
|
||||
(defn describe-pod [{:keys [:stdin :stdout]}]
|
||||
(write stdin {"op" "describe"
|
||||
"id" (next-id)})
|
||||
(read stdout))
|
||||
|
||||
(defn describe->ops [describe-reply]
|
||||
(some->> (get describe-reply "ops") keys (map keyword) set))
|
||||
|
||||
(defn describe->metadata [describe-reply]
|
||||
(let [format (-> (get describe-reply "format") bytes->string keyword)
|
||||
ops (describe->ops describe-reply)
|
||||
readers (when (identical? :edn format)
|
||||
(read-readers describe-reply resolve))]
|
||||
{:format format, :ops ops, :readers readers}))
|
||||
|
||||
(defn load-pod-metadata [pod-spec opts]
|
||||
(let [{:keys [:pod-spec :opts]} (resolve-pod pod-spec opts)
|
||||
running-pod (run-pod pod-spec opts)
|
||||
describe-reply (describe-pod running-pod)
|
||||
ops (describe->ops describe-reply)]
|
||||
(future (destroy* (assoc running-pod :ops ops)))
|
||||
describe-reply))
|
||||
|
||||
(defn load-pod
|
||||
([pod-spec] (load-pod pod-spec nil))
|
||||
([pod-spec opts]
|
||||
(let [{:keys [:version :force]} opts
|
||||
resolved (when (qualified-symbol? pod-spec)
|
||||
(resolver/resolve pod-spec version force))
|
||||
opts (if resolved
|
||||
(if-let [extra-opts (:options resolved)]
|
||||
(merge opts extra-opts)
|
||||
opts)
|
||||
opts)
|
||||
{:keys [:remove-ns :resolve :transport]} opts
|
||||
pod-spec (cond resolved [(:executable resolved)]
|
||||
(string? pod-spec) [pod-spec]
|
||||
:else pod-spec)
|
||||
pb (ProcessBuilder. ^java.util.List pod-spec)
|
||||
socket? (identical? :socket transport)
|
||||
_ (if socket?
|
||||
(.inheritIO pb)
|
||||
(.redirectError pb java.lang.ProcessBuilder$Redirect/INHERIT))
|
||||
_ (cond-> (doto (.environment pb)
|
||||
(.put "BABASHKA_POD" "true"))
|
||||
socket? (.put "BABASHKA_POD_TRANSPORT" "socket"))
|
||||
p (.start pb)
|
||||
port-file (when socket? (port-file (.pid p)))
|
||||
socket-port (when socket? (read-port port-file))
|
||||
[socket stdin stdout]
|
||||
(if socket?
|
||||
(let [^Socket socket
|
||||
(loop []
|
||||
(if-let [sock (try (create-socket "localhost" socket-port)
|
||||
(catch java.net.ConnectException _
|
||||
nil))]
|
||||
sock
|
||||
(recur)))]
|
||||
[socket
|
||||
(.getOutputStream socket)
|
||||
(PushbackInputStream. (.getInputStream socket))])
|
||||
[nil (.getOutputStream p) (java.io.PushbackInputStream. (.getInputStream p))])
|
||||
_ (write stdin {"op" "describe"
|
||||
"id" (next-id)})
|
||||
reply (read stdout)
|
||||
format (-> (get reply "format") bytes->string keyword)
|
||||
ops (some->> (get reply "ops") keys (map keyword) set)
|
||||
readers (when (identical? :edn format)
|
||||
(read-readers reply resolve))
|
||||
(let [{:keys [:pod-spec :opts]} (resolve-pod pod-spec opts)
|
||||
{:keys [:remove-ns :resolve]} opts
|
||||
|
||||
{p :process, stdin :stdin, stdout :stdout, socket :socket
|
||||
:as running-pod}
|
||||
(run-pod pod-spec opts)
|
||||
|
||||
reply (or (:metadata opts)
|
||||
(describe-pod running-pod))
|
||||
{:keys [:format :ops :readers]} (describe->metadata reply)
|
||||
pod {:process p
|
||||
:pod-spec pod-spec
|
||||
:stdin stdin
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@
|
|||
(def os {:os/name (System/getProperty "os.name")
|
||||
:os/arch (let [arch (System/getProperty "os.arch")]
|
||||
(normalize-arch arch))})
|
||||
|
||||
(defn warn [& strs]
|
||||
(binding [*out* *err*]
|
||||
(apply println strs)))
|
||||
|
|
|
|||
|
|
@ -1,6 +1,11 @@
|
|||
(ns babashka.pods.sci
|
||||
(:require [babashka.pods.impl :as impl]
|
||||
[sci.core :as sci]))
|
||||
[sci.core :as sci]
|
||||
[clojure.java.io :as io]
|
||||
[babashka.pods.impl.resolver :as resolver]
|
||||
[clojure.edn :as edn]
|
||||
[clojure.walk :as walk])
|
||||
(:import (java.io DataInputStream PushbackInputStream)))
|
||||
|
||||
(defn- process-namespace [ctx {:keys [:name :vars]}]
|
||||
(let [env (:env ctx)
|
||||
|
|
@ -19,6 +24,40 @@
|
|||
(string? var-value)
|
||||
(sci/eval-string* ctx var-value))))))
|
||||
|
||||
(defn metadata-cache-file [pod-spec {:keys [:version]}]
|
||||
(io/file (resolver/cache-dir {:pod/name pod-spec :pod/version version})
|
||||
"metadata.cache"))
|
||||
|
||||
(defn load-metadata-from-cache [pod-spec opts]
|
||||
(let [cache-file (metadata-cache-file pod-spec opts)]
|
||||
(when (.exists cache-file)
|
||||
(with-open [r (PushbackInputStream. (io/input-stream cache-file))]
|
||||
(impl/read r)))))
|
||||
|
||||
(defn load-pod-metadata* [pod-spec {:keys [:version] :as opts}]
|
||||
(let [metadata (impl/load-pod-metadata pod-spec opts)
|
||||
cache-file (when (qualified-symbol? pod-spec) ; don't cache local pods b/c their namespaces can change
|
||||
(metadata-cache-file pod-spec opts))]
|
||||
(when cache-file
|
||||
(with-open [w (io/output-stream cache-file)]
|
||||
(impl/write w metadata)))
|
||||
metadata))
|
||||
|
||||
(defn load-pod-metadata [ctx pod-spec {:keys [:version] :as opts}]
|
||||
(let [metadata
|
||||
(if-let [cached-metadata (when (qualified-symbol? pod-spec) ; don't cache local pods b/c their namespaces can change
|
||||
(load-metadata-from-cache pod-spec opts))]
|
||||
cached-metadata
|
||||
(load-pod-metadata* pod-spec opts))]
|
||||
(dorun
|
||||
(for [ns (get metadata "namespaces")]
|
||||
(let [ns-sym (-> ns (get "name") impl/bytes->string symbol)
|
||||
key-path [:pod-namespaces ns-sym]
|
||||
env (:env ctx)]
|
||||
(swap! env assoc-in key-path
|
||||
{:pod-spec pod-spec
|
||||
:opts (assoc opts :metadata metadata)}))))))
|
||||
|
||||
(defn load-pod
|
||||
([ctx pod-spec] (load-pod ctx pod-spec nil))
|
||||
([ctx pod-spec version opts] (load-pod ctx pod-spec (assoc opts :version version)))
|
||||
|
|
|
|||
Loading…
Reference in a new issue