From 99dd44c2babbb87f5a120be8ad10146dc9f86339 Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Sat, 27 Jun 2020 12:21:02 -0700 Subject: [PATCH 1/4] Import fj private wrappers from clojure.core.reducers --- src/next/jdbc/result_set.clj | 65 +++++++++++++++++++++++------------- 1 file changed, 42 insertions(+), 23 deletions(-) diff --git a/src/next/jdbc/result_set.clj b/src/next/jdbc/result_set.clj index debad39..98dd9da 100644 --- a/src/next/jdbc/result_set.clj +++ b/src/next/jdbc/result_set.clj @@ -27,7 +27,8 @@ ResultSet ResultSetMetaData Statement SQLException) - (java.util Locale))) + (java.util Locale) + (java.util.concurrent ForkJoinPool ForkJoinTask))) (set! *warn-on-reflection* true) @@ -603,6 +604,24 @@ init'))) (f init {:next.jdbc/update-count (.getUpdateCount stmt)}))) +;; ForkJoinTask wrappers copied in from clojure.core.reducers to avoid +;; relying on private functionality that might possibly change over time + +(defn- fjtask [^Callable f] + (ForkJoinTask/adapt f)) + +(defn- fjinvoke + "For now, this still relies on clojure.core.reducers/pool which is + public but undocumented." + [f] + (if (ForkJoinTask/inForkJoinPool) + (f) + (.invoke ^ForkJoinPool @r/pool ^ForkJoinTask (fjtask f)))) + +(defn- fjfork [task] (.fork ^ForkJoinTask task)) + +(defn- fjjoin [task] (.join ^ForkJoinTask task)) + (defn- fold-stmt "Execute the `PreparedStatement`, attempt to get either its `ResultSet` or its generated keys (as a `ResultSet`), and fold that using the supplied @@ -614,27 +633,27 @@ [^PreparedStatement stmt n combinef reducef connectable opts] (if-let [rs (stmt->result-set stmt opts)] (let [rs-map (mapify-result-set rs opts) - chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch))) + chunk (fn [batch] (fjtask #(r/reduce reducef (combinef) batch))) realize (fn [row] (datafiable-row row connectable opts))] (loop [batch [] task nil] (if (.next rs) (if (= n (count batch)) (recur [(realize rs-map)] - (let [t (#'r/fjfork (chunk batch))] + (let [t (fjfork (chunk batch))] (if task - (#'r/fjfork - (#'r/fjtask #(combinef (#'r/fjjoin task) - (#'r/fjjoin t)))) + (fjfork + (fjtask #(combinef (fjjoin task) + (fjjoin t)))) t))) (recur (conj batch (realize rs-map)) task)) (if (seq batch) - (let [t (#'r/fjfork (chunk batch))] - (#'r/fjinvoke - #(combinef (if task (#'r/fjjoin task) (combinef)) - (#'r/fjjoin t)))) + (let [t (fjfork (chunk batch))] + (fjinvoke + #(combinef (if task (fjjoin task) (combinef)) + (fjjoin t)))) (if task - (#'r/fjinvoke - #(combinef (combinef) (#'r/fjjoin task))) + (fjinvoke + #(combinef (combinef) (fjjoin task))) (combinef)))))) (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)}))) @@ -680,27 +699,27 @@ [^Statement stmt sql n combinef reducef connectable opts] (if-let [rs (stmt-sql->result-set stmt sql)] (let [rs-map (mapify-result-set rs opts) - chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch))) + chunk (fn [batch] (fjtask #(r/reduce reducef (combinef) batch))) realize (fn [row] (datafiable-row row connectable opts))] (loop [batch [] task nil] (if (.next rs) (if (= n (count batch)) (recur [(realize rs-map)] - (let [t (#'r/fjfork (chunk batch))] + (let [t (fjfork (chunk batch))] (if task - (#'r/fjfork - (#'r/fjtask #(combinef (#'r/fjjoin task) - (#'r/fjjoin t)))) + (fjfork + (fjtask #(combinef (fjjoin task) + (fjjoin t)))) t))) (recur (conj batch (realize rs-map)) task)) (if (seq batch) - (let [t (#'r/fjfork (chunk batch))] - (#'r/fjinvoke - #(combinef (if task (#'r/fjjoin task) (combinef)) - (#'r/fjjoin t)))) + (let [t (fjfork (chunk batch))] + (fjinvoke + #(combinef (if task (fjjoin task) (combinef)) + (fjjoin t)))) (if task - (#'r/fjinvoke - #(combinef (combinef) (#'r/fjjoin task))) + (fjinvoke + #(combinef (combinef) (fjjoin task))) (combinef)))))) (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)}))) From 7b24e2ff55f3fa355e11418d111fb1440022bebe Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Sat, 27 Jun 2020 13:46:38 -0700 Subject: [PATCH 2/4] Extensive foldable tests --- test/next/jdbc_test.clj | 76 +++++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 37 deletions(-) diff --git a/test/next/jdbc_test.clj b/test/next/jdbc_test.clj index 0fba792..2463025 100644 --- a/test/next/jdbc_test.clj +++ b/test/next/jdbc_test.clj @@ -17,7 +17,8 @@ (set! *warn-on-reflection* true) -(use-fixtures :once with-test-db) +;; around each test because of the folding tests using 1,000 rows +(use-fixtures :each with-test-db) (specs/instrument) @@ -295,57 +296,58 @@ VALUES ('Pear', 'green', 49, 47) (is (= ac (.getAutoCommit con))))))) (deftest folding-test + (println "=== folding-test setup for" (:dbtype (db))) + (jdbc/execute-one! (ds) ["delete from fruit"]) + (doseq [n (range 1 1001)] + (jdbc/execute-one! (ds) ["insert into fruit(name) values (?)" + (str "Fruit-" n)])) + (println "=== folding-test running for" (:dbtype (db))) (testing "foldable result set" (testing "from a Connection" (let [result (with-open [con (jdbc/get-connection (ds))] - (r/fold 2 r/cat r/append! - (r/map (column :FRUIT/NAME) - (jdbc/plan con ["select * from fruit order by id"] - (default-options)))))] - (is (= 4 (count result))) - (is (= "Apple" (first result))) - (is (= "Orange" (last result))))) + (r/foldcat + (r/map (column :FRUIT/NAME) + (jdbc/plan con ["select * from fruit order by id"] + (default-options)))))] + (is (= 1000 (count result))) + (is (= "Fruit-1" (first result))) + (is (= "Fruit-1000" (last result))))) (testing "from a DataSource" - (let [result - (r/fold 2 r/cat r/append! - (r/map (column :FRUIT/NAME) - (jdbc/plan (ds) ["select * from fruit order by id"] - (default-options))))] - (is (= 4 (count result))) - (is (= "Apple" (first result))) - (is (= "Orange" (last result)))) - (let [result - (r/fold 1 r/cat r/append! - (r/map (column :FRUIT/NAME) - (jdbc/plan (ds) ["select * from fruit order by id"] - (default-options))))] - (is (= 4 (count result))) - (is (= "Apple" (first result))) - (is (= "Orange" (last result))))) + (doseq [n [1 2 3 4 5 100 300 500 700 900 1000 1100]] + (println " === partition size" n) + (testing (str "folding with n = " n) + (let [result + (r/fold n r/cat r/append! + (r/map (column :FRUIT/NAME) + (jdbc/plan (ds) ["select * from fruit order by id"] + (default-options))))] + (is (= 1000 (count result))) + (is (= "Fruit-1" (first result))) + (is (= "Fruit-1000" (last result))))))) (testing "from a PreparedStatement" (let [result (with-open [con (jdbc/get-connection (ds)) stmt (jdbc/prepare con ["select * from fruit order by id"] (default-options))] - (r/fold 2 r/cat r/append! - (r/map (column :FRUIT/NAME) - (jdbc/plan stmt nil (default-options)))))] - (is (= 4 (count result))) - (is (= "Apple" (first result))) - (is (= "Orange" (last result))))) + (r/foldcat + (r/map (column :FRUIT/NAME) + (jdbc/plan stmt nil (default-options)))))] + (is (= 1000 (count result))) + (is (= "Fruit-1" (first result))) + (is (= "Fruit-1000" (last result))))) (testing "from a Statement" (let [result (with-open [con (jdbc/get-connection (ds)) stmt (prep/statement con (default-options))] - (r/fold 2 r/cat r/append! - (r/map (column :FRUIT/NAME) - (jdbc/plan stmt ["select * from fruit order by id"] - (default-options)))))] - (is (= 4 (count result))) - (is (= "Apple" (first result))) - (is (= "Orange" (last result))))))) + (r/foldcat + (r/map (column :FRUIT/NAME) + (jdbc/plan stmt ["select * from fruit order by id"] + (default-options)))))] + (is (= 1000 (count result))) + (is (= "Fruit-1" (first result))) + (is (= "Fruit-1000" (last result))))))) (deftest connection-tests (testing "datasource via jdbcUrl" From 90a6476e526450bad7f7ceed538c21f1d2a09300 Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Sat, 27 Jun 2020 14:09:54 -0700 Subject: [PATCH 3/4] Use execute-batch! to populate 1000 rows for testing foldable --- test/next/jdbc_test.clj | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/next/jdbc_test.clj b/test/next/jdbc_test.clj index 2463025..9f87d15 100644 --- a/test/next/jdbc_test.clj +++ b/test/next/jdbc_test.clj @@ -298,9 +298,9 @@ VALUES ('Pear', 'green', 49, 47) (deftest folding-test (println "=== folding-test setup for" (:dbtype (db))) (jdbc/execute-one! (ds) ["delete from fruit"]) - (doseq [n (range 1 1001)] - (jdbc/execute-one! (ds) ["insert into fruit(name) values (?)" - (str "Fruit-" n)])) + (with-open [con (jdbc/get-connection (ds)) + ps (jdbc/prepare con ["insert into fruit(name) values (?)"])] + (prep/execute-batch! ps (mapv #(vector (str "Fruit-" %)) (range 1 1001)))) (println "=== folding-test running for" (:dbtype (db))) (testing "foldable result set" (testing "from a Connection" From 53b5619566387775d26ed8a084e9dc021a02d92c Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Sat, 27 Jun 2020 14:35:18 -0700 Subject: [PATCH 4/4] Quieten fold tests now I've confirmed performance and thread usage --- test/next/jdbc_test.clj | 3 --- 1 file changed, 3 deletions(-) diff --git a/test/next/jdbc_test.clj b/test/next/jdbc_test.clj index 9f87d15..1f56663 100644 --- a/test/next/jdbc_test.clj +++ b/test/next/jdbc_test.clj @@ -296,12 +296,10 @@ VALUES ('Pear', 'green', 49, 47) (is (= ac (.getAutoCommit con))))))) (deftest folding-test - (println "=== folding-test setup for" (:dbtype (db))) (jdbc/execute-one! (ds) ["delete from fruit"]) (with-open [con (jdbc/get-connection (ds)) ps (jdbc/prepare con ["insert into fruit(name) values (?)"])] (prep/execute-batch! ps (mapv #(vector (str "Fruit-" %)) (range 1 1001)))) - (println "=== folding-test running for" (:dbtype (db))) (testing "foldable result set" (testing "from a Connection" (let [result @@ -315,7 +313,6 @@ VALUES ('Pear', 'green', 49, 47) (is (= "Fruit-1000" (last result))))) (testing "from a DataSource" (doseq [n [1 2 3 4 5 100 300 500 700 900 1000 1100]] - (println " === partition size" n) (testing (str "folding with n = " n) (let [result (r/fold n r/cat r/append!