From 53ee1c53673eb90ee554df3b53285d1f69714878 Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Fri, 26 Jun 2020 19:03:57 -0700 Subject: [PATCH] Complete CollFold implementation #125 --- src/next/jdbc/result_set.clj | 65 ++++++++++++++++++++++++++++++------ test/next/jdbc_test.clj | 42 ++++++++++++++++++++--- 2 files changed, 92 insertions(+), 15 deletions(-) diff --git a/src/next/jdbc/result_set.clj b/src/next/jdbc/result_set.clj index ee3683d..fc4c0d3 100644 --- a/src/next/jdbc/result_set.clj +++ b/src/next/jdbc/result_set.clj @@ -658,16 +658,47 @@ init'))) (f init {:next.jdbc/update-count (.getUpdateCount stmt)}))) +(defn- fold-stmt-sql + "Execute the SQL command on the given `Statement`, attempt to get either + its `ResultSet` or its generated keys (as a `ResultSet`), and fold that + using the supplied batch size, combining function, and reducing function. + + If the statement yields neither a `ResultSet` nor generated keys, produce + a hash map containing `:next.jdbc/update-count` and the number of rows + updated, and fold that as a single element collection." + [^Statement stmt sql n combinef reducef connectable opts] + (if-let [rs (stmt-sql->result-set stmt sql opts)] + (let [rs-map (mapify-result-set rs opts) + chunk (fn [batch] (#'r/fjtask #(reduce reducef (combinef) batch))) + realize (fn [row] (datafiable-row row connectable opts))] + (loop [batch [] tasks []] + (if (.next rs) + (if (= n (count batch)) + (recur [(realize rs-map)] (conj tasks (#'r/fjfork (chunk batch)))) + (recur (conj batch (realize rs-map)) tasks)) + (#'r/fjinvoke + #(reduce combinef (combinef) + (mapv #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch))))))))) + (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)}))) + (extend-protocol p/Executable java.sql.Connection (-execute [this sql-params opts] - (reify clojure.lang.IReduceInit + (reify + clojure.lang.IReduceInit (reduce [_ f init] - (with-open [stmt (prepare/create this - (first sql-params) - (rest sql-params) - opts)] - (reduce-stmt stmt f init opts))) + (with-open [stmt (prepare/create this + (first sql-params) + (rest sql-params) + opts)] + (reduce-stmt stmt f init opts))) + r/CollFold + (coll-fold [_ n combinef reducef] + (with-open [stmt (prepare/create this + (first sql-params) + (rest sql-params) + opts)] + (fold-stmt stmt n combinef reducef this opts))) (toString [_] "`IReduceInit` from `plan` -- missing reduction?"))) (-execute-one [this sql-params opts] (with-open [stmt (prepare/create this @@ -707,7 +738,7 @@ (first sql-params) (rest sql-params) opts)] - (fold-stmt stmt n combinef reducef this opts))) + (fold-stmt stmt n combinef reducef this opts))) (toString [_] "`IReduceInit` from `plan` -- missing reduction?"))) (-execute-one [this sql-params opts] (with-open [con (p/get-connection this opts) @@ -736,9 +767,14 @@ ;; keys so we pass a truthy value to at least attempt it if we ;; do not get a ResultSet back from the execute call (-execute [this _ opts] - (reify clojure.lang.IReduceInit + (reify + clojure.lang.IReduceInit (reduce [_ f init] - (reduce-stmt this f init (assoc opts :return-keys true))) + (reduce-stmt this f init (assoc opts :return-keys true))) + r/CollFold + (coll-fold [_ n combinef reducef] + (fold-stmt this n combinef reducef (.getConnection this) + (assoc opts :return-keys true))) (toString [_] "`IReduceInit` from `plan` -- missing reduction?"))) (-execute-one [this _ opts] (if-let [rs (stmt->result-set this (assoc opts :return-keys true))] @@ -760,9 +796,16 @@ (-execute [this sql-params opts] (assert (= 1 (count sql-params)) "Parameters cannot be provided when executing a non-prepared Statement") - (reify clojure.lang.IReduceInit + (reify + clojure.lang.IReduceInit (reduce [_ f init] - (reduce-stmt-sql this (first sql-params) f init (assoc opts :return-keys true))) + (reduce-stmt-sql this (first sql-params) f init + (assoc opts :return-keys true))) + r/CollFold + (coll-fold [_ n combinef reducef] + (fold-stmt-sql this (first sql-params) n combinef reducef + (.getConnection this) + (assoc opts :return-keys true))) (toString [_] "`IReduceInit` from `plan` -- missing reduction?"))) (-execute-one [this sql-params opts] (assert (= 1 (count sql-params)) diff --git a/test/next/jdbc_test.clj b/test/next/jdbc_test.clj index 1ce0d35..811b5ab 100644 --- a/test/next/jdbc_test.clj +++ b/test/next/jdbc_test.clj @@ -294,13 +294,47 @@ VALUES ('Pear', 'green', 49, 47) (is (= 4 (count (jdbc/execute! con ["select * from fruit"])))) (is (= ac (.getAutoCommit con))))))) -(deftest fold-rs-test - (let [ds-opts (jdbc/with-options (ds) (default-options))] - (testing "foldable result set" +(deftest folding-test + (testing "foldable result set" + (testing "from a Connection" + (let [result + (with-open [con (jdbc/get-connection (ds))] + (r/fold 2 r/cat r/append! + (r/map (column :FRUIT/NAME) + (jdbc/plan con ["select * from fruit order by id"] + (default-options)))))] + (is (= 4 (count result))) + (is (= "Apple" (first result))) + (is (= "Orange" (last result))))) + (testing "from a DataSource" (let [result (r/fold 2 r/cat r/append! (r/map (column :FRUIT/NAME) - (jdbc/plan ds-opts ["select * from fruit order by id"])))] + (jdbc/plan (ds) ["select * from fruit order by id"] + (default-options))))] + (is (= 4 (count result))) + (is (= "Apple" (first result))) + (is (= "Orange" (last result))))) + (testing "from a PreparedStatement" + (let [result + (with-open [con (jdbc/get-connection (ds)) + stmt (jdbc/prepare con + ["select * from fruit order by id"] + (default-options))] + (r/fold 2 r/cat r/append! + (r/map (column :FRUIT/NAME) + (jdbc/plan stmt nil (default-options)))))] + (is (= 4 (count result))) + (is (= "Apple" (first result))) + (is (= "Orange" (last result))))) + (testing "from a Statement" + (let [result + (with-open [con (jdbc/get-connection (ds)) + stmt (prep/statement con (default-options))] + (r/fold 2 r/cat r/append! + (r/map (column :FRUIT/NAME) + (jdbc/plan stmt ["select * from fruit order by id"] + (default-options)))))] (is (= 4 (count result))) (is (= "Apple" (first result))) (is (= "Orange" (last result)))))))