diff --git a/CHANGELOG.md b/CHANGELOG.md index d3425b5e..d812e659 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ A preview of the next release can be installed from - [#1785](https://github.com/babashka/babashka/issues/1785): Allow subclasses of `Throwable` to have instance methods invoked ([@bobisageek](https://github.com/bobisageek)) - [#1791](https://github.com/babashka/babashka/issues/1791): interop problem on Jsoup form element +- Bump core.async to `1.8.735` - [#1793](https://github.com/babashka/babashka/issues/1793): Bump `rewrite-clj` to `1.1.49` (fixes parsing of `foo//` among other things) - Bump `deps.clj` - Bump `fs` diff --git a/deps.edn b/deps.edn index 7f036267..28cf6745 100644 --- a/deps.edn +++ b/deps.edn @@ -25,7 +25,7 @@ babashka/babashka.curl {:local/root "babashka.curl"} babashka/fs {:local/root "fs"} babashka/babashka.core {:local/root "babashka.core"} - org.clojure/core.async {:mvn/version "1.7.701"}, + org.clojure/core.async {:mvn/version "1.8.735"}, org.clojure/tools.cli {:mvn/version "1.0.214"}, org.clojure/data.csv {:mvn/version "1.0.0"}, cheshire/cheshire {:mvn/version "5.13.0"} diff --git a/project.clj b/project.clj index b88832d5..d3377ebe 100644 --- a/project.clj +++ b/project.clj @@ -29,7 +29,7 @@ [borkdude/sci.impl.reflector "0.0.3"] [org.babashka/sci.impl.types "0.0.2"] [org.babashka/babashka.impl.java "0.1.10"] - [org.clojure/core.async "1.7.701"] + [org.clojure/core.async "1.8.735"] [org.clojure/test.check "1.1.1"] [com.github.clj-easy/graal-build-time "0.1.0"] [rewrite-clj/rewrite-clj "1.1.49"] diff --git a/resources/META-INF/babashka/deps.edn b/resources/META-INF/babashka/deps.edn index 7f036267..28cf6745 100644 --- a/resources/META-INF/babashka/deps.edn +++ b/resources/META-INF/babashka/deps.edn @@ -25,7 +25,7 @@ babashka/babashka.curl {:local/root "babashka.curl"} babashka/fs {:local/root "fs"} babashka/babashka.core {:local/root "babashka.core"} - org.clojure/core.async {:mvn/version "1.7.701"}, + org.clojure/core.async {:mvn/version "1.8.735"}, org.clojure/tools.cli {:mvn/version "1.0.214"}, org.clojure/data.csv {:mvn/version "1.0.0"}, cheshire/cheshire {:mvn/version "5.13.0"} diff --git a/src/aaaa_this_has_to_be_first/because_patches.clj b/src/aaaa_this_has_to_be_first/because_patches.clj index b8651a25..6fa5e12f 100644 --- a/src/aaaa_this_has_to_be_first/because_patches.clj +++ b/src/aaaa_this_has_to_be_first/because_patches.clj @@ -1,7 +1,8 @@ (ns aaaa-this-has-to-be-first.because-patches ;; we need pprint loaded first, it patches pprint to not bloat the GraalVM binary (:require [babashka.impl.patches.datafy] - [babashka.impl.pprint])) + [babashka.impl.pprint] + )) ;; Enable this for scanning requiring usage: (def enable-require-scan diff --git a/src/babashka/impl/clojure/core/async.clj b/src/babashka/impl/clojure/core/async.clj index 4d0d22a3..a12c5b76 100644 --- a/src/babashka/impl/clojure/core/async.clj +++ b/src/babashka/impl/clojure/core/async.clj @@ -2,13 +2,39 @@ {:no-doc true} (:require [clojure.core.async :as async] [clojure.core.async.impl.protocols :as protocols] + [clojure.core.async.impl.dispatch :as dispatch] [sci.core :as sci :refer [copy-var]] [sci.impl.copy-vars :refer [macrofy]] - [sci.impl.vars :as vars])) + [sci.impl.vars :as vars]) + (:import [java.util.concurrent Executors ExecutorService ThreadFactory])) (set! *warn-on-reflection* true) -(def ^java.util.concurrent.Executor executor @#'async/thread-macro-executor) +#_(def ^java.util.concurrent.Executor executor @#'async/thread-macro-executor) +(def executor-for + "Given a workload tag, returns an ExecutorService instance and memoizes the result. By + default, core.async will defer to a user factory (if provided via sys prop) or construct + a specialized ExecutorService instance for each tag :io, :compute, and :mixed. When + given the tag :core-async-dispatch it will default to the executor service for :io." + (memoize + (fn ^ExecutorService [workload] + (let [sysprop-factory nil #_(when-let [esf (System/getProperty "clojure.core.async.executor-factory")] + (requiring-resolve (symbol esf))) + sp-exec (and sysprop-factory (sysprop-factory workload))] + (or sp-exec + (if (= workload :core-async-dispatch) + (executor-for :io) + (@#'dispatch/create-default-executor workload))))))) + +(alter-var-root #'dispatch/executor-for (constantly executor-for)) + +#_#_(defn exec + [^Runnable r workload] + (prn :r r :w workload) + (let [^ExecutorService e (executor-for workload)] + (.execute e r))) + +(alter-var-root #'dispatch/exec (constantly exec)) (def ^java.util.concurrent.Executor virtual-executor (try (eval '(java.util.concurrent.Executors/newVirtualThreadPerTaskExecutor)) @@ -17,20 +43,25 @@ (defn thread-call "Executes f in another thread, returning immediately to the calling thread. Returns a channel which will receive the result of calling - f when completed, then close." - [f] - (let [c (async/chan 1)] - (let [binds (vars/get-thread-binding-frame)] - (.execute executor - (fn [] - (vars/reset-thread-binding-frame binds) - (try - (let [ret (f)] - (when-not (nil? ret) - (async/>!! c ret))) - (finally - (async/close! c)))))) - c)) + f when completed, then close. workload is a keyword that describes + the work performed by f, where: + + :io - may do blocking I/O but must not do extended computation + :compute - must not ever block + :mixed - anything else (default) + + when workload not supplied, defaults to :mixed" + ([f] (thread-call f :mixed)) + ([f workload] + (let [c (async/chan 1) + returning-to-chan (fn [bf] + #(try + (when-some [ret (bf)] + (async/>!! c ret)) + (finally (async/close! c)))) + f (vars/binding-conveyor-fn f)] + (-> f #_bound-fn* returning-to-chan (dispatch/exec workload)) + c))) (defn -vthread-call "Executes f in another virtual thread, returning immediately to the calling @@ -38,21 +69,23 @@ f when completed, then close." [f] (let [c (async/chan 1)] - (let [binds (vars/get-thread-binding-frame)] + (let [returning-to-chan (fn [bf] + #(try + (when-some [ret (bf)] + (async/>!! c ret)) + (finally (async/close! c)))) + f (vars/binding-conveyor-fn f)] (.execute virtual-executor - (fn [] - (vars/reset-thread-binding-frame binds) - (try - (let [ret (f)] - (when-not (nil? ret) - (async/>!! c ret))) - (finally - (async/close! c)))))) + (-> f returning-to-chan))) c)) (defn thread [_ _ & body] - `(~'clojure.core.async/thread-call (fn [] ~@body))) + `(~'clojure.core.async/thread-call (fn [] ~@body) :mixed)) + +(defn io-thread + [_ _ & body] + `(~'clojure.core.async/thread-call (fn [] ~@body) :io)) (defn -vthread [_ _ & body] @@ -128,6 +161,7 @@ 'take! (copy-var async/take! core-async-namespace) 'tap (copy-var async/tap core-async-namespace) 'thread (macrofy 'thread thread core-async-namespace) + 'io-thread (macrofy 'io-thread io-thread core-async-namespace) 'thread-call (copy-var thread-call core-async-namespace) '-vthread-call (copy-var -vthread-call core-async-namespace) 'timeout (copy-var timeout core-async-namespace) diff --git a/src/babashka/impl/pprint.clj b/src/babashka/impl/pprint.clj index 88ada2e6..928b75d2 100644 --- a/src/babashka/impl/pprint.clj +++ b/src/babashka/impl/pprint.clj @@ -2,7 +2,8 @@ {:no-doc true} (:require [clojure.pprint :as pprint] [sci.core :as sci] - [sci.pprint])) + [sci.pprint] + [babashka.impl.clojure.core.async])) (defonce patched? (volatile! false))