From 0eb183a0a016a61b5d723a73cad059d657c1aa2a Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Fri, 26 Jun 2020 17:38:58 -0700 Subject: [PATCH 1/9] Prototype of #125 -- foldable result sets! --- src/next/jdbc/result_set.clj | 47 ++++++++++++++++++++++++++++++------ test/next/jdbc_test.clj | 14 ++++++++++- 2 files changed, 53 insertions(+), 8 deletions(-) diff --git a/src/next/jdbc/result_set.clj b/src/next/jdbc/result_set.clj index c2aef20..ee3683d 100644 --- a/src/next/jdbc/result_set.clj +++ b/src/next/jdbc/result_set.clj @@ -18,6 +18,7 @@ for implementations of `ReadableColumn` that provide automatic conversion of some SQL data types to Java Time objects." (:require [clojure.core.protocols :as core-p] + [clojure.core.reducers :as r] [clojure.datafy :as d] [next.jdbc.prepare :as prepare] [next.jdbc.protocols :as p]) @@ -602,6 +603,29 @@ init'))) (f init {:next.jdbc/update-count (.getUpdateCount stmt)}))) +(defn- fold-stmt + "Execute the `PreparedStatement`, attempt to get either its `ResultSet` or + its generated keys (as a `ResultSet`), and fold that using the supplied + batch size, combining function, and reducing function. + + If the statement yields neither a `ResultSet` nor generated keys, produce + a hash map containing `:next.jdbc/update-count` and the number of rows + updated, and fold that as a single element collection." + [^PreparedStatement stmt n combinef reducef connectable opts] + (if-let [rs (stmt->result-set stmt opts)] + (let [rs-map (mapify-result-set rs opts) + chunk (fn [batch] (#'r/fjtask #(reduce reducef (combinef) batch))) + realize (fn [row] (datafiable-row row connectable opts))] + (loop [batch [] tasks []] + (if (.next rs) + (if (= n (count batch)) + (recur [(realize rs-map)] (conj tasks (#'r/fjfork (chunk batch)))) + (recur (conj batch (realize rs-map)) tasks)) + (#'r/fjinvoke + #(reduce combinef (combinef) + (mapv #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch))))))))) + (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)}))) + (defn- stmt-sql->result-set "Given a `Statement`, a SQL command, and options, execute it and return a `ResultSet` if possible." @@ -667,14 +691,23 @@ javax.sql.DataSource (-execute [this sql-params opts] - (reify clojure.lang.IReduceInit + (reify + clojure.lang.IReduceInit (reduce [_ f init] - (with-open [con (p/get-connection this opts) - stmt (prepare/create con - (first sql-params) - (rest sql-params) - opts)] - (reduce-stmt stmt f init opts))) + (with-open [con (p/get-connection this opts) + stmt (prepare/create con + (first sql-params) + (rest sql-params) + opts)] + (reduce-stmt stmt f init opts))) + r/CollFold + (coll-fold [_ n combinef reducef] + (with-open [con (p/get-connection this opts) + stmt (prepare/create con + (first sql-params) + (rest sql-params) + opts)] + (fold-stmt stmt n combinef reducef this opts))) (toString [_] "`IReduceInit` from `plan` -- missing reduction?"))) (-execute-one [this sql-params opts] (with-open [con (p/get-connection this opts) diff --git a/test/next/jdbc_test.clj b/test/next/jdbc_test.clj index 5851139..1ce0d35 100644 --- a/test/next/jdbc_test.clj +++ b/test/next/jdbc_test.clj @@ -2,7 +2,8 @@ (ns next.jdbc-test "Basic tests for the primary API of `next.jdbc`." - (:require [clojure.string :as str] + (:require [clojure.core.reducers :as r] + [clojure.string :as str] [clojure.test :refer [deftest is testing use-fixtures]] [next.jdbc :as jdbc] [next.jdbc.connection :as c] @@ -293,6 +294,17 @@ VALUES ('Pear', 'green', 49, 47) (is (= 4 (count (jdbc/execute! con ["select * from fruit"])))) (is (= ac (.getAutoCommit con))))))) +(deftest fold-rs-test + (let [ds-opts (jdbc/with-options (ds) (default-options))] + (testing "foldable result set" + (let [result + (r/fold 2 r/cat r/append! + (r/map (column :FRUIT/NAME) + (jdbc/plan ds-opts ["select * from fruit order by id"])))] + (is (= 4 (count result))) + (is (= "Apple" (first result))) + (is (= "Orange" (last result))))))) + (deftest connection-tests (testing "datasource via jdbcUrl" (when-not (postgres?) From 53ee1c53673eb90ee554df3b53285d1f69714878 Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Fri, 26 Jun 2020 19:03:57 -0700 Subject: [PATCH 2/9] Complete CollFold implementation #125 --- src/next/jdbc/result_set.clj | 65 ++++++++++++++++++++++++++++++------ test/next/jdbc_test.clj | 42 ++++++++++++++++++++--- 2 files changed, 92 insertions(+), 15 deletions(-) diff --git a/src/next/jdbc/result_set.clj b/src/next/jdbc/result_set.clj index ee3683d..fc4c0d3 100644 --- a/src/next/jdbc/result_set.clj +++ b/src/next/jdbc/result_set.clj @@ -658,16 +658,47 @@ init'))) (f init {:next.jdbc/update-count (.getUpdateCount stmt)}))) +(defn- fold-stmt-sql + "Execute the SQL command on the given `Statement`, attempt to get either + its `ResultSet` or its generated keys (as a `ResultSet`), and fold that + using the supplied batch size, combining function, and reducing function. + + If the statement yields neither a `ResultSet` nor generated keys, produce + a hash map containing `:next.jdbc/update-count` and the number of rows + updated, and fold that as a single element collection." + [^Statement stmt sql n combinef reducef connectable opts] + (if-let [rs (stmt-sql->result-set stmt sql opts)] + (let [rs-map (mapify-result-set rs opts) + chunk (fn [batch] (#'r/fjtask #(reduce reducef (combinef) batch))) + realize (fn [row] (datafiable-row row connectable opts))] + (loop [batch [] tasks []] + (if (.next rs) + (if (= n (count batch)) + (recur [(realize rs-map)] (conj tasks (#'r/fjfork (chunk batch)))) + (recur (conj batch (realize rs-map)) tasks)) + (#'r/fjinvoke + #(reduce combinef (combinef) + (mapv #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch))))))))) + (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)}))) + (extend-protocol p/Executable java.sql.Connection (-execute [this sql-params opts] - (reify clojure.lang.IReduceInit + (reify + clojure.lang.IReduceInit (reduce [_ f init] - (with-open [stmt (prepare/create this - (first sql-params) - (rest sql-params) - opts)] - (reduce-stmt stmt f init opts))) + (with-open [stmt (prepare/create this + (first sql-params) + (rest sql-params) + opts)] + (reduce-stmt stmt f init opts))) + r/CollFold + (coll-fold [_ n combinef reducef] + (with-open [stmt (prepare/create this + (first sql-params) + (rest sql-params) + opts)] + (fold-stmt stmt n combinef reducef this opts))) (toString [_] "`IReduceInit` from `plan` -- missing reduction?"))) (-execute-one [this sql-params opts] (with-open [stmt (prepare/create this @@ -707,7 +738,7 @@ (first sql-params) (rest sql-params) opts)] - (fold-stmt stmt n combinef reducef this opts))) + (fold-stmt stmt n combinef reducef this opts))) (toString [_] "`IReduceInit` from `plan` -- missing reduction?"))) (-execute-one [this sql-params opts] (with-open [con (p/get-connection this opts) @@ -736,9 +767,14 @@ ;; keys so we pass a truthy value to at least attempt it if we ;; do not get a ResultSet back from the execute call (-execute [this _ opts] - (reify clojure.lang.IReduceInit + (reify + clojure.lang.IReduceInit (reduce [_ f init] - (reduce-stmt this f init (assoc opts :return-keys true))) + (reduce-stmt this f init (assoc opts :return-keys true))) + r/CollFold + (coll-fold [_ n combinef reducef] + (fold-stmt this n combinef reducef (.getConnection this) + (assoc opts :return-keys true))) (toString [_] "`IReduceInit` from `plan` -- missing reduction?"))) (-execute-one [this _ opts] (if-let [rs (stmt->result-set this (assoc opts :return-keys true))] @@ -760,9 +796,16 @@ (-execute [this sql-params opts] (assert (= 1 (count sql-params)) "Parameters cannot be provided when executing a non-prepared Statement") - (reify clojure.lang.IReduceInit + (reify + clojure.lang.IReduceInit (reduce [_ f init] - (reduce-stmt-sql this (first sql-params) f init (assoc opts :return-keys true))) + (reduce-stmt-sql this (first sql-params) f init + (assoc opts :return-keys true))) + r/CollFold + (coll-fold [_ n combinef reducef] + (fold-stmt-sql this (first sql-params) n combinef reducef + (.getConnection this) + (assoc opts :return-keys true))) (toString [_] "`IReduceInit` from `plan` -- missing reduction?"))) (-execute-one [this sql-params opts] (assert (= 1 (count sql-params)) diff --git a/test/next/jdbc_test.clj b/test/next/jdbc_test.clj index 1ce0d35..811b5ab 100644 --- a/test/next/jdbc_test.clj +++ b/test/next/jdbc_test.clj @@ -294,13 +294,47 @@ VALUES ('Pear', 'green', 49, 47) (is (= 4 (count (jdbc/execute! con ["select * from fruit"])))) (is (= ac (.getAutoCommit con))))))) -(deftest fold-rs-test - (let [ds-opts (jdbc/with-options (ds) (default-options))] - (testing "foldable result set" +(deftest folding-test + (testing "foldable result set" + (testing "from a Connection" + (let [result + (with-open [con (jdbc/get-connection (ds))] + (r/fold 2 r/cat r/append! + (r/map (column :FRUIT/NAME) + (jdbc/plan con ["select * from fruit order by id"] + (default-options)))))] + (is (= 4 (count result))) + (is (= "Apple" (first result))) + (is (= "Orange" (last result))))) + (testing "from a DataSource" (let [result (r/fold 2 r/cat r/append! (r/map (column :FRUIT/NAME) - (jdbc/plan ds-opts ["select * from fruit order by id"])))] + (jdbc/plan (ds) ["select * from fruit order by id"] + (default-options))))] + (is (= 4 (count result))) + (is (= "Apple" (first result))) + (is (= "Orange" (last result))))) + (testing "from a PreparedStatement" + (let [result + (with-open [con (jdbc/get-connection (ds)) + stmt (jdbc/prepare con + ["select * from fruit order by id"] + (default-options))] + (r/fold 2 r/cat r/append! + (r/map (column :FRUIT/NAME) + (jdbc/plan stmt nil (default-options)))))] + (is (= 4 (count result))) + (is (= "Apple" (first result))) + (is (= "Orange" (last result))))) + (testing "from a Statement" + (let [result + (with-open [con (jdbc/get-connection (ds)) + stmt (prep/statement con (default-options))] + (r/fold 2 r/cat r/append! + (r/map (column :FRUIT/NAME) + (jdbc/plan stmt ["select * from fruit order by id"] + (default-options)))))] (is (= 4 (count result))) (is (= "Apple" (first result))) (is (= "Orange" (last result))))))) From 0cc88f816bcd1c1f2a13382cecf4c9571c9124ba Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Fri, 26 Jun 2020 19:13:38 -0700 Subject: [PATCH 3/9] Make PostgreSQL testing suppressable for systems that cannot run EmbeddedPostgreSQL --- test/next/jdbc/test_fixtures.clj | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/test/next/jdbc/test_fixtures.clj b/test/next/jdbc/test_fixtures.clj index 0709a51..04e9a73 100644 --- a/test/next/jdbc/test_fixtures.clj +++ b/test/next/jdbc/test_fixtures.clj @@ -21,9 +21,11 @@ (def ^:private test-sqlite {:dbtype "sqlite" :dbname "clojure_test_sqlite"}) ;; this is just a dummy db-spec -- it's handled in with-test-db below -(def ^:private test-postgres {:dbtype "embedded-postgres"}) +(def ^:private test-postgres-map {:dbtype "embedded-postgres"}) +(def ^:private test-postgres + (when-not (System/getenv "NEXT_JDBC_NO_POSTGRES") test-postgres-map)) ;; it takes a while to spin up so we kick it off at startup -(defonce embedded-pg (future (EmbeddedPostgres/start))) +(defonce embedded-pg (when test-postgres (future (EmbeddedPostgres/start)))) (def ^:private test-mysql-map (merge (if (System/getenv "NEXT_JDBC_TEST_MARIADB") @@ -48,8 +50,9 @@ (def ^:private test-db-specs (cond-> [test-derby test-h2-mem test-h2 test-hsql test-sqlite test-postgres] - test-mysql (conj test-mysql) - test-mssql (conj test-mssql test-jtds))) + test-postgres (conj test-postgres) + test-mysql (conj test-mysql) + test-mssql (conj test-mssql test-jtds))) (def ^:private test-db-spec (atom nil)) From 8085acfcfcd3d0dce09f1e1b3a95062e79d352ec Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Fri, 26 Jun 2020 19:17:21 -0700 Subject: [PATCH 4/9] Fix conditional PG testing --- test/next/jdbc/test_fixtures.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/next/jdbc/test_fixtures.clj b/test/next/jdbc/test_fixtures.clj index 04e9a73..5b78806 100644 --- a/test/next/jdbc/test_fixtures.clj +++ b/test/next/jdbc/test_fixtures.clj @@ -49,7 +49,7 @@ (when (System/getenv "NEXT_JDBC_TEST_MSSQL") test-jtds-map)) (def ^:private test-db-specs - (cond-> [test-derby test-h2-mem test-h2 test-hsql test-sqlite test-postgres] + (cond-> [test-derby test-h2-mem test-h2 test-hsql test-sqlite] test-postgres (conj test-postgres) test-mysql (conj test-mysql) test-mssql (conj test-mssql test-jtds))) From 469eb0959ab0a8c7982ee8d4562a63d8f1f8ee49 Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Fri, 26 Jun 2020 19:32:48 -0700 Subject: [PATCH 5/9] Switch reduce to r/reduce in folding logic #125 --- src/next/jdbc/result_set.clj | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/next/jdbc/result_set.clj b/src/next/jdbc/result_set.clj index fc4c0d3..cc0d326 100644 --- a/src/next/jdbc/result_set.clj +++ b/src/next/jdbc/result_set.clj @@ -614,7 +614,7 @@ [^PreparedStatement stmt n combinef reducef connectable opts] (if-let [rs (stmt->result-set stmt opts)] (let [rs-map (mapify-result-set rs opts) - chunk (fn [batch] (#'r/fjtask #(reduce reducef (combinef) batch))) + chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch))) realize (fn [row] (datafiable-row row connectable opts))] (loop [batch [] tasks []] (if (.next rs) @@ -622,8 +622,8 @@ (recur [(realize rs-map)] (conj tasks (#'r/fjfork (chunk batch)))) (recur (conj batch (realize rs-map)) tasks)) (#'r/fjinvoke - #(reduce combinef (combinef) - (mapv #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch))))))))) + #(r/reduce combinef (combinef) + (mapv #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch))))))))) (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)}))) (defn- stmt-sql->result-set @@ -669,7 +669,7 @@ [^Statement stmt sql n combinef reducef connectable opts] (if-let [rs (stmt-sql->result-set stmt sql opts)] (let [rs-map (mapify-result-set rs opts) - chunk (fn [batch] (#'r/fjtask #(reduce reducef (combinef) batch))) + chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch))) realize (fn [row] (datafiable-row row connectable opts))] (loop [batch [] tasks []] (if (.next rs) @@ -677,8 +677,8 @@ (recur [(realize rs-map)] (conj tasks (#'r/fjfork (chunk batch)))) (recur (conj batch (realize rs-map)) tasks)) (#'r/fjinvoke - #(reduce combinef (combinef) - (mapv #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch))))))))) + #(r/reduce combinef (combinef) + (mapv #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch))))))))) (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)}))) (extend-protocol p/Executable From efa37ad84fb0ba0d81164d2e5e5bc01c8291a175 Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Fri, 26 Jun 2020 21:31:28 -0700 Subject: [PATCH 6/9] Use map for some interleaving of combine and join #125 --- src/next/jdbc/result_set.clj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/next/jdbc/result_set.clj b/src/next/jdbc/result_set.clj index cc0d326..4bfcf88 100644 --- a/src/next/jdbc/result_set.clj +++ b/src/next/jdbc/result_set.clj @@ -623,7 +623,7 @@ (recur (conj batch (realize rs-map)) tasks)) (#'r/fjinvoke #(r/reduce combinef (combinef) - (mapv #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch))))))))) + (map #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch))))))))) (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)}))) (defn- stmt-sql->result-set @@ -678,7 +678,7 @@ (recur (conj batch (realize rs-map)) tasks)) (#'r/fjinvoke #(r/reduce combinef (combinef) - (mapv #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch))))))))) + (map #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch))))))))) (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)}))) (extend-protocol p/Executable From 68d8f98d267416b800abb647e5f30fee66037d82 Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Fri, 26 Jun 2020 21:31:43 -0700 Subject: [PATCH 7/9] Document folding over plan #125 --- doc/tips-and-tricks.md | 10 ++++++++++ src/next/jdbc.clj | 3 +++ 2 files changed, 13 insertions(+) diff --git a/doc/tips-and-tricks.md b/doc/tips-and-tricks.md index 70635b4..bfe64cb 100644 --- a/doc/tips-and-tricks.md +++ b/doc/tips-and-tricks.md @@ -2,6 +2,16 @@ This page contains various tips and tricks that make it easier to use `next.jdbc` with a variety of databases. It is mostly organized by database, but there are a few that are cross-database and those are listed first. +## Reducing and Folding with `plan` + +Most of this documentation describes using `plan` specifically for reducing and notes that you can avoid the overhead of realizing rows from the `ResultSet` into Clojure data structures if your reducing function uses only functions that get column values by name. If you perform any function on the row that would require an actual hash map or a sequence, the row will be realized into a full Clojure hash map via the builder function passed in the options (or via `next.jdbc.result-set/as-maps` by default). + +One of the benefits of reducing over `plan` is that you can stream very large result sets, very efficiently, without having the entire result set in memory (assuming your reducing function doesn't build a data structure that is too large!). See the tips below on **Streaming Result Sets**. + +The result of `plan` is also foldable in the [clojure.core.reducers](https://clojure.org/reference/reducers) sense. While you could use `execute!` to produce a vector of fully-realized rows as hash maps and then fold that vector (Clojure's vectors support fork-join parallel reduce-combine), that wouldn't be possible for very large result sets. If you fold the result of `plan`, the result set will be partitioned and processed using fork-join parallel reduce-combine. Unlike reducing over `plan`, each row **is** realized into a Clojure data structure and each batch is forked for reduction as soon as that many rows have been realized. By default, `fold`'s batch size is 512 but you can specify a different value in the 4-arity call. Once the entire result set has been read, the last (partial) batch is forked for reduction and then all of the reduced batches are combined. + +There is no back pressure here so if your reducing function is slow, you may end up with more of the realized result set in memory than your system can cope with. There is also currently no attempt to combine the reduced batches until the entire result set has been processed which may also add to this issue. + ## CLOB & BLOB SQL Types Columns declared with the `CLOB` or `BLOB` SQL types are typically rendered into Clojure result sets as database-specific custom types but they should implement `java.sql.Clob` or `java.sql.Blob` (as appropriate). In general, you can only read the data out of those Java objects during the current transaction, which effectively means that you need to do it either inside the reduction (for `plan`) or inside the result set builder (for `execute!` or `execute-one!`). If you always treat these types the same way for all columns across the whole of your application, you could simply extend `next.jdbc.result-set/ReadableColumn` to `java.sql.Clob` (and/or `java.sql.Blob`). Here's an example for reading `CLOB` into a `String`: diff --git a/src/next/jdbc.clj b/src/next/jdbc.clj index f9b9f08..b46283f 100644 --- a/src/next/jdbc.clj +++ b/src/next/jdbc.clj @@ -176,6 +176,9 @@ "General SQL execution function (for working with result sets). Returns a reducible that, when reduced, runs the SQL and yields the result. + The reducible is also foldable (in the `clojure.core.reducers` sense) but + see the **Tips & Tricks** section of the documentation for some important + caveats about that. Can be called on a `PreparedStatement`, a `Connection`, or something that can produce a `Connection` via a `DataSource`. From 77ebb31026df26c233d6414d9fe2a7522b4c546e Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Fri, 26 Jun 2020 22:21:49 -0700 Subject: [PATCH 8/9] Interleave combine with batch reductions #125 --- doc/tips-and-tricks.md | 2 +- src/next/jdbc/result_set.clj | 48 +++++++++++++++++++++++++++--------- test/next/jdbc_test.clj | 8 ++++++ 3 files changed, 45 insertions(+), 13 deletions(-) diff --git a/doc/tips-and-tricks.md b/doc/tips-and-tricks.md index bfe64cb..968e20d 100644 --- a/doc/tips-and-tricks.md +++ b/doc/tips-and-tricks.md @@ -10,7 +10,7 @@ One of the benefits of reducing over `plan` is that you can stream very large re The result of `plan` is also foldable in the [clojure.core.reducers](https://clojure.org/reference/reducers) sense. While you could use `execute!` to produce a vector of fully-realized rows as hash maps and then fold that vector (Clojure's vectors support fork-join parallel reduce-combine), that wouldn't be possible for very large result sets. If you fold the result of `plan`, the result set will be partitioned and processed using fork-join parallel reduce-combine. Unlike reducing over `plan`, each row **is** realized into a Clojure data structure and each batch is forked for reduction as soon as that many rows have been realized. By default, `fold`'s batch size is 512 but you can specify a different value in the 4-arity call. Once the entire result set has been read, the last (partial) batch is forked for reduction and then all of the reduced batches are combined. -There is no back pressure here so if your reducing function is slow, you may end up with more of the realized result set in memory than your system can cope with. There is also currently no attempt to combine the reduced batches until the entire result set has been processed which may also add to this issue. +There is no back pressure here so if your reducing function is slow, you may end up with more of the realized result set in memory than your system can cope with. ## CLOB & BLOB SQL Types diff --git a/src/next/jdbc/result_set.clj b/src/next/jdbc/result_set.clj index 4bfcf88..23c4a4d 100644 --- a/src/next/jdbc/result_set.clj +++ b/src/next/jdbc/result_set.clj @@ -616,14 +616,26 @@ (let [rs-map (mapify-result-set rs opts) chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch))) realize (fn [row] (datafiable-row row connectable opts))] - (loop [batch [] tasks []] + (loop [batch [] task nil] (if (.next rs) (if (= n (count batch)) - (recur [(realize rs-map)] (conj tasks (#'r/fjfork (chunk batch)))) - (recur (conj batch (realize rs-map)) tasks)) - (#'r/fjinvoke - #(r/reduce combinef (combinef) - (map #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch))))))))) + (recur [(realize rs-map)] + (let [t (#'r/fjfork (chunk batch))] + (if task + (#'r/fjfork + (#'r/fjtask #(combinef (#'r/fjjoin task) + (#'r/fjjoin t)))) + t))) + (recur (conj batch (realize rs-map)) task)) + (if (seq batch) + (let [t (#'r/fjfork (chunk batch))] + (#'r/fjinvoke + #(combinef (if task (#'r/fjjoin task) (combinef)) + (#'r/fjjoin t)))) + (if task + (#'r/fjinvoke + #(combinef (combinef) (#'r/fjjoin task))) + (combinef)))))) (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)}))) (defn- stmt-sql->result-set @@ -671,14 +683,26 @@ (let [rs-map (mapify-result-set rs opts) chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch))) realize (fn [row] (datafiable-row row connectable opts))] - (loop [batch [] tasks []] + (loop [batch [] task nil] (if (.next rs) (if (= n (count batch)) - (recur [(realize rs-map)] (conj tasks (#'r/fjfork (chunk batch)))) - (recur (conj batch (realize rs-map)) tasks)) - (#'r/fjinvoke - #(r/reduce combinef (combinef) - (map #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch))))))))) + (recur [(realize rs-map)] + (let [t (#'r/fjfork (chunk batch))] + (if task + (#'r/fjfork + (#'r/fjtask #(combinef (#'r/fjjoin task) + (#'r/fjjoin t)))) + t))) + (recur (conj batch (realize rs-map)) task)) + (if (seq batch) + (let [t (#'r/fjfork (chunk batch))] + (#'r/fjinvoke + #(combinef (if task (#'r/fjjoin task) (combinef)) + (#'r/fjjoin t)))) + (if task + (#'r/fjinvoke + #(combinef (combinef) (#'r/fjjoin task))) + (combinef)))))) (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)}))) (extend-protocol p/Executable diff --git a/test/next/jdbc_test.clj b/test/next/jdbc_test.clj index 811b5ab..0fba792 100644 --- a/test/next/jdbc_test.clj +++ b/test/next/jdbc_test.clj @@ -314,6 +314,14 @@ VALUES ('Pear', 'green', 49, 47) (default-options))))] (is (= 4 (count result))) (is (= "Apple" (first result))) + (is (= "Orange" (last result)))) + (let [result + (r/fold 1 r/cat r/append! + (r/map (column :FRUIT/NAME) + (jdbc/plan (ds) ["select * from fruit order by id"] + (default-options))))] + (is (= 4 (count result))) + (is (= "Apple" (first result))) (is (= "Orange" (last result))))) (testing "from a PreparedStatement" (let [result From d3b51f9cc621dd1435f7817102665f0df05d3476 Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Fri, 26 Jun 2020 22:28:23 -0700 Subject: [PATCH 9/9] Fixes #125 by supporting fold over plan --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8acda14..448eedc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ Only accretive/fixative changes will be made from now on. Changes made since the 1.0.478 release: +* Address #125 by making the result of `plan` foldable (in the `clojure.core.reducers` sense). * Address #124 by extending `next.jdbc.sql.builder/for-query` to support `:top` (SQL Server), `:limit` / `:offset` (MySQL/PostgreSQL), `:offset` / `:fetch` (SQL Standard). * Allow `:all` to be passed into `find-by-keys` instead of an example hash map or a where clause vector so all rows will be returned (expected to be used with `:offset` etc to support simple pagination of an entire table). * Add `:columns` option to `find-by-keys` (and `get-by-id`) to specify a subset of columns to be returned in each row. This can also specify an alias for the column and allows for computed expressions to be selected with an alias.