diff --git a/src/babashka/impl/async.clj b/src/babashka/impl/async.clj index 9cfa421a..6d2cf7a0 100644 --- a/src/babashka/impl/async.clj +++ b/src/babashka/impl/async.clj @@ -1,18 +1,50 @@ (ns babashka.impl.async {:no-doc true} (:require [clojure.core.async :as async] - [clojure.core.async.impl.protocols :as protocols])) + [clojure.core.async.impl.protocols :as protocols] + [sci.impl.vars :as vars])) + +(def ^java.util.concurrent.Executor executor @#'async/thread-macro-executor) + +(defn thread-call + "Executes f in another thread, returning immediately to the calling + thread. Returns a channel which will receive the result of calling + f when completed, then close." + [f] + (let [c (async/chan 1)] + (let [binds (vars/get-thread-binding-frame)] + (.execute executor + (fn [] + (vars/reset-thread-binding-frame binds) + (try + (let [ret (f)] + (when-not (nil? ret) + (async/>!! c ret))) + (finally + (async/close! c)))))) + c)) (defn thread [_ _ & body] `(~'clojure.core.async/thread-call (fn [] ~@body))) +(defn alt!! + "Like alt!, except as if by alts!!, will block until completed, and + not intended for use in (go ...) blocks." + [_ _ & clauses] + (async/do-alt 'clojure.core.async/alts!! clauses)) + +(defn go-loop + [_ _ bindings & body] + (list 'clojure.core.async/thread (list* 'loop bindings body))) + (def async-namespace {'!! async/>!! 'admix async/admix 'alts! async/alts! 'alts!! async/alts!! + 'alt!! (with-meta alt!! {:sci/macro true}) 'buffer async/buffer 'chan async/chan 'close! async/close! @@ -53,7 +85,7 @@ 'take! async/take! 'tap async/tap 'thread (with-meta thread {:sci/macro true}) - 'thread-call async/thread-call + 'thread-call thread-call 'timeout async/timeout 'to-chan async/to-chan 'toggle async/toggle @@ -65,7 +97,13 @@ 'unsub async/unsub 'unsub-all async/unsub-all 'untap async/untap - 'untap-all async/untap-all}) + 'untap-all async/untap-all + ;; polyfill + 'go (with-meta thread {:sci/macro true}) + '! async/>!! + 'alt! (with-meta alt!! {:sci/macro true}) + 'go-loop (with-meta go-loop {:sci/macro true})}) (def async-protocols-namespace {'ReadPort protocols/ReadPort}) diff --git a/test/babashka/async_test.clj b/test/babashka/async_test.clj new file mode 100644 index 00000000..783e7788 --- /dev/null +++ b/test/babashka/async_test.clj @@ -0,0 +1,35 @@ +(ns babashka.async-test + (:require + [babashka.test-utils :as test-utils] + [clojure.edn :as edn] + [clojure.test :as t :refer [deftest is]])) + +(deftest alts!!-test + (is (= "process 2\n" (test-utils/bb nil " + (defn async-command [& args] + (async/thread (apply shell/sh \"bash\" \"-c\" args))) + + (-> (async/alts!! [(async-command \"sleep 2 && echo process 1\") + (async-command \"sleep 1 && echo process 2\")]) + first :out str/trim println)")))) + +(deftest go-test + (is (number? (edn/read-string (test-utils/bb nil " +(defn calculation-go [] + (async/go + ;; wait for some stuff + (rand-int 1000))) + +(defn get-result-go [] + (async/go + (->> + (repeatedly 10 calculation-go) + (map async/ (async/alts!! [(async-command \"sleep 2 && echo process 1\") - (async-command \"sleep 1 && echo process 2\")]) - first :out str/trim println)")))) - (deftest tools-cli-test (is (= {:result 8080} (bb nil "test/babashka/scripts/tools.cli.bb"))))