This commit is contained in:
Michiel Borkent 2020-11-26 12:06:49 +01:00 committed by GitHub
parent 6ed33d259e
commit 67c33b2270
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 356 additions and 87 deletions

View file

@ -1,7 +1,9 @@
(ns babashka.impl.clojure.core
{:no-doc true}
(:refer-clojure :exclude [future read read-string])
(:require [borkdude.graal.locking :as locking]
(:refer-clojure :exclude [future read+string])
(:require [babashka.impl.common :as common]
[borkdude.graal.locking :as locking]
[clojure.string :as str]
[sci.core :as sci]
[sci.impl.namespaces :refer [copy-core-var]]))
@ -19,6 +21,24 @@
(def data-readers (sci/new-dynamic-var '*data-readers* nil))
(defn read+string
"Added for compatibility. Must be used with
clojure.lang.LineNumberingPushbackReader. Does not support all of
the options from the original yet."
([sci-ctx]
(read+string sci-ctx @sci/in))
([sci-ctx stream]
(read+string sci-ctx stream true nil))
([sci-ctx stream eof-error? eof-value]
(read+string sci-ctx stream eof-error? eof-value false))
([sci-ctx ^clojure.lang.LineNumberingPushbackReader stream _eof-error? eof-value _recursive?]
(let [_ (.captureString stream)
v (sci/parse-next sci-ctx stream {:eof eof-value})
s (str/trim (.getString stream))]
[(if (identical? :sci.core/eof v)
eof-value
v) s])))
(def core-extras
{'file-seq (copy-core-var file-seq)
'agent (copy-core-var agent)
@ -37,4 +57,6 @@
'remove-tap (copy-core-var remove-tap)
'*data-readers* data-readers
'default-data-readers default-data-readers
'xml-seq (copy-core-var xml-seq)})
'xml-seq (copy-core-var xml-seq)
'read+string (fn [& args]
(apply read+string @common/ctx args))})

View file

@ -13,17 +13,34 @@
:author "Alex Miller"
:no-doc true}
babashka.impl.clojure.core.server
(:refer-clojure :exclude [locking])
(:require [sci.core :as sci]
(:require [babashka.impl.clojure.core :as core]
[babashka.impl.clojure.main :as m]
[babashka.impl.common :refer [verbose?]]
[sci.core :as sci]
[sci.impl.parser :as p]
[sci.impl.vars :as vars])
(:import
[clojure.lang LineNumberingPushbackReader]
[java.io BufferedWriter InputStreamReader OutputStreamWriter]
[java.net InetAddress Socket ServerSocket SocketException]
[java.io BufferedWriter InputStreamReader OutputStreamWriter]))
[java.util.concurrent.locks ReentrantLock]))
(set! *warn-on-reflection* true)
(def server (atom nil))
(def ^:dynamic *session* nil)
;; lock protects servers
(defonce ^:private lock (ReentrantLock.))
(defonce ^:private servers {})
(defmacro ^:private with-lock
[lock-expr & body]
`(let [lockee# ~(with-meta lock-expr {:tag 'java.util.concurrent.locks.ReentrantLock})]
(.lock lockee#)
(try
~@body
(finally
(.unlock lockee#)))))
(defmacro ^:private thread
[^String name daemon & body]
@ -31,6 +48,15 @@
(.setDaemon ~daemon)
(.start)))
(defn- resolve-fn [ctx valf]
(if (symbol? valf)
(let [fully-qualified (p/fully-qualify ctx valf)]
(or (some-> ctx :env deref :namespaces
(get (symbol (namespace fully-qualified)))
(get (symbol (name fully-qualified))))
(throw (Exception. (str "can't resolve: " valf)))))
valf))
(defn- accept-connection
"Start accept function, to be invoked on a client thread, given:
conn - client socket
@ -41,18 +67,22 @@
err - err stream
accept - accept fn symbol to invoke
args - to pass to accept-fn"
[^Socket conn client-id in out err accept args]
[ctx ^Socket conn client-id in out err accept args]
(let [accept (resolve-fn ctx accept)]
(try
(binding [*session* {:server name :client client-id}]
(sci/with-bindings {sci/in in
sci/out out
sci/err err
vars/current-ns (vars/->SciNamespace 'user nil)}
(swap! server assoc-in [:sessions client-id] {})
(apply accept args))
sci/ns (sci/create-ns 'user nil)}
(with-lock lock
(alter-var-root #'servers assoc-in [name :sessions client-id] {}))
(apply accept args)))
(catch SocketException _disconnect)
(finally
(swap! server update-in [:sessions] dissoc client-id)
(.close conn))))
(with-lock lock
(alter-var-root #'servers update-in [name :sessions] dissoc client-id))
(.close conn)))))
(defn start-server
"Start a socket server given the specified opts:
@ -65,14 +95,15 @@
:server-daemon Is server thread a daemon?, defaults to true
:client-daemon Are client threads daemons?, defaults to true
Returns server socket."
[opts]
^ServerSocket [ctx opts]
(let [{:keys [address port name accept args bind-err server-daemon client-daemon]
:or {bind-err true
server-daemon true
client-daemon true}} opts
address (InetAddress/getByName address) ;; nil returns loopback
socket (ServerSocket. port 0 address)]
(reset! server {:name name, :socket socket, :sessions {}})
(with-lock lock
(alter-var-root #'servers assoc name {:name name, :socket socket, :sessions {}}))
(thread
(str "Clojure Server " name) server-daemon
(try
@ -85,19 +116,144 @@
client-id (str client-counter)]
(thread
(str "Clojure Connection " name " " client-id) client-daemon
(accept-connection conn client-id in out (if bind-err out *err*) accept args)))
(accept-connection ctx conn client-id in out (if bind-err out *err*) accept args)))
(catch SocketException _disconnect))
(recur (inc client-counter))))
(finally
(reset! server nil))))
(with-lock lock
(alter-var-root #'servers dissoc name)))))
socket))
(defn stop-server
"Stop server with name or use the server-name from *session* if none supplied.
Returns true if server stopped successfully, nil if not found, or throws if
there is an error closing the socket."
([]
(stop-server (:server *session*)))
([name]
(with-lock lock
(let [server-socket ^ServerSocket (get-in servers [name :socket])]
(when server-socket
(alter-var-root #'servers dissoc name)
(.close server-socket)
true)))))
(defn stop-servers
"Stop all servers ignores all errors, and returns nil."
[]
(when-let [s @server]
(when-let [server-socket ^ServerSocket (get s :socket)]
(.close server-socket)))
(reset! server nil))
(with-lock lock
(doseq [name (keys servers)]
(future (stop-server name)))))
(defn- ex->data
[ex phase]
(let [ex (assoc (Throwable->map ex) :phase phase)
ex (if (not @verbose?)
(update ex :trace #(vec (take 100 %)))
ex)]
ex))
(defn prepl
"a REPL with structured output (for programs)
reads forms to eval from in-reader (a LineNumberingPushbackReader)
Closing the input or passing the form :repl/quit will cause it to return
Calls out-fn with data, one of:
{:tag :ret
:val val ;;eval result
:ns ns-name-string
:ms long ;;eval time in milliseconds
:form string ;;iff successfully read
:clojure.error/phase (:execution et al per clojure.main/ex-triage) ;;iff error occurred
}
{:tag :out
:val string} ;chars from during-eval *out*
{:tag :err
:val string} ;chars from during-eval *err*
{:tag :tap
:val val} ;values from tap>
You might get more than one :out or :err per eval, but exactly one :ret
tap output can happen at any time (i.e. between evals)
If during eval an attempt is made to read *in* it will read from in-reader unless :stdin is supplied
Alpha, subject to change."
{:added "1.10"}
[ctx in-reader out-fn & {:keys [stdin]}]
(let [EOF (Object.)
tapfn #(out-fn {:tag :tap :val %1})]
(m/with-bindings
(sci/with-bindings {sci/in (or stdin in-reader)
sci/out (PrintWriter-on #(out-fn {:tag :out :val %1}) nil)
sci/err (PrintWriter-on #(out-fn {:tag :err :val %1}) nil)
sci/ns (sci/create-ns 'user nil)
sci/print-length @sci/print-length
sci/print-meta @sci/print-meta
sci/*1 nil
sci/*2 nil
sci/*3 nil
sci/*e nil}
(try
;; babashka uses Clojure's global tap system so this should be ok
(add-tap tapfn)
(loop []
(when (try
(let [[form s] (core/read+string ctx in-reader false EOF)]
(try
(when-not (identical? form EOF)
(let [start (System/nanoTime)
ret (sci/with-bindings {sci/*1 *1
sci/*2 *2
sci/*3 *3
sci/*e *e}
(sci/eval-form ctx form))
ms (quot (- (System/nanoTime) start) 1000000)]
(when-not (= :repl/quit ret)
(set! *3 *2)
(set! *2 *1)
(set! *1 ret)
(out-fn {:tag :ret
:val (if (instance? Throwable ret)
(Throwable->map ret)
ret)
:ns (str (vars/current-ns-name))
:ms ms
:form s})
true)))
(catch Throwable ex
(prn (ex-message ex))
(set! *e ex)
(out-fn {:tag :ret :val (ex->data ex (or (-> ex ex-data :clojure.error/phase) :execution))
:ns (str (.name *ns*)) :form s
:exception true})
true)))
(catch Throwable ex
(set! *e ex)
(out-fn {:tag :ret :val (ex->data ex :read-source)
:ns (str (.name *ns*))
:exception true})
true))
(recur)))
(finally
(remove-tap tapfn)))))))
(defn io-prepl
"prepl bound to *in* and *out*, suitable for use with e.g. server/repl (socket-repl).
:ret and :tap vals will be processed by valf, a fn of one argument
or a symbol naming same (default pr-str)
Alpha, subject to change."
{:added "1.10"}
[ctx & {:keys [valf] :or {valf pr-str}}]
(let [valf (resolve-fn ctx valf)
out @sci/out
lock (Object.)]
(prepl ctx @sci/in
(fn [m]
(binding [*out* out *flush-on-newline* true, *print-readably* true]
;; we're binding *out* to the out, which was the original
;; sci/out, because we're using Clojure's regular prn below
(locking lock
(prn (if (#{:ret :tap} (:tag m))
(try
(assoc m :val (valf (:val m)))
(catch Throwable ex
(assoc m :val (ex->data ex :print-eval-result)
:exception true)))
m))))))))

View file

@ -2,3 +2,4 @@
;; placeholder for ctx
(def ctx (volatile! nil))
(def verbose? (volatile! false))

View file

@ -45,12 +45,14 @@
(let [in (r/indexing-push-back-reader (r/push-back-reader @sci/in))]
(m/repl
:init (or init
#(do (sio/println "Babashka"
(fn []
(sci/with-bindings {sci/out @sci/err}
(sio/println "Babashka"
(str "v" (str/trim (slurp (io/resource "BABASHKA_VERSION"))))
"REPL.")
(sio/println "Use :repl/quit or :repl/exit to quit the REPL.")
(sio/println "Clojure rocks, Bash reaches.")
(sio/println)
(sio/println))
(eval-form sci-ctx '(use 'clojure.repl))))
:read (or read
(fn [_request-prompt request-exit]

View file

@ -0,0 +1,2 @@
(ns babashka.impl.server)

View file

@ -2,29 +2,45 @@
{:no-doc true}
(:require
[babashka.impl.clojure.core.server :as server]
[babashka.impl.common :as common]
[babashka.impl.repl :as repl]
[clojure.edn :as edn]
[clojure.string :as str]))
(set! *warn-on-reflection* true)
(defn start-repl! [host+port sci-ctx]
(let [parts (str/split host+port #":")
;; this is mapped to clojure.core.server/repl in babashka.main
(def repl (fn []
(repl/repl @common/ctx)))
(defn parse-opts [opts]
(let [opts (str/trim opts)
opts (if (str/starts-with? opts "{")
(edn/read-string opts)
(let [parts (str/split opts #":")
[host port] (if (= 1 (count parts))
[nil (Integer. ^String (first parts))]
[(first parts) (Integer. ^String (second parts))])
host+port (if-not host (str "localhost:" port)
host+port)
socket (server/start-server
[(first parts) (Integer. ^String (second parts))])]
{:address host
:port port
:name "bb"
:accept babashka.impl.repl/repl
:args [sci-ctx]})]
(println "Babashka socket REPL started at" host+port)
:accept 'clojure.core.server/repl
:args []}))]
opts))
(defn start-repl! [opts sci-ctx]
(let [opts (parse-opts opts)
socket (server/start-server sci-ctx opts)
inet-address (java.net.InetAddress/getByName (:address opts))]
(binding [*out* *err*]
(println (format "Babashka socket REPL started at %s:%d"
(.getHostAddress inet-address)
(.getLocalPort socket))))
socket))
(defn stop-repl! []
(server/stop-server))
;; This is only used by tests where we run one server at a time.
(server/stop-servers))
(comment
@#'server/servers

View file

@ -7,6 +7,7 @@
[babashka.impl.classes :as classes]
[babashka.impl.classpath :as cp]
[babashka.impl.clojure.core :as core :refer [core-extras]]
[babashka.impl.clojure.core.server :as server]
[babashka.impl.clojure.java.browse :refer [browse-namespace]]
[babashka.impl.clojure.java.io :refer [io-namespace]]
[babashka.impl.clojure.java.shell :refer [shell-namespace]]
@ -316,9 +317,7 @@ If neither -e, -f, or --socket-repl are specified, then the first argument that
(sci/eval-string* @common/ctx s))))
(defn start-socket-repl! [address ctx]
(socket-repl/start-repl! address ctx)
;; hang until SIGINT
@(promise))
(socket-repl/start-repl! address ctx))
(defn start-nrepl! [address ctx]
(let [dev? (= "true" (System/getenv "BABASHKA_DEV"))
@ -366,6 +365,18 @@ If neither -e, -f, or --socket-repl are specified, then the first argument that
:cp new-cp})))
nil)
;;(def ^:private server-ns-obj (sci/create-ns 'clojure.core.server nil))
(def clojure-core-server
{'repl socket-repl/repl
'prepl (fn [& args]
(apply server/prepl @common/ctx args))
'io-prepl (fn [& args]
(apply server/io-prepl @common/ctx args))
'start-server (fn [& args]
(apply server/start-server @common/ctx args))})
(def namespaces
(cond->
{'clojure.tools.cli tools-cli-namespace
@ -379,7 +390,10 @@ If neither -e, -f, or --socket-repl are specified, then the first argument that
'clojure.stacktrace stacktrace-namespace
'clojure.zip zip-namespace
'clojure.main {'demunge demunge
'repl-requires clojure-main/repl-requires}
'repl-requires clojure-main/repl-requires
'repl (fn [& opts]
(let [opts (apply hash-map opts)]
(repl/start-repl! @common/ctx opts)))}
'clojure.test t/clojure-test-namespace
'babashka.classpath {'add-classpath add-classpath*}
'clojure.pprint pprint-namespace
@ -389,6 +403,7 @@ If neither -e, -f, or --socket-repl are specified, then the first argument that
'clojure.java.browse browse-namespace
'clojure.datafy datafy-namespace
'clojure.core.protocols protocols-namespace
'clojure.core.server clojure-core-server
'babashka.process process-namespace}
features/xml? (assoc 'clojure.data.xml @(resolve 'babashka.impl.xml/xml-namespace))
features/yaml? (assoc 'clj-yaml.core @(resolve 'babashka.impl.yaml/yaml-namespace)
@ -476,6 +491,7 @@ If neither -e, -f, or --socket-repl are specified, then the first argument that
:main :uberscript :describe?
:jar :uberjar] :as _opts}
(parse-opts args)
_ (when verbose? (vreset! common/verbose? true))
_ (do ;; set properties
(when main (System/setProperty "babashka.main" main))
(System/setProperty "babashka.version" version))
@ -540,11 +556,7 @@ If neither -e, -f, or --socket-repl are specified, then the first argument that
(cp/getResource loader [path] {:url? true})))))
(assoc-in ['user (with-meta '*input*
(when-not stream?
{:sci.impl/deref! true}))] input-var)
(assoc-in ['clojure.main 'repl]
(fn [& opts]
(let [opts (apply hash-map opts)]
(repl/start-repl! @common/ctx opts)))))
{:sci.impl/deref! true}))] input-var))
:env env
:features #{:bb :clj}
:classes classes/class-map
@ -581,6 +593,10 @@ If neither -e, -f, or --socket-repl are specified, then the first argument that
:preloads preloads
:loader (:loader @cp-state)})))))
nil))
;; socket REPL is start asynchronously. when no other args are
;; provided, a normal REPL will be started as well, which causes the
;; process to wait until SIGINT
_ (when socket-repl (start-socket-repl! socket-repl sci-ctx))
exit-code
(or exit-code
(second
@ -591,7 +607,6 @@ If neither -e, -f, or --socket-repl are specified, then the first argument that
describe?
[(print-describe) 0]
repl [(repl/start-repl! sci-ctx) 0]
socket-repl [(start-socket-repl! socket-repl sci-ctx) 0]
nrepl [(start-nrepl! nrepl sci-ctx) 0]
uberjar [nil 0]
expressions

View file

@ -1,6 +1,6 @@
(ns babashka.wait
(:require [clojure.java.io :as io])
(:import [java.net Socket ConnectException]))
(:import [java.net Socket SocketException]))
(set! *warn-on-reflection* true)
@ -19,7 +19,7 @@
(loop []
(let [v (try (.close (Socket. host port))
(- (System/currentTimeMillis) t0)
(catch ConnectException _e
(catch SocketException _e
(let [took (- (System/currentTimeMillis) t0)]
(if (and timeout (>= took timeout))
:wait-for-port.impl/timed-out

View file

@ -1,10 +1,15 @@
(ns babashka.impl.socket-repl-test
(:require
[babashka.impl.common :as common]
[babashka.impl.socket-repl :refer [start-repl! stop-repl!]]
[babashka.main :refer [clojure-core-server]]
[babashka.process :as p]
[babashka.wait :as w]
[babashka.test-utils :as tu]
[clojure.java.io :as io]
[clojure.java.shell :refer [sh]]
[clojure.string :as str]
[clojure.edn :as edn]
[clojure.test :as t :refer [deftest is testing]]
[sci.impl.opts :refer [init]]))
@ -16,37 +21,38 @@
sw (java.io.StringWriter.)
writer (io/writer socket)]
(binding [*out* writer]
(println (str expr))
(println ":repl/exit\n"))
(println (str expr "\n")))
(loop []
(when-let [l (.readLine ^java.io.BufferedReader reader)]
(when-let [l (try (.readLine ^java.io.BufferedReader reader)
(catch java.net.SocketException _ nil))]
;; (prn :l l)
(binding [*out* sw]
(println l))
(recur)))
(let [s (str sw)]
(is (str/includes? s expected)
(format "\"%s\" does not contain \"%s\""
s expected))
s)))
;; (prn :s s :expected expected (str/includes? s expected))
(if (if (fn? expected)
(expected s)
(str/includes? s expected))
(is true)
(recur)))))
(binding [*out* writer]
(println ":repl/quit\n"))
:success))
(def server-process (volatile! nil))
(deftest socket-repl-test
(try
(if tu/jvm?
(start-repl! "0.0.0.0:1666" (init {:bindings {'*command-line-args*
["a" "b" "c"]}
:env (atom {})
:features #{:bb}}))
(future
(sh "bash" "-c"
"echo '[1 2 3]' | ./bb --socket-repl 0.0.0.0:1666 a b c")))
;; wait for server to be available
(when tu/native?
(while (not (zero? (:exit
(sh "bash" "-c"
"lsof -t -i:1666"))))))
(let [ctx (init {:namespaces {'clojure.core.server clojure-core-server}
:features #{:bb}})]
(vreset! common/ctx ctx)
(start-repl! "0.0.0.0:1666" ctx))
(do (vreset! server-process
(p/process ["./bb" "--socket-repl" "localhost:1666"]))
(w/wait-for-port "localhost" 1666)))
(Thread/sleep 50)
(is (socket-command "(+ 1 2 3)" "user=> 6"))
(testing "*command-line-args*"
(is (socket-command '*command-line-args* "\"a\" \"b\" \"c\"")))
(testing "&env"
(socket-command "(defmacro bindings [] (mapv #(list 'quote %) (keys &env)))" "bindings")
(socket-command "(defn bar [x y z] (bindings))" "bar")
@ -60,8 +66,53 @@
(finally
(if tu/jvm?
(stop-repl!)
(sh "bash" "-c"
"kill -9 $(lsof -t -i:1666)")))))
(p/destroy-tree @server-process)))))
(deftest socket-repl-opts-test
(try
(if tu/jvm?
(let [ctx (init {:bindings {'*command-line-args*
["a" "b" "c"]}
:env (atom {})
:namespaces {'clojure.core.server clojure-core-server}
:features #{:bb}})]
(vreset! common/ctx ctx)
(start-repl! "{:address \"localhost\" :accept clojure.core.server/repl :port 1666}"
ctx))
(do (vreset! server-process
(p/process ["./bb" "--socket-repl" "{:address \"localhost\" :accept clojure.core.server/repl :port 1666}"]))
(w/wait-for-port "localhost" 1666)))
(Thread/sleep 50)
(is (socket-command "(+ 1 2 3)" "user=> 6"))
(finally
(if tu/jvm?
(stop-repl!)
(p/destroy-tree @server-process)))))
(deftest socket-prepl-test
(try
(if tu/jvm?
(let [ctx (init {:bindings {'*command-line-args*
["a" "b" "c"]}
:env (atom {})
:namespaces {'clojure.core.server clojure-core-server}
:features #{:bb}})]
(vreset! common/ctx ctx)
(start-repl! "{:address \"localhost\" :accept clojure.core.server/io-prepl :port 1666}"
ctx))
(do (vreset! server-process
(p/process ["./bb" "--socket-repl" "{:address \"localhost\" :accept clojure.core.server/io-prepl :port 1666}"]))
(w/wait-for-port "localhost" 1666)))
(Thread/sleep 50)
(is (socket-command "(+ 1 2 3)" (fn [s]
(let [m (edn/read-string s)]
(and (= "6" (:val m))
(= "user" (:ns m))
(= "(+ 1 2 3)" (:form m)))))))
(finally
(if tu/jvm?
(stop-repl!)
(p/destroy-tree @server-process)))))
;;;; Scratch

View file

@ -558,6 +558,10 @@
(deftest java-stream-test
(is (every? number? (bb nil "(take 2 (iterator-seq (.iterator (.doubles (java.util.Random.)))))"))))
(deftest read+string-test
(is (= '[:user/foo "::foo"]
(bb nil "(read+string (clojure.lang.LineNumberingPushbackReader. (java.io.StringReader. \"::foo\")))"))))
(deftest iterable-test
(is (true? (bb nil "
(defn iter [coll]