Complete CollFold implementation #125

This commit is contained in:
Sean Corfield 2020-06-26 19:03:57 -07:00
parent 0eb183a0a0
commit 53ee1c5367
2 changed files with 92 additions and 15 deletions

View file

@ -658,16 +658,47 @@
init')))
(f init {:next.jdbc/update-count (.getUpdateCount stmt)})))
(defn- fold-stmt-sql
"Execute the SQL command on the given `Statement`, attempt to get either
its `ResultSet` or its generated keys (as a `ResultSet`), and fold that
using the supplied batch size, combining function, and reducing function.
If the statement yields neither a `ResultSet` nor generated keys, produce
a hash map containing `:next.jdbc/update-count` and the number of rows
updated, and fold that as a single element collection."
[^Statement stmt sql n combinef reducef connectable opts]
(if-let [rs (stmt-sql->result-set stmt sql opts)]
(let [rs-map (mapify-result-set rs opts)
chunk (fn [batch] (#'r/fjtask #(reduce reducef (combinef) batch)))
realize (fn [row] (datafiable-row row connectable opts))]
(loop [batch [] tasks []]
(if (.next rs)
(if (= n (count batch))
(recur [(realize rs-map)] (conj tasks (#'r/fjfork (chunk batch))))
(recur (conj batch (realize rs-map)) tasks))
(#'r/fjinvoke
#(reduce combinef (combinef)
(mapv #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch)))))))))
(reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)})))
(extend-protocol p/Executable
java.sql.Connection
(-execute [this sql-params opts]
(reify clojure.lang.IReduceInit
(reify
clojure.lang.IReduceInit
(reduce [_ f init]
(with-open [stmt (prepare/create this
(first sql-params)
(rest sql-params)
opts)]
(reduce-stmt stmt f init opts)))
(with-open [stmt (prepare/create this
(first sql-params)
(rest sql-params)
opts)]
(reduce-stmt stmt f init opts)))
r/CollFold
(coll-fold [_ n combinef reducef]
(with-open [stmt (prepare/create this
(first sql-params)
(rest sql-params)
opts)]
(fold-stmt stmt n combinef reducef this opts)))
(toString [_] "`IReduceInit` from `plan` -- missing reduction?")))
(-execute-one [this sql-params opts]
(with-open [stmt (prepare/create this
@ -707,7 +738,7 @@
(first sql-params)
(rest sql-params)
opts)]
(fold-stmt stmt n combinef reducef this opts)))
(fold-stmt stmt n combinef reducef this opts)))
(toString [_] "`IReduceInit` from `plan` -- missing reduction?")))
(-execute-one [this sql-params opts]
(with-open [con (p/get-connection this opts)
@ -736,9 +767,14 @@
;; keys so we pass a truthy value to at least attempt it if we
;; do not get a ResultSet back from the execute call
(-execute [this _ opts]
(reify clojure.lang.IReduceInit
(reify
clojure.lang.IReduceInit
(reduce [_ f init]
(reduce-stmt this f init (assoc opts :return-keys true)))
(reduce-stmt this f init (assoc opts :return-keys true)))
r/CollFold
(coll-fold [_ n combinef reducef]
(fold-stmt this n combinef reducef (.getConnection this)
(assoc opts :return-keys true)))
(toString [_] "`IReduceInit` from `plan` -- missing reduction?")))
(-execute-one [this _ opts]
(if-let [rs (stmt->result-set this (assoc opts :return-keys true))]
@ -760,9 +796,16 @@
(-execute [this sql-params opts]
(assert (= 1 (count sql-params))
"Parameters cannot be provided when executing a non-prepared Statement")
(reify clojure.lang.IReduceInit
(reify
clojure.lang.IReduceInit
(reduce [_ f init]
(reduce-stmt-sql this (first sql-params) f init (assoc opts :return-keys true)))
(reduce-stmt-sql this (first sql-params) f init
(assoc opts :return-keys true)))
r/CollFold
(coll-fold [_ n combinef reducef]
(fold-stmt-sql this (first sql-params) n combinef reducef
(.getConnection this)
(assoc opts :return-keys true)))
(toString [_] "`IReduceInit` from `plan` -- missing reduction?")))
(-execute-one [this sql-params opts]
(assert (= 1 (count sql-params))

View file

@ -294,13 +294,47 @@ VALUES ('Pear', 'green', 49, 47)
(is (= 4 (count (jdbc/execute! con ["select * from fruit"]))))
(is (= ac (.getAutoCommit con)))))))
(deftest fold-rs-test
(let [ds-opts (jdbc/with-options (ds) (default-options))]
(testing "foldable result set"
(deftest folding-test
(testing "foldable result set"
(testing "from a Connection"
(let [result
(with-open [con (jdbc/get-connection (ds))]
(r/fold 2 r/cat r/append!
(r/map (column :FRUIT/NAME)
(jdbc/plan con ["select * from fruit order by id"]
(default-options)))))]
(is (= 4 (count result)))
(is (= "Apple" (first result)))
(is (= "Orange" (last result)))))
(testing "from a DataSource"
(let [result
(r/fold 2 r/cat r/append!
(r/map (column :FRUIT/NAME)
(jdbc/plan ds-opts ["select * from fruit order by id"])))]
(jdbc/plan (ds) ["select * from fruit order by id"]
(default-options))))]
(is (= 4 (count result)))
(is (= "Apple" (first result)))
(is (= "Orange" (last result)))))
(testing "from a PreparedStatement"
(let [result
(with-open [con (jdbc/get-connection (ds))
stmt (jdbc/prepare con
["select * from fruit order by id"]
(default-options))]
(r/fold 2 r/cat r/append!
(r/map (column :FRUIT/NAME)
(jdbc/plan stmt nil (default-options)))))]
(is (= 4 (count result)))
(is (= "Apple" (first result)))
(is (= "Orange" (last result)))))
(testing "from a Statement"
(let [result
(with-open [con (jdbc/get-connection (ds))
stmt (prep/statement con (default-options))]
(r/fold 2 r/cat r/append!
(r/map (column :FRUIT/NAME)
(jdbc/plan stmt ["select * from fruit order by id"]
(default-options)))))]
(is (= 4 (count result)))
(is (= "Apple" (first result)))
(is (= "Orange" (last result)))))))