Import fj private wrappers from clojure.core.reducers

This commit is contained in:
Sean Corfield 2020-06-27 12:21:02 -07:00
parent c4430abe49
commit 99dd44c2ba

View file

@ -27,7 +27,8 @@
ResultSet ResultSetMetaData
Statement
SQLException)
(java.util Locale)))
(java.util Locale)
(java.util.concurrent ForkJoinPool ForkJoinTask)))
(set! *warn-on-reflection* true)
@ -603,6 +604,24 @@
init')))
(f init {:next.jdbc/update-count (.getUpdateCount stmt)})))
;; ForkJoinTask wrappers copied in from clojure.core.reducers to avoid
;; relying on private functionality that might possibly change over time
(defn- fjtask [^Callable f]
(ForkJoinTask/adapt f))
(defn- fjinvoke
"For now, this still relies on clojure.core.reducers/pool which is
public but undocumented."
[f]
(if (ForkJoinTask/inForkJoinPool)
(f)
(.invoke ^ForkJoinPool @r/pool ^ForkJoinTask (fjtask f))))
(defn- fjfork [task] (.fork ^ForkJoinTask task))
(defn- fjjoin [task] (.join ^ForkJoinTask task))
(defn- fold-stmt
"Execute the `PreparedStatement`, attempt to get either its `ResultSet` or
its generated keys (as a `ResultSet`), and fold that using the supplied
@ -614,27 +633,27 @@
[^PreparedStatement stmt n combinef reducef connectable opts]
(if-let [rs (stmt->result-set stmt opts)]
(let [rs-map (mapify-result-set rs opts)
chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch)))
chunk (fn [batch] (fjtask #(r/reduce reducef (combinef) batch)))
realize (fn [row] (datafiable-row row connectable opts))]
(loop [batch [] task nil]
(if (.next rs)
(if (= n (count batch))
(recur [(realize rs-map)]
(let [t (#'r/fjfork (chunk batch))]
(let [t (fjfork (chunk batch))]
(if task
(#'r/fjfork
(#'r/fjtask #(combinef (#'r/fjjoin task)
(#'r/fjjoin t))))
(fjfork
(fjtask #(combinef (fjjoin task)
(fjjoin t))))
t)))
(recur (conj batch (realize rs-map)) task))
(if (seq batch)
(let [t (#'r/fjfork (chunk batch))]
(#'r/fjinvoke
#(combinef (if task (#'r/fjjoin task) (combinef))
(#'r/fjjoin t))))
(let [t (fjfork (chunk batch))]
(fjinvoke
#(combinef (if task (fjjoin task) (combinef))
(fjjoin t))))
(if task
(#'r/fjinvoke
#(combinef (combinef) (#'r/fjjoin task)))
(fjinvoke
#(combinef (combinef) (fjjoin task)))
(combinef))))))
(reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)})))
@ -680,27 +699,27 @@
[^Statement stmt sql n combinef reducef connectable opts]
(if-let [rs (stmt-sql->result-set stmt sql)]
(let [rs-map (mapify-result-set rs opts)
chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch)))
chunk (fn [batch] (fjtask #(r/reduce reducef (combinef) batch)))
realize (fn [row] (datafiable-row row connectable opts))]
(loop [batch [] task nil]
(if (.next rs)
(if (= n (count batch))
(recur [(realize rs-map)]
(let [t (#'r/fjfork (chunk batch))]
(let [t (fjfork (chunk batch))]
(if task
(#'r/fjfork
(#'r/fjtask #(combinef (#'r/fjjoin task)
(#'r/fjjoin t))))
(fjfork
(fjtask #(combinef (fjjoin task)
(fjjoin t))))
t)))
(recur (conj batch (realize rs-map)) task))
(if (seq batch)
(let [t (#'r/fjfork (chunk batch))]
(#'r/fjinvoke
#(combinef (if task (#'r/fjjoin task) (combinef))
(#'r/fjjoin t))))
(let [t (fjfork (chunk batch))]
(fjinvoke
#(combinef (if task (fjjoin task) (combinef))
(fjjoin t))))
(if task
(#'r/fjinvoke
#(combinef (combinef) (#'r/fjjoin task)))
(fjinvoke
#(combinef (combinef) (fjjoin task)))
(combinef))))))
(reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)})))