Prototype of #125 -- foldable result sets!
This commit is contained in:
parent
a4a8602af4
commit
0eb183a0a0
2 changed files with 53 additions and 8 deletions
|
|
@ -18,6 +18,7 @@
|
||||||
for implementations of `ReadableColumn` that provide automatic
|
for implementations of `ReadableColumn` that provide automatic
|
||||||
conversion of some SQL data types to Java Time objects."
|
conversion of some SQL data types to Java Time objects."
|
||||||
(:require [clojure.core.protocols :as core-p]
|
(:require [clojure.core.protocols :as core-p]
|
||||||
|
[clojure.core.reducers :as r]
|
||||||
[clojure.datafy :as d]
|
[clojure.datafy :as d]
|
||||||
[next.jdbc.prepare :as prepare]
|
[next.jdbc.prepare :as prepare]
|
||||||
[next.jdbc.protocols :as p])
|
[next.jdbc.protocols :as p])
|
||||||
|
|
@ -602,6 +603,29 @@
|
||||||
init')))
|
init')))
|
||||||
(f init {:next.jdbc/update-count (.getUpdateCount stmt)})))
|
(f init {:next.jdbc/update-count (.getUpdateCount stmt)})))
|
||||||
|
|
||||||
|
(defn- fold-stmt
|
||||||
|
"Execute the `PreparedStatement`, attempt to get either its `ResultSet` or
|
||||||
|
its generated keys (as a `ResultSet`), and fold that using the supplied
|
||||||
|
batch size, combining function, and reducing function.
|
||||||
|
|
||||||
|
If the statement yields neither a `ResultSet` nor generated keys, produce
|
||||||
|
a hash map containing `:next.jdbc/update-count` and the number of rows
|
||||||
|
updated, and fold that as a single element collection."
|
||||||
|
[^PreparedStatement stmt n combinef reducef connectable opts]
|
||||||
|
(if-let [rs (stmt->result-set stmt opts)]
|
||||||
|
(let [rs-map (mapify-result-set rs opts)
|
||||||
|
chunk (fn [batch] (#'r/fjtask #(reduce reducef (combinef) batch)))
|
||||||
|
realize (fn [row] (datafiable-row row connectable opts))]
|
||||||
|
(loop [batch [] tasks []]
|
||||||
|
(if (.next rs)
|
||||||
|
(if (= n (count batch))
|
||||||
|
(recur [(realize rs-map)] (conj tasks (#'r/fjfork (chunk batch))))
|
||||||
|
(recur (conj batch (realize rs-map)) tasks))
|
||||||
|
(#'r/fjinvoke
|
||||||
|
#(reduce combinef (combinef)
|
||||||
|
(mapv #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch)))))))))
|
||||||
|
(reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)})))
|
||||||
|
|
||||||
(defn- stmt-sql->result-set
|
(defn- stmt-sql->result-set
|
||||||
"Given a `Statement`, a SQL command, and options, execute it and return a
|
"Given a `Statement`, a SQL command, and options, execute it and return a
|
||||||
`ResultSet` if possible."
|
`ResultSet` if possible."
|
||||||
|
|
@ -667,7 +691,8 @@
|
||||||
|
|
||||||
javax.sql.DataSource
|
javax.sql.DataSource
|
||||||
(-execute [this sql-params opts]
|
(-execute [this sql-params opts]
|
||||||
(reify clojure.lang.IReduceInit
|
(reify
|
||||||
|
clojure.lang.IReduceInit
|
||||||
(reduce [_ f init]
|
(reduce [_ f init]
|
||||||
(with-open [con (p/get-connection this opts)
|
(with-open [con (p/get-connection this opts)
|
||||||
stmt (prepare/create con
|
stmt (prepare/create con
|
||||||
|
|
@ -675,6 +700,14 @@
|
||||||
(rest sql-params)
|
(rest sql-params)
|
||||||
opts)]
|
opts)]
|
||||||
(reduce-stmt stmt f init opts)))
|
(reduce-stmt stmt f init opts)))
|
||||||
|
r/CollFold
|
||||||
|
(coll-fold [_ n combinef reducef]
|
||||||
|
(with-open [con (p/get-connection this opts)
|
||||||
|
stmt (prepare/create con
|
||||||
|
(first sql-params)
|
||||||
|
(rest sql-params)
|
||||||
|
opts)]
|
||||||
|
(fold-stmt stmt n combinef reducef this opts)))
|
||||||
(toString [_] "`IReduceInit` from `plan` -- missing reduction?")))
|
(toString [_] "`IReduceInit` from `plan` -- missing reduction?")))
|
||||||
(-execute-one [this sql-params opts]
|
(-execute-one [this sql-params opts]
|
||||||
(with-open [con (p/get-connection this opts)
|
(with-open [con (p/get-connection this opts)
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,8 @@
|
||||||
|
|
||||||
(ns next.jdbc-test
|
(ns next.jdbc-test
|
||||||
"Basic tests for the primary API of `next.jdbc`."
|
"Basic tests for the primary API of `next.jdbc`."
|
||||||
(:require [clojure.string :as str]
|
(:require [clojure.core.reducers :as r]
|
||||||
|
[clojure.string :as str]
|
||||||
[clojure.test :refer [deftest is testing use-fixtures]]
|
[clojure.test :refer [deftest is testing use-fixtures]]
|
||||||
[next.jdbc :as jdbc]
|
[next.jdbc :as jdbc]
|
||||||
[next.jdbc.connection :as c]
|
[next.jdbc.connection :as c]
|
||||||
|
|
@ -293,6 +294,17 @@ VALUES ('Pear', 'green', 49, 47)
|
||||||
(is (= 4 (count (jdbc/execute! con ["select * from fruit"]))))
|
(is (= 4 (count (jdbc/execute! con ["select * from fruit"]))))
|
||||||
(is (= ac (.getAutoCommit con)))))))
|
(is (= ac (.getAutoCommit con)))))))
|
||||||
|
|
||||||
|
(deftest fold-rs-test
|
||||||
|
(let [ds-opts (jdbc/with-options (ds) (default-options))]
|
||||||
|
(testing "foldable result set"
|
||||||
|
(let [result
|
||||||
|
(r/fold 2 r/cat r/append!
|
||||||
|
(r/map (column :FRUIT/NAME)
|
||||||
|
(jdbc/plan ds-opts ["select * from fruit order by id"])))]
|
||||||
|
(is (= 4 (count result)))
|
||||||
|
(is (= "Apple" (first result)))
|
||||||
|
(is (= "Orange" (last result)))))))
|
||||||
|
|
||||||
(deftest connection-tests
|
(deftest connection-tests
|
||||||
(testing "datasource via jdbcUrl"
|
(testing "datasource via jdbcUrl"
|
||||||
(when-not (postgres?)
|
(when-not (postgres?)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue