diff --git a/src/next/jdbc/result_set.clj b/src/next/jdbc/result_set.clj index debad39..98dd9da 100644 --- a/src/next/jdbc/result_set.clj +++ b/src/next/jdbc/result_set.clj @@ -27,7 +27,8 @@ ResultSet ResultSetMetaData Statement SQLException) - (java.util Locale))) + (java.util Locale) + (java.util.concurrent ForkJoinPool ForkJoinTask))) (set! *warn-on-reflection* true) @@ -603,6 +604,24 @@ init'))) (f init {:next.jdbc/update-count (.getUpdateCount stmt)}))) +;; ForkJoinTask wrappers copied in from clojure.core.reducers to avoid +;; relying on private functionality that might possibly change over time + +(defn- fjtask [^Callable f] + (ForkJoinTask/adapt f)) + +(defn- fjinvoke + "For now, this still relies on clojure.core.reducers/pool which is + public but undocumented." + [f] + (if (ForkJoinTask/inForkJoinPool) + (f) + (.invoke ^ForkJoinPool @r/pool ^ForkJoinTask (fjtask f)))) + +(defn- fjfork [task] (.fork ^ForkJoinTask task)) + +(defn- fjjoin [task] (.join ^ForkJoinTask task)) + (defn- fold-stmt "Execute the `PreparedStatement`, attempt to get either its `ResultSet` or its generated keys (as a `ResultSet`), and fold that using the supplied @@ -614,27 +633,27 @@ [^PreparedStatement stmt n combinef reducef connectable opts] (if-let [rs (stmt->result-set stmt opts)] (let [rs-map (mapify-result-set rs opts) - chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch))) + chunk (fn [batch] (fjtask #(r/reduce reducef (combinef) batch))) realize (fn [row] (datafiable-row row connectable opts))] (loop [batch [] task nil] (if (.next rs) (if (= n (count batch)) (recur [(realize rs-map)] - (let [t (#'r/fjfork (chunk batch))] + (let [t (fjfork (chunk batch))] (if task - (#'r/fjfork - (#'r/fjtask #(combinef (#'r/fjjoin task) - (#'r/fjjoin t)))) + (fjfork + (fjtask #(combinef (fjjoin task) + (fjjoin t)))) t))) (recur (conj batch (realize rs-map)) task)) (if (seq batch) - (let [t (#'r/fjfork (chunk batch))] - (#'r/fjinvoke - #(combinef (if task (#'r/fjjoin task) (combinef)) - (#'r/fjjoin t)))) + (let [t (fjfork (chunk batch))] + (fjinvoke + #(combinef (if task (fjjoin task) (combinef)) + (fjjoin t)))) (if task - (#'r/fjinvoke - #(combinef (combinef) (#'r/fjjoin task))) + (fjinvoke + #(combinef (combinef) (fjjoin task))) (combinef)))))) (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)}))) @@ -680,27 +699,27 @@ [^Statement stmt sql n combinef reducef connectable opts] (if-let [rs (stmt-sql->result-set stmt sql)] (let [rs-map (mapify-result-set rs opts) - chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch))) + chunk (fn [batch] (fjtask #(r/reduce reducef (combinef) batch))) realize (fn [row] (datafiable-row row connectable opts))] (loop [batch [] task nil] (if (.next rs) (if (= n (count batch)) (recur [(realize rs-map)] - (let [t (#'r/fjfork (chunk batch))] + (let [t (fjfork (chunk batch))] (if task - (#'r/fjfork - (#'r/fjtask #(combinef (#'r/fjjoin task) - (#'r/fjjoin t)))) + (fjfork + (fjtask #(combinef (fjjoin task) + (fjjoin t)))) t))) (recur (conj batch (realize rs-map)) task)) (if (seq batch) - (let [t (#'r/fjfork (chunk batch))] - (#'r/fjinvoke - #(combinef (if task (#'r/fjjoin task) (combinef)) - (#'r/fjjoin t)))) + (let [t (fjfork (chunk batch))] + (fjinvoke + #(combinef (if task (fjjoin task) (combinef)) + (fjjoin t)))) (if task - (#'r/fjinvoke - #(combinef (combinef) (#'r/fjjoin task))) + (fjinvoke + #(combinef (combinef) (fjjoin task))) (combinef)))))) (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)})))