Merge branch 'develop' into multi-rs
This commit is contained in:
commit
b86145f47e
2 changed files with 78 additions and 60 deletions
|
|
@ -27,7 +27,8 @@
|
||||||
ResultSet ResultSetMetaData
|
ResultSet ResultSetMetaData
|
||||||
Statement
|
Statement
|
||||||
SQLException)
|
SQLException)
|
||||||
(java.util Locale)))
|
(java.util Locale)
|
||||||
|
(java.util.concurrent ForkJoinPool ForkJoinTask)))
|
||||||
|
|
||||||
(set! *warn-on-reflection* true)
|
(set! *warn-on-reflection* true)
|
||||||
|
|
||||||
|
|
@ -623,6 +624,24 @@
|
||||||
init')))
|
init')))
|
||||||
(f init {:next.jdbc/update-count (.getUpdateCount stmt)})))
|
(f init {:next.jdbc/update-count (.getUpdateCount stmt)})))
|
||||||
|
|
||||||
|
;; ForkJoinTask wrappers copied in from clojure.core.reducers to avoid
|
||||||
|
;; relying on private functionality that might possibly change over time
|
||||||
|
|
||||||
|
(defn- fjtask [^Callable f]
|
||||||
|
(ForkJoinTask/adapt f))
|
||||||
|
|
||||||
|
(defn- fjinvoke
|
||||||
|
"For now, this still relies on clojure.core.reducers/pool which is
|
||||||
|
public but undocumented."
|
||||||
|
[f]
|
||||||
|
(if (ForkJoinTask/inForkJoinPool)
|
||||||
|
(f)
|
||||||
|
(.invoke ^ForkJoinPool @r/pool ^ForkJoinTask (fjtask f))))
|
||||||
|
|
||||||
|
(defn- fjfork [task] (.fork ^ForkJoinTask task))
|
||||||
|
|
||||||
|
(defn- fjjoin [task] (.join ^ForkJoinTask task))
|
||||||
|
|
||||||
(defn- fold-stmt
|
(defn- fold-stmt
|
||||||
"Execute the `PreparedStatement`, attempt to get either its `ResultSet` or
|
"Execute the `PreparedStatement`, attempt to get either its `ResultSet` or
|
||||||
its generated keys (as a `ResultSet`), and fold that using the supplied
|
its generated keys (as a `ResultSet`), and fold that using the supplied
|
||||||
|
|
@ -634,27 +653,27 @@
|
||||||
[^PreparedStatement stmt n combinef reducef connectable opts]
|
[^PreparedStatement stmt n combinef reducef connectable opts]
|
||||||
(if-let [rs (stmt->result-set stmt opts)]
|
(if-let [rs (stmt->result-set stmt opts)]
|
||||||
(let [rs-map (mapify-result-set rs opts)
|
(let [rs-map (mapify-result-set rs opts)
|
||||||
chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch)))
|
chunk (fn [batch] (fjtask #(r/reduce reducef (combinef) batch)))
|
||||||
realize (fn [row] (datafiable-row row connectable opts))]
|
realize (fn [row] (datafiable-row row connectable opts))]
|
||||||
(loop [batch [] task nil]
|
(loop [batch [] task nil]
|
||||||
(if (.next rs)
|
(if (.next rs)
|
||||||
(if (= n (count batch))
|
(if (= n (count batch))
|
||||||
(recur [(realize rs-map)]
|
(recur [(realize rs-map)]
|
||||||
(let [t (#'r/fjfork (chunk batch))]
|
(let [t (fjfork (chunk batch))]
|
||||||
(if task
|
(if task
|
||||||
(#'r/fjfork
|
(fjfork
|
||||||
(#'r/fjtask #(combinef (#'r/fjjoin task)
|
(fjtask #(combinef (fjjoin task)
|
||||||
(#'r/fjjoin t))))
|
(fjjoin t))))
|
||||||
t)))
|
t)))
|
||||||
(recur (conj batch (realize rs-map)) task))
|
(recur (conj batch (realize rs-map)) task))
|
||||||
(if (seq batch)
|
(if (seq batch)
|
||||||
(let [t (#'r/fjfork (chunk batch))]
|
(let [t (fjfork (chunk batch))]
|
||||||
(#'r/fjinvoke
|
(fjinvoke
|
||||||
#(combinef (if task (#'r/fjjoin task) (combinef))
|
#(combinef (if task (fjjoin task) (combinef))
|
||||||
(#'r/fjjoin t))))
|
(fjjoin t))))
|
||||||
(if task
|
(if task
|
||||||
(#'r/fjinvoke
|
(fjinvoke
|
||||||
#(combinef (combinef) (#'r/fjjoin task)))
|
#(combinef (combinef) (fjjoin task)))
|
||||||
(combinef))))))
|
(combinef))))))
|
||||||
(reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)})))
|
(reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)})))
|
||||||
|
|
||||||
|
|
@ -700,27 +719,27 @@
|
||||||
[^Statement stmt sql n combinef reducef connectable opts]
|
[^Statement stmt sql n combinef reducef connectable opts]
|
||||||
(if-let [rs (stmt-sql->result-set stmt sql)]
|
(if-let [rs (stmt-sql->result-set stmt sql)]
|
||||||
(let [rs-map (mapify-result-set rs opts)
|
(let [rs-map (mapify-result-set rs opts)
|
||||||
chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch)))
|
chunk (fn [batch] (fjtask #(r/reduce reducef (combinef) batch)))
|
||||||
realize (fn [row] (datafiable-row row connectable opts))]
|
realize (fn [row] (datafiable-row row connectable opts))]
|
||||||
(loop [batch [] task nil]
|
(loop [batch [] task nil]
|
||||||
(if (.next rs)
|
(if (.next rs)
|
||||||
(if (= n (count batch))
|
(if (= n (count batch))
|
||||||
(recur [(realize rs-map)]
|
(recur [(realize rs-map)]
|
||||||
(let [t (#'r/fjfork (chunk batch))]
|
(let [t (fjfork (chunk batch))]
|
||||||
(if task
|
(if task
|
||||||
(#'r/fjfork
|
(fjfork
|
||||||
(#'r/fjtask #(combinef (#'r/fjjoin task)
|
(fjtask #(combinef (fjjoin task)
|
||||||
(#'r/fjjoin t))))
|
(fjjoin t))))
|
||||||
t)))
|
t)))
|
||||||
(recur (conj batch (realize rs-map)) task))
|
(recur (conj batch (realize rs-map)) task))
|
||||||
(if (seq batch)
|
(if (seq batch)
|
||||||
(let [t (#'r/fjfork (chunk batch))]
|
(let [t (fjfork (chunk batch))]
|
||||||
(#'r/fjinvoke
|
(fjinvoke
|
||||||
#(combinef (if task (#'r/fjjoin task) (combinef))
|
#(combinef (if task (fjjoin task) (combinef))
|
||||||
(#'r/fjjoin t))))
|
(fjjoin t))))
|
||||||
(if task
|
(if task
|
||||||
(#'r/fjinvoke
|
(fjinvoke
|
||||||
#(combinef (combinef) (#'r/fjjoin task)))
|
#(combinef (combinef) (fjjoin task)))
|
||||||
(combinef))))))
|
(combinef))))))
|
||||||
(reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)})))
|
(reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)})))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,8 @@
|
||||||
|
|
||||||
(set! *warn-on-reflection* true)
|
(set! *warn-on-reflection* true)
|
||||||
|
|
||||||
(use-fixtures :once with-test-db)
|
;; around each test because of the folding tests using 1,000 rows
|
||||||
|
(use-fixtures :each with-test-db)
|
||||||
|
|
||||||
(specs/instrument)
|
(specs/instrument)
|
||||||
|
|
||||||
|
|
@ -296,57 +297,55 @@ VALUES ('Pear', 'green', 49, 47)
|
||||||
(is (= ac (.getAutoCommit con)))))))
|
(is (= ac (.getAutoCommit con)))))))
|
||||||
|
|
||||||
(deftest folding-test
|
(deftest folding-test
|
||||||
|
(jdbc/execute-one! (ds) ["delete from fruit"])
|
||||||
|
(with-open [con (jdbc/get-connection (ds))
|
||||||
|
ps (jdbc/prepare con ["insert into fruit(name) values (?)"])]
|
||||||
|
(prep/execute-batch! ps (mapv #(vector (str "Fruit-" %)) (range 1 1001))))
|
||||||
(testing "foldable result set"
|
(testing "foldable result set"
|
||||||
(testing "from a Connection"
|
(testing "from a Connection"
|
||||||
(let [result
|
(let [result
|
||||||
(with-open [con (jdbc/get-connection (ds))]
|
(with-open [con (jdbc/get-connection (ds))]
|
||||||
(r/fold 2 r/cat r/append!
|
(r/foldcat
|
||||||
(r/map (column :FRUIT/NAME)
|
(r/map (column :FRUIT/NAME)
|
||||||
(jdbc/plan con ["select * from fruit order by id"]
|
(jdbc/plan con ["select * from fruit order by id"]
|
||||||
(default-options)))))]
|
(default-options)))))]
|
||||||
(is (= 4 (count result)))
|
(is (= 1000 (count result)))
|
||||||
(is (= "Apple" (first result)))
|
(is (= "Fruit-1" (first result)))
|
||||||
(is (= "Orange" (last result)))))
|
(is (= "Fruit-1000" (last result)))))
|
||||||
(testing "from a DataSource"
|
(testing "from a DataSource"
|
||||||
|
(doseq [n [1 2 3 4 5 100 300 500 700 900 1000 1100]]
|
||||||
|
(testing (str "folding with n = " n)
|
||||||
(let [result
|
(let [result
|
||||||
(r/fold 2 r/cat r/append!
|
(r/fold n r/cat r/append!
|
||||||
(r/map (column :FRUIT/NAME)
|
(r/map (column :FRUIT/NAME)
|
||||||
(jdbc/plan (ds) ["select * from fruit order by id"]
|
(jdbc/plan (ds) ["select * from fruit order by id"]
|
||||||
(default-options))))]
|
(default-options))))]
|
||||||
(is (= 4 (count result)))
|
(is (= 1000 (count result)))
|
||||||
(is (= "Apple" (first result)))
|
(is (= "Fruit-1" (first result)))
|
||||||
(is (= "Orange" (last result))))
|
(is (= "Fruit-1000" (last result)))))))
|
||||||
(let [result
|
|
||||||
(r/fold 1 r/cat r/append!
|
|
||||||
(r/map (column :FRUIT/NAME)
|
|
||||||
(jdbc/plan (ds) ["select * from fruit order by id"]
|
|
||||||
(default-options))))]
|
|
||||||
(is (= 4 (count result)))
|
|
||||||
(is (= "Apple" (first result)))
|
|
||||||
(is (= "Orange" (last result)))))
|
|
||||||
(testing "from a PreparedStatement"
|
(testing "from a PreparedStatement"
|
||||||
(let [result
|
(let [result
|
||||||
(with-open [con (jdbc/get-connection (ds))
|
(with-open [con (jdbc/get-connection (ds))
|
||||||
stmt (jdbc/prepare con
|
stmt (jdbc/prepare con
|
||||||
["select * from fruit order by id"]
|
["select * from fruit order by id"]
|
||||||
(default-options))]
|
(default-options))]
|
||||||
(r/fold 2 r/cat r/append!
|
(r/foldcat
|
||||||
(r/map (column :FRUIT/NAME)
|
(r/map (column :FRUIT/NAME)
|
||||||
(jdbc/plan stmt nil (default-options)))))]
|
(jdbc/plan stmt nil (default-options)))))]
|
||||||
(is (= 4 (count result)))
|
(is (= 1000 (count result)))
|
||||||
(is (= "Apple" (first result)))
|
(is (= "Fruit-1" (first result)))
|
||||||
(is (= "Orange" (last result)))))
|
(is (= "Fruit-1000" (last result)))))
|
||||||
(testing "from a Statement"
|
(testing "from a Statement"
|
||||||
(let [result
|
(let [result
|
||||||
(with-open [con (jdbc/get-connection (ds))
|
(with-open [con (jdbc/get-connection (ds))
|
||||||
stmt (prep/statement con (default-options))]
|
stmt (prep/statement con (default-options))]
|
||||||
(r/fold 2 r/cat r/append!
|
(r/foldcat
|
||||||
(r/map (column :FRUIT/NAME)
|
(r/map (column :FRUIT/NAME)
|
||||||
(jdbc/plan stmt ["select * from fruit order by id"]
|
(jdbc/plan stmt ["select * from fruit order by id"]
|
||||||
(default-options)))))]
|
(default-options)))))]
|
||||||
(is (= 4 (count result)))
|
(is (= 1000 (count result)))
|
||||||
(is (= "Apple" (first result)))
|
(is (= "Fruit-1" (first result)))
|
||||||
(is (= "Orange" (last result)))))))
|
(is (= "Fruit-1000" (last result)))))))
|
||||||
|
|
||||||
(deftest connection-tests
|
(deftest connection-tests
|
||||||
(testing "datasource via jdbcUrl"
|
(testing "datasource via jdbcUrl"
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue