diff --git a/src/next/jdbc/result_set.clj b/src/next/jdbc/result_set.clj index 194a9fe..810b930 100644 --- a/src/next/jdbc/result_set.clj +++ b/src/next/jdbc/result_set.clj @@ -27,7 +27,8 @@ ResultSet ResultSetMetaData Statement SQLException) - (java.util Locale))) + (java.util Locale) + (java.util.concurrent ForkJoinPool ForkJoinTask))) (set! *warn-on-reflection* true) @@ -623,6 +624,24 @@ init'))) (f init {:next.jdbc/update-count (.getUpdateCount stmt)}))) +;; ForkJoinTask wrappers copied in from clojure.core.reducers to avoid +;; relying on private functionality that might possibly change over time + +(defn- fjtask [^Callable f] + (ForkJoinTask/adapt f)) + +(defn- fjinvoke + "For now, this still relies on clojure.core.reducers/pool which is + public but undocumented." + [f] + (if (ForkJoinTask/inForkJoinPool) + (f) + (.invoke ^ForkJoinPool @r/pool ^ForkJoinTask (fjtask f)))) + +(defn- fjfork [task] (.fork ^ForkJoinTask task)) + +(defn- fjjoin [task] (.join ^ForkJoinTask task)) + (defn- fold-stmt "Execute the `PreparedStatement`, attempt to get either its `ResultSet` or its generated keys (as a `ResultSet`), and fold that using the supplied @@ -634,27 +653,27 @@ [^PreparedStatement stmt n combinef reducef connectable opts] (if-let [rs (stmt->result-set stmt opts)] (let [rs-map (mapify-result-set rs opts) - chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch))) + chunk (fn [batch] (fjtask #(r/reduce reducef (combinef) batch))) realize (fn [row] (datafiable-row row connectable opts))] (loop [batch [] task nil] (if (.next rs) (if (= n (count batch)) (recur [(realize rs-map)] - (let [t (#'r/fjfork (chunk batch))] + (let [t (fjfork (chunk batch))] (if task - (#'r/fjfork - (#'r/fjtask #(combinef (#'r/fjjoin task) - (#'r/fjjoin t)))) + (fjfork + (fjtask #(combinef (fjjoin task) + (fjjoin t)))) t))) (recur (conj batch (realize rs-map)) task)) (if (seq batch) - (let [t (#'r/fjfork (chunk batch))] - (#'r/fjinvoke - #(combinef (if task (#'r/fjjoin task) (combinef)) - (#'r/fjjoin t)))) + (let [t (fjfork (chunk batch))] + (fjinvoke + #(combinef (if task (fjjoin task) (combinef)) + (fjjoin t)))) (if task - (#'r/fjinvoke - #(combinef (combinef) (#'r/fjjoin task))) + (fjinvoke + #(combinef (combinef) (fjjoin task))) (combinef)))))) (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)}))) @@ -700,27 +719,27 @@ [^Statement stmt sql n combinef reducef connectable opts] (if-let [rs (stmt-sql->result-set stmt sql)] (let [rs-map (mapify-result-set rs opts) - chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch))) + chunk (fn [batch] (fjtask #(r/reduce reducef (combinef) batch))) realize (fn [row] (datafiable-row row connectable opts))] (loop [batch [] task nil] (if (.next rs) (if (= n (count batch)) (recur [(realize rs-map)] - (let [t (#'r/fjfork (chunk batch))] + (let [t (fjfork (chunk batch))] (if task - (#'r/fjfork - (#'r/fjtask #(combinef (#'r/fjjoin task) - (#'r/fjjoin t)))) + (fjfork + (fjtask #(combinef (fjjoin task) + (fjjoin t)))) t))) (recur (conj batch (realize rs-map)) task)) (if (seq batch) - (let [t (#'r/fjfork (chunk batch))] - (#'r/fjinvoke - #(combinef (if task (#'r/fjjoin task) (combinef)) - (#'r/fjjoin t)))) + (let [t (fjfork (chunk batch))] + (fjinvoke + #(combinef (if task (fjjoin task) (combinef)) + (fjjoin t)))) (if task - (#'r/fjinvoke - #(combinef (combinef) (#'r/fjjoin task))) + (fjinvoke + #(combinef (combinef) (fjjoin task))) (combinef)))))) (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)}))) diff --git a/test/next/jdbc_test.clj b/test/next/jdbc_test.clj index 9e2fd0d..0d9bfcf 100644 --- a/test/next/jdbc_test.clj +++ b/test/next/jdbc_test.clj @@ -18,7 +18,8 @@ (set! *warn-on-reflection* true) -(use-fixtures :once with-test-db) +;; around each test because of the folding tests using 1,000 rows +(use-fixtures :each with-test-db) (specs/instrument) @@ -296,57 +297,55 @@ VALUES ('Pear', 'green', 49, 47) (is (= ac (.getAutoCommit con))))))) (deftest folding-test + (jdbc/execute-one! (ds) ["delete from fruit"]) + (with-open [con (jdbc/get-connection (ds)) + ps (jdbc/prepare con ["insert into fruit(name) values (?)"])] + (prep/execute-batch! ps (mapv #(vector (str "Fruit-" %)) (range 1 1001)))) (testing "foldable result set" (testing "from a Connection" (let [result (with-open [con (jdbc/get-connection (ds))] - (r/fold 2 r/cat r/append! - (r/map (column :FRUIT/NAME) - (jdbc/plan con ["select * from fruit order by id"] - (default-options)))))] - (is (= 4 (count result))) - (is (= "Apple" (first result))) - (is (= "Orange" (last result))))) + (r/foldcat + (r/map (column :FRUIT/NAME) + (jdbc/plan con ["select * from fruit order by id"] + (default-options)))))] + (is (= 1000 (count result))) + (is (= "Fruit-1" (first result))) + (is (= "Fruit-1000" (last result))))) (testing "from a DataSource" - (let [result - (r/fold 2 r/cat r/append! - (r/map (column :FRUIT/NAME) - (jdbc/plan (ds) ["select * from fruit order by id"] - (default-options))))] - (is (= 4 (count result))) - (is (= "Apple" (first result))) - (is (= "Orange" (last result)))) - (let [result - (r/fold 1 r/cat r/append! - (r/map (column :FRUIT/NAME) - (jdbc/plan (ds) ["select * from fruit order by id"] - (default-options))))] - (is (= 4 (count result))) - (is (= "Apple" (first result))) - (is (= "Orange" (last result))))) + (doseq [n [1 2 3 4 5 100 300 500 700 900 1000 1100]] + (testing (str "folding with n = " n) + (let [result + (r/fold n r/cat r/append! + (r/map (column :FRUIT/NAME) + (jdbc/plan (ds) ["select * from fruit order by id"] + (default-options))))] + (is (= 1000 (count result))) + (is (= "Fruit-1" (first result))) + (is (= "Fruit-1000" (last result))))))) (testing "from a PreparedStatement" (let [result (with-open [con (jdbc/get-connection (ds)) stmt (jdbc/prepare con ["select * from fruit order by id"] (default-options))] - (r/fold 2 r/cat r/append! - (r/map (column :FRUIT/NAME) - (jdbc/plan stmt nil (default-options)))))] - (is (= 4 (count result))) - (is (= "Apple" (first result))) - (is (= "Orange" (last result))))) + (r/foldcat + (r/map (column :FRUIT/NAME) + (jdbc/plan stmt nil (default-options)))))] + (is (= 1000 (count result))) + (is (= "Fruit-1" (first result))) + (is (= "Fruit-1000" (last result))))) (testing "from a Statement" (let [result (with-open [con (jdbc/get-connection (ds)) stmt (prep/statement con (default-options))] - (r/fold 2 r/cat r/append! - (r/map (column :FRUIT/NAME) - (jdbc/plan stmt ["select * from fruit order by id"] - (default-options)))))] - (is (= 4 (count result))) - (is (= "Apple" (first result))) - (is (= "Orange" (last result))))))) + (r/foldcat + (r/map (column :FRUIT/NAME) + (jdbc/plan stmt ["select * from fruit order by id"] + (default-options)))))] + (is (= 1000 (count result))) + (is (= "Fruit-1" (first result))) + (is (= "Fruit-1000" (last result))))))) (deftest connection-tests (testing "datasource via jdbcUrl"