[#255] support clojure.core.async/go macro by falling back on threads
This commit is contained in:
parent
00420879be
commit
24a469af65
3 changed files with 76 additions and 12 deletions
|
|
@ -1,18 +1,50 @@
|
||||||
(ns babashka.impl.async
|
(ns babashka.impl.async
|
||||||
{:no-doc true}
|
{:no-doc true}
|
||||||
(:require [clojure.core.async :as async]
|
(:require [clojure.core.async :as async]
|
||||||
[clojure.core.async.impl.protocols :as protocols]))
|
[clojure.core.async.impl.protocols :as protocols]
|
||||||
|
[sci.impl.vars :as vars]))
|
||||||
|
|
||||||
|
(def ^java.util.concurrent.Executor executor @#'async/thread-macro-executor)
|
||||||
|
|
||||||
|
(defn thread-call
|
||||||
|
"Executes f in another thread, returning immediately to the calling
|
||||||
|
thread. Returns a channel which will receive the result of calling
|
||||||
|
f when completed, then close."
|
||||||
|
[f]
|
||||||
|
(let [c (async/chan 1)]
|
||||||
|
(let [binds (vars/get-thread-binding-frame)]
|
||||||
|
(.execute executor
|
||||||
|
(fn []
|
||||||
|
(vars/reset-thread-binding-frame binds)
|
||||||
|
(try
|
||||||
|
(let [ret (f)]
|
||||||
|
(when-not (nil? ret)
|
||||||
|
(async/>!! c ret)))
|
||||||
|
(finally
|
||||||
|
(async/close! c))))))
|
||||||
|
c))
|
||||||
|
|
||||||
(defn thread
|
(defn thread
|
||||||
[_ _ & body]
|
[_ _ & body]
|
||||||
`(~'clojure.core.async/thread-call (fn [] ~@body)))
|
`(~'clojure.core.async/thread-call (fn [] ~@body)))
|
||||||
|
|
||||||
|
(defn alt!!
|
||||||
|
"Like alt!, except as if by alts!!, will block until completed, and
|
||||||
|
not intended for use in (go ...) blocks."
|
||||||
|
[_ _ & clauses]
|
||||||
|
(async/do-alt 'clojure.core.async/alts!! clauses))
|
||||||
|
|
||||||
|
(defn go-loop
|
||||||
|
[_ _ bindings & body]
|
||||||
|
(list 'clojure.core.async/thread (list* 'loop bindings body)))
|
||||||
|
|
||||||
(def async-namespace
|
(def async-namespace
|
||||||
{'<!! async/<!!
|
{'<!! async/<!!
|
||||||
'>!! async/>!!
|
'>!! async/>!!
|
||||||
'admix async/admix
|
'admix async/admix
|
||||||
'alts! async/alts!
|
'alts! async/alts!
|
||||||
'alts!! async/alts!!
|
'alts!! async/alts!!
|
||||||
|
'alt!! (with-meta alt!! {:sci/macro true})
|
||||||
'buffer async/buffer
|
'buffer async/buffer
|
||||||
'chan async/chan
|
'chan async/chan
|
||||||
'close! async/close!
|
'close! async/close!
|
||||||
|
|
@ -53,7 +85,7 @@
|
||||||
'take! async/take!
|
'take! async/take!
|
||||||
'tap async/tap
|
'tap async/tap
|
||||||
'thread (with-meta thread {:sci/macro true})
|
'thread (with-meta thread {:sci/macro true})
|
||||||
'thread-call async/thread-call
|
'thread-call thread-call
|
||||||
'timeout async/timeout
|
'timeout async/timeout
|
||||||
'to-chan async/to-chan
|
'to-chan async/to-chan
|
||||||
'toggle async/toggle
|
'toggle async/toggle
|
||||||
|
|
@ -65,7 +97,13 @@
|
||||||
'unsub async/unsub
|
'unsub async/unsub
|
||||||
'unsub-all async/unsub-all
|
'unsub-all async/unsub-all
|
||||||
'untap async/untap
|
'untap async/untap
|
||||||
'untap-all async/untap-all})
|
'untap-all async/untap-all
|
||||||
|
;; polyfill
|
||||||
|
'go (with-meta thread {:sci/macro true})
|
||||||
|
'<! async/<!!
|
||||||
|
'>! async/>!!
|
||||||
|
'alt! (with-meta alt!! {:sci/macro true})
|
||||||
|
'go-loop (with-meta go-loop {:sci/macro true})})
|
||||||
|
|
||||||
(def async-protocols-namespace
|
(def async-protocols-namespace
|
||||||
{'ReadPort protocols/ReadPort})
|
{'ReadPort protocols/ReadPort})
|
||||||
|
|
|
||||||
35
test/babashka/async_test.clj
Normal file
35
test/babashka/async_test.clj
Normal file
|
|
@ -0,0 +1,35 @@
|
||||||
|
(ns babashka.async-test
|
||||||
|
(:require
|
||||||
|
[babashka.test-utils :as test-utils]
|
||||||
|
[clojure.edn :as edn]
|
||||||
|
[clojure.test :as t :refer [deftest is]]))
|
||||||
|
|
||||||
|
(deftest alts!!-test
|
||||||
|
(is (= "process 2\n" (test-utils/bb nil "
|
||||||
|
(defn async-command [& args]
|
||||||
|
(async/thread (apply shell/sh \"bash\" \"-c\" args)))
|
||||||
|
|
||||||
|
(-> (async/alts!! [(async-command \"sleep 2 && echo process 1\")
|
||||||
|
(async-command \"sleep 1 && echo process 2\")])
|
||||||
|
first :out str/trim println)"))))
|
||||||
|
|
||||||
|
(deftest go-test
|
||||||
|
(is (number? (edn/read-string (test-utils/bb nil "
|
||||||
|
(defn calculation-go []
|
||||||
|
(async/go
|
||||||
|
;; wait for some stuff
|
||||||
|
(rand-int 1000)))
|
||||||
|
|
||||||
|
(defn get-result-go []
|
||||||
|
(async/go
|
||||||
|
(->>
|
||||||
|
(repeatedly 10 calculation-go)
|
||||||
|
(map async/<!)
|
||||||
|
(reduce +))))
|
||||||
|
|
||||||
|
(async/<!! (get-result-go))")))))
|
||||||
|
|
||||||
|
(deftest binding-conveyance-test
|
||||||
|
(is (number? (edn/read-string (test-utils/bb nil "
|
||||||
|
(def ^:dynamic x 0)
|
||||||
|
(binding [x 10] (async/<!! (async/thread x)))")))))
|
||||||
|
|
@ -220,15 +220,6 @@
|
||||||
{:default :timed-out :timeout 100}))"
|
{:default :timed-out :timeout 100}))"
|
||||||
temp-dir-path))))))
|
temp-dir-path))))))
|
||||||
|
|
||||||
(deftest async-test
|
|
||||||
(is (= "process 2\n" (test-utils/bb nil "
|
|
||||||
(defn async-command [& args]
|
|
||||||
(async/thread (apply shell/sh \"bash\" \"-c\" args)))
|
|
||||||
|
|
||||||
(-> (async/alts!! [(async-command \"sleep 2 && echo process 1\")
|
|
||||||
(async-command \"sleep 1 && echo process 2\")])
|
|
||||||
first :out str/trim println)"))))
|
|
||||||
|
|
||||||
(deftest tools-cli-test
|
(deftest tools-cli-test
|
||||||
(is (= {:result 8080} (bb nil "test/babashka/scripts/tools.cli.bb"))))
|
(is (= {:result 8080} (bb nil "test/babashka/scripts/tools.cli.bb"))))
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue