diff --git a/src/next/jdbc/result_set.clj b/src/next/jdbc/result_set.clj index c2aef20..ee3683d 100644 --- a/src/next/jdbc/result_set.clj +++ b/src/next/jdbc/result_set.clj @@ -18,6 +18,7 @@ for implementations of `ReadableColumn` that provide automatic conversion of some SQL data types to Java Time objects." (:require [clojure.core.protocols :as core-p] + [clojure.core.reducers :as r] [clojure.datafy :as d] [next.jdbc.prepare :as prepare] [next.jdbc.protocols :as p]) @@ -602,6 +603,29 @@ init'))) (f init {:next.jdbc/update-count (.getUpdateCount stmt)}))) +(defn- fold-stmt + "Execute the `PreparedStatement`, attempt to get either its `ResultSet` or + its generated keys (as a `ResultSet`), and fold that using the supplied + batch size, combining function, and reducing function. + + If the statement yields neither a `ResultSet` nor generated keys, produce + a hash map containing `:next.jdbc/update-count` and the number of rows + updated, and fold that as a single element collection." + [^PreparedStatement stmt n combinef reducef connectable opts] + (if-let [rs (stmt->result-set stmt opts)] + (let [rs-map (mapify-result-set rs opts) + chunk (fn [batch] (#'r/fjtask #(reduce reducef (combinef) batch))) + realize (fn [row] (datafiable-row row connectable opts))] + (loop [batch [] tasks []] + (if (.next rs) + (if (= n (count batch)) + (recur [(realize rs-map)] (conj tasks (#'r/fjfork (chunk batch)))) + (recur (conj batch (realize rs-map)) tasks)) + (#'r/fjinvoke + #(reduce combinef (combinef) + (mapv #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch))))))))) + (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)}))) + (defn- stmt-sql->result-set "Given a `Statement`, a SQL command, and options, execute it and return a `ResultSet` if possible." @@ -667,14 +691,23 @@ javax.sql.DataSource (-execute [this sql-params opts] - (reify clojure.lang.IReduceInit + (reify + clojure.lang.IReduceInit (reduce [_ f init] - (with-open [con (p/get-connection this opts) - stmt (prepare/create con - (first sql-params) - (rest sql-params) - opts)] - (reduce-stmt stmt f init opts))) + (with-open [con (p/get-connection this opts) + stmt (prepare/create con + (first sql-params) + (rest sql-params) + opts)] + (reduce-stmt stmt f init opts))) + r/CollFold + (coll-fold [_ n combinef reducef] + (with-open [con (p/get-connection this opts) + stmt (prepare/create con + (first sql-params) + (rest sql-params) + opts)] + (fold-stmt stmt n combinef reducef this opts))) (toString [_] "`IReduceInit` from `plan` -- missing reduction?"))) (-execute-one [this sql-params opts] (with-open [con (p/get-connection this opts) diff --git a/test/next/jdbc_test.clj b/test/next/jdbc_test.clj index 5851139..1ce0d35 100644 --- a/test/next/jdbc_test.clj +++ b/test/next/jdbc_test.clj @@ -2,7 +2,8 @@ (ns next.jdbc-test "Basic tests for the primary API of `next.jdbc`." - (:require [clojure.string :as str] + (:require [clojure.core.reducers :as r] + [clojure.string :as str] [clojure.test :refer [deftest is testing use-fixtures]] [next.jdbc :as jdbc] [next.jdbc.connection :as c] @@ -293,6 +294,17 @@ VALUES ('Pear', 'green', 49, 47) (is (= 4 (count (jdbc/execute! con ["select * from fruit"])))) (is (= ac (.getAutoCommit con))))))) +(deftest fold-rs-test + (let [ds-opts (jdbc/with-options (ds) (default-options))] + (testing "foldable result set" + (let [result + (r/fold 2 r/cat r/append! + (r/map (column :FRUIT/NAME) + (jdbc/plan ds-opts ["select * from fruit order by id"])))] + (is (= 4 (count result))) + (is (= "Apple" (first result))) + (is (= "Orange" (last result))))))) + (deftest connection-tests (testing "datasource via jdbcUrl" (when-not (postgres?)