Merge branch 'develop' into multi-rs
This commit is contained in:
commit
c36858ca95
6 changed files with 195 additions and 24 deletions
|
|
@ -3,7 +3,8 @@
|
|||
Only accretive/fixative changes will be made from now on.
|
||||
|
||||
Changes made since the 1.0.478 release:
|
||||
* WIP: support for stored procedures and multiple result sets!
|
||||
* WIP support multiple result sets.
|
||||
* Address #125 by making the result of `plan` foldable (in the `clojure.core.reducers` sense).
|
||||
* Address #124 by extending `next.jdbc.sql.builder/for-query` to support `:top` (SQL Server), `:limit` / `:offset` (MySQL/PostgreSQL), `:offset` / `:fetch` (SQL Standard).
|
||||
* Allow `:all` to be passed into `find-by-keys` instead of an example hash map or a where clause vector so all rows will be returned (expected to be used with `:offset` etc to support simple pagination of an entire table).
|
||||
* Add `:columns` option to `find-by-keys` (and `get-by-id`) to specify a subset of columns to be returned in each row. This can also specify an alias for the column and allows for computed expressions to be selected with an alias.
|
||||
|
|
|
|||
|
|
@ -2,6 +2,16 @@
|
|||
|
||||
This page contains various tips and tricks that make it easier to use `next.jdbc` with a variety of databases. It is mostly organized by database, but there are a few that are cross-database and those are listed first.
|
||||
|
||||
## Reducing and Folding with `plan`
|
||||
|
||||
Most of this documentation describes using `plan` specifically for reducing and notes that you can avoid the overhead of realizing rows from the `ResultSet` into Clojure data structures if your reducing function uses only functions that get column values by name. If you perform any function on the row that would require an actual hash map or a sequence, the row will be realized into a full Clojure hash map via the builder function passed in the options (or via `next.jdbc.result-set/as-maps` by default).
|
||||
|
||||
One of the benefits of reducing over `plan` is that you can stream very large result sets, very efficiently, without having the entire result set in memory (assuming your reducing function doesn't build a data structure that is too large!). See the tips below on **Streaming Result Sets**.
|
||||
|
||||
The result of `plan` is also foldable in the [clojure.core.reducers](https://clojure.org/reference/reducers) sense. While you could use `execute!` to produce a vector of fully-realized rows as hash maps and then fold that vector (Clojure's vectors support fork-join parallel reduce-combine), that wouldn't be possible for very large result sets. If you fold the result of `plan`, the result set will be partitioned and processed using fork-join parallel reduce-combine. Unlike reducing over `plan`, each row **is** realized into a Clojure data structure and each batch is forked for reduction as soon as that many rows have been realized. By default, `fold`'s batch size is 512 but you can specify a different value in the 4-arity call. Once the entire result set has been read, the last (partial) batch is forked for reduction and then all of the reduced batches are combined.
|
||||
|
||||
There is no back pressure here so if your reducing function is slow, you may end up with more of the realized result set in memory than your system can cope with.
|
||||
|
||||
## CLOB & BLOB SQL Types
|
||||
|
||||
Columns declared with the `CLOB` or `BLOB` SQL types are typically rendered into Clojure result sets as database-specific custom types but they should implement `java.sql.Clob` or `java.sql.Blob` (as appropriate). In general, you can only read the data out of those Java objects during the current transaction, which effectively means that you need to do it either inside the reduction (for `plan`) or inside the result set builder (for `execute!` or `execute-one!`). If you always treat these types the same way for all columns across the whole of your application, you could simply extend `next.jdbc.result-set/ReadableColumn` to `java.sql.Clob` (and/or `java.sql.Blob`). Here's an example for reading `CLOB` into a `String`:
|
||||
|
|
|
|||
|
|
@ -176,6 +176,9 @@
|
|||
"General SQL execution function (for working with result sets).
|
||||
|
||||
Returns a reducible that, when reduced, runs the SQL and yields the result.
|
||||
The reducible is also foldable (in the `clojure.core.reducers` sense) but
|
||||
see the **Tips & Tricks** section of the documentation for some important
|
||||
caveats about that.
|
||||
|
||||
Can be called on a `PreparedStatement`, a `Connection`, or something that can
|
||||
produce a `Connection` via a `DataSource`.
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@
|
|||
for implementations of `ReadableColumn` that provide automatic
|
||||
conversion of some SQL data types to Java Time objects."
|
||||
(:require [clojure.core.protocols :as core-p]
|
||||
[clojure.core.reducers :as r]
|
||||
[clojure.datafy :as d]
|
||||
[next.jdbc.prepare :as prepare]
|
||||
[next.jdbc.protocols :as p])
|
||||
|
|
@ -622,6 +623,41 @@
|
|||
init')))
|
||||
(f init {:next.jdbc/update-count (.getUpdateCount stmt)})))
|
||||
|
||||
(defn- fold-stmt
|
||||
"Execute the `PreparedStatement`, attempt to get either its `ResultSet` or
|
||||
its generated keys (as a `ResultSet`), and fold that using the supplied
|
||||
batch size, combining function, and reducing function.
|
||||
|
||||
If the statement yields neither a `ResultSet` nor generated keys, produce
|
||||
a hash map containing `:next.jdbc/update-count` and the number of rows
|
||||
updated, and fold that as a single element collection."
|
||||
[^PreparedStatement stmt n combinef reducef connectable opts]
|
||||
(if-let [rs (stmt->result-set stmt opts)]
|
||||
(let [rs-map (mapify-result-set rs opts)
|
||||
chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch)))
|
||||
realize (fn [row] (datafiable-row row connectable opts))]
|
||||
(loop [batch [] task nil]
|
||||
(if (.next rs)
|
||||
(if (= n (count batch))
|
||||
(recur [(realize rs-map)]
|
||||
(let [t (#'r/fjfork (chunk batch))]
|
||||
(if task
|
||||
(#'r/fjfork
|
||||
(#'r/fjtask #(combinef (#'r/fjjoin task)
|
||||
(#'r/fjjoin t))))
|
||||
t)))
|
||||
(recur (conj batch (realize rs-map)) task))
|
||||
(if (seq batch)
|
||||
(let [t (#'r/fjfork (chunk batch))]
|
||||
(#'r/fjinvoke
|
||||
#(combinef (if task (#'r/fjjoin task) (combinef))
|
||||
(#'r/fjjoin t))))
|
||||
(if task
|
||||
(#'r/fjinvoke
|
||||
#(combinef (combinef) (#'r/fjjoin task)))
|
||||
(combinef))))))
|
||||
(reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)})))
|
||||
|
||||
(defn- stmt-sql->result-set
|
||||
"Given a `Statement`, a SQL command, execute it and return a
|
||||
`ResultSet` if possible. We always attempt to return keys."
|
||||
|
|
@ -653,16 +689,59 @@
|
|||
init')))
|
||||
(f init {:next.jdbc/update-count (.getUpdateCount stmt)})))
|
||||
|
||||
(defn- fold-stmt-sql
|
||||
"Execute the SQL command on the given `Statement`, attempt to get either
|
||||
its `ResultSet` or its generated keys (as a `ResultSet`), and fold that
|
||||
using the supplied batch size, combining function, and reducing function.
|
||||
|
||||
If the statement yields neither a `ResultSet` nor generated keys, produce
|
||||
a hash map containing `:next.jdbc/update-count` and the number of rows
|
||||
updated, and fold that as a single element collection."
|
||||
[^Statement stmt sql n combinef reducef connectable opts]
|
||||
(if-let [rs (stmt-sql->result-set stmt sql opts)]
|
||||
(let [rs-map (mapify-result-set rs opts)
|
||||
chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch)))
|
||||
realize (fn [row] (datafiable-row row connectable opts))]
|
||||
(loop [batch [] task nil]
|
||||
(if (.next rs)
|
||||
(if (= n (count batch))
|
||||
(recur [(realize rs-map)]
|
||||
(let [t (#'r/fjfork (chunk batch))]
|
||||
(if task
|
||||
(#'r/fjfork
|
||||
(#'r/fjtask #(combinef (#'r/fjjoin task)
|
||||
(#'r/fjjoin t))))
|
||||
t)))
|
||||
(recur (conj batch (realize rs-map)) task))
|
||||
(if (seq batch)
|
||||
(let [t (#'r/fjfork (chunk batch))]
|
||||
(#'r/fjinvoke
|
||||
#(combinef (if task (#'r/fjjoin task) (combinef))
|
||||
(#'r/fjjoin t))))
|
||||
(if task
|
||||
(#'r/fjinvoke
|
||||
#(combinef (combinef) (#'r/fjjoin task)))
|
||||
(combinef))))))
|
||||
(reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)})))
|
||||
|
||||
(extend-protocol p/Executable
|
||||
java.sql.Connection
|
||||
(-execute [this sql-params opts]
|
||||
(reify clojure.lang.IReduceInit
|
||||
(reify
|
||||
clojure.lang.IReduceInit
|
||||
(reduce [_ f init]
|
||||
(with-open [stmt (prepare/create this
|
||||
(first sql-params)
|
||||
(rest sql-params)
|
||||
opts)]
|
||||
(reduce-stmt stmt f init opts)))
|
||||
r/CollFold
|
||||
(coll-fold [_ n combinef reducef]
|
||||
(with-open [stmt (prepare/create this
|
||||
(first sql-params)
|
||||
(rest sql-params)
|
||||
opts)]
|
||||
(fold-stmt stmt n combinef reducef this opts)))
|
||||
(toString [_] "`IReduceInit` from `plan` -- missing reduction?")))
|
||||
(-execute-one [this sql-params opts]
|
||||
(with-open [stmt (prepare/create this
|
||||
|
|
@ -691,7 +770,8 @@
|
|||
|
||||
javax.sql.DataSource
|
||||
(-execute [this sql-params opts]
|
||||
(reify clojure.lang.IReduceInit
|
||||
(reify
|
||||
clojure.lang.IReduceInit
|
||||
(reduce [_ f init]
|
||||
(with-open [con (p/get-connection this opts)
|
||||
stmt (prepare/create con
|
||||
|
|
@ -699,6 +779,14 @@
|
|||
(rest sql-params)
|
||||
opts)]
|
||||
(reduce-stmt stmt f init opts)))
|
||||
r/CollFold
|
||||
(coll-fold [_ n combinef reducef]
|
||||
(with-open [con (p/get-connection this opts)
|
||||
stmt (prepare/create con
|
||||
(first sql-params)
|
||||
(rest sql-params)
|
||||
opts)]
|
||||
(fold-stmt stmt n combinef reducef this opts)))
|
||||
(toString [_] "`IReduceInit` from `plan` -- missing reduction?")))
|
||||
(-execute-one [this sql-params opts]
|
||||
(with-open [con (p/get-connection this opts)
|
||||
|
|
@ -732,9 +820,14 @@
|
|||
;; keys so we pass a truthy value to at least attempt it if we
|
||||
;; do not get a ResultSet back from the execute call
|
||||
(-execute [this _ opts]
|
||||
(reify clojure.lang.IReduceInit
|
||||
(reify
|
||||
clojure.lang.IReduceInit
|
||||
(reduce [_ f init]
|
||||
(reduce-stmt this f init (assoc opts :return-keys true)))
|
||||
r/CollFold
|
||||
(coll-fold [_ n combinef reducef]
|
||||
(fold-stmt this n combinef reducef (.getConnection this)
|
||||
(assoc opts :return-keys true)))
|
||||
(toString [_] "`IReduceInit` from `plan` -- missing reduction?")))
|
||||
(-execute-one [this _ opts]
|
||||
(if-let [rs (stmt->result-set this (assoc opts :return-keys true))]
|
||||
|
|
@ -762,9 +855,16 @@
|
|||
(-execute [this sql-params opts]
|
||||
(assert (= 1 (count sql-params))
|
||||
"Parameters cannot be provided when executing a non-prepared Statement")
|
||||
(reify clojure.lang.IReduceInit
|
||||
(reify
|
||||
clojure.lang.IReduceInit
|
||||
(reduce [_ f init]
|
||||
(reduce-stmt-sql this (first sql-params) f init opts))
|
||||
(reduce-stmt-sql this (first sql-params) f init
|
||||
(assoc opts :return-keys true)))
|
||||
r/CollFold
|
||||
(coll-fold [_ n combinef reducef]
|
||||
(fold-stmt-sql this (first sql-params) n combinef reducef
|
||||
(.getConnection this)
|
||||
(assoc opts :return-keys true)))
|
||||
(toString [_] "`IReduceInit` from `plan` -- missing reduction?")))
|
||||
(-execute-one [this sql-params opts]
|
||||
(assert (= 1 (count sql-params))
|
||||
|
|
|
|||
|
|
@ -21,9 +21,11 @@
|
|||
(def ^:private test-sqlite {:dbtype "sqlite" :dbname "clojure_test_sqlite"})
|
||||
|
||||
;; this is just a dummy db-spec -- it's handled in with-test-db below
|
||||
(def ^:private test-postgres {:dbtype "embedded-postgres"})
|
||||
(def ^:private test-postgres-map {:dbtype "embedded-postgres"})
|
||||
(def ^:private test-postgres
|
||||
(when-not (System/getenv "NEXT_JDBC_NO_POSTGRES") test-postgres-map))
|
||||
;; it takes a while to spin up so we kick it off at startup
|
||||
(defonce embedded-pg (future (EmbeddedPostgres/start)))
|
||||
(defonce embedded-pg (when test-postgres (future (EmbeddedPostgres/start))))
|
||||
|
||||
(def ^:private test-mysql-map
|
||||
(merge (if (System/getenv "NEXT_JDBC_TEST_MARIADB")
|
||||
|
|
@ -47,7 +49,8 @@
|
|||
(when (System/getenv "NEXT_JDBC_TEST_MSSQL") test-jtds-map))
|
||||
|
||||
(def ^:private test-db-specs
|
||||
(cond-> [test-derby test-h2-mem test-h2 test-hsql test-sqlite test-postgres]
|
||||
(cond-> [test-derby test-h2-mem test-h2 test-hsql test-sqlite]
|
||||
test-postgres (conj test-postgres)
|
||||
test-mysql (conj test-mysql)
|
||||
test-mssql (conj test-mssql test-jtds)))
|
||||
|
||||
|
|
|
|||
|
|
@ -2,7 +2,8 @@
|
|||
|
||||
(ns next.jdbc-test
|
||||
"Basic tests for the primary API of `next.jdbc`."
|
||||
(:require [clojure.string :as str]
|
||||
(:require [clojure.core.reducers :as r]
|
||||
[clojure.string :as str]
|
||||
[clojure.test :refer [deftest is testing use-fixtures]]
|
||||
[next.jdbc :as jdbc]
|
||||
[next.jdbc.connection :as c]
|
||||
|
|
@ -294,6 +295,59 @@ VALUES ('Pear', 'green', 49, 47)
|
|||
(is (= 4 (count (jdbc/execute! con ["select * from fruit"]))))
|
||||
(is (= ac (.getAutoCommit con)))))))
|
||||
|
||||
(deftest folding-test
|
||||
(testing "foldable result set"
|
||||
(testing "from a Connection"
|
||||
(let [result
|
||||
(with-open [con (jdbc/get-connection (ds))]
|
||||
(r/fold 2 r/cat r/append!
|
||||
(r/map (column :FRUIT/NAME)
|
||||
(jdbc/plan con ["select * from fruit order by id"]
|
||||
(default-options)))))]
|
||||
(is (= 4 (count result)))
|
||||
(is (= "Apple" (first result)))
|
||||
(is (= "Orange" (last result)))))
|
||||
(testing "from a DataSource"
|
||||
(let [result
|
||||
(r/fold 2 r/cat r/append!
|
||||
(r/map (column :FRUIT/NAME)
|
||||
(jdbc/plan (ds) ["select * from fruit order by id"]
|
||||
(default-options))))]
|
||||
(is (= 4 (count result)))
|
||||
(is (= "Apple" (first result)))
|
||||
(is (= "Orange" (last result))))
|
||||
(let [result
|
||||
(r/fold 1 r/cat r/append!
|
||||
(r/map (column :FRUIT/NAME)
|
||||
(jdbc/plan (ds) ["select * from fruit order by id"]
|
||||
(default-options))))]
|
||||
(is (= 4 (count result)))
|
||||
(is (= "Apple" (first result)))
|
||||
(is (= "Orange" (last result)))))
|
||||
(testing "from a PreparedStatement"
|
||||
(let [result
|
||||
(with-open [con (jdbc/get-connection (ds))
|
||||
stmt (jdbc/prepare con
|
||||
["select * from fruit order by id"]
|
||||
(default-options))]
|
||||
(r/fold 2 r/cat r/append!
|
||||
(r/map (column :FRUIT/NAME)
|
||||
(jdbc/plan stmt nil (default-options)))))]
|
||||
(is (= 4 (count result)))
|
||||
(is (= "Apple" (first result)))
|
||||
(is (= "Orange" (last result)))))
|
||||
(testing "from a Statement"
|
||||
(let [result
|
||||
(with-open [con (jdbc/get-connection (ds))
|
||||
stmt (prep/statement con (default-options))]
|
||||
(r/fold 2 r/cat r/append!
|
||||
(r/map (column :FRUIT/NAME)
|
||||
(jdbc/plan stmt ["select * from fruit order by id"]
|
||||
(default-options)))))]
|
||||
(is (= 4 (count result)))
|
||||
(is (= "Apple" (first result)))
|
||||
(is (= "Orange" (last result)))))))
|
||||
|
||||
(deftest connection-tests
|
||||
(testing "datasource via jdbcUrl"
|
||||
(when-not (postgres?)
|
||||
|
|
|
|||
Loading…
Reference in a new issue