From 77ebb31026df26c233d6414d9fe2a7522b4c546e Mon Sep 17 00:00:00 2001 From: Sean Corfield Date: Fri, 26 Jun 2020 22:21:49 -0700 Subject: [PATCH] Interleave combine with batch reductions #125 --- doc/tips-and-tricks.md | 2 +- src/next/jdbc/result_set.clj | 48 +++++++++++++++++++++++++++--------- test/next/jdbc_test.clj | 8 ++++++ 3 files changed, 45 insertions(+), 13 deletions(-) diff --git a/doc/tips-and-tricks.md b/doc/tips-and-tricks.md index bfe64cb..968e20d 100644 --- a/doc/tips-and-tricks.md +++ b/doc/tips-and-tricks.md @@ -10,7 +10,7 @@ One of the benefits of reducing over `plan` is that you can stream very large re The result of `plan` is also foldable in the [clojure.core.reducers](https://clojure.org/reference/reducers) sense. While you could use `execute!` to produce a vector of fully-realized rows as hash maps and then fold that vector (Clojure's vectors support fork-join parallel reduce-combine), that wouldn't be possible for very large result sets. If you fold the result of `plan`, the result set will be partitioned and processed using fork-join parallel reduce-combine. Unlike reducing over `plan`, each row **is** realized into a Clojure data structure and each batch is forked for reduction as soon as that many rows have been realized. By default, `fold`'s batch size is 512 but you can specify a different value in the 4-arity call. Once the entire result set has been read, the last (partial) batch is forked for reduction and then all of the reduced batches are combined. -There is no back pressure here so if your reducing function is slow, you may end up with more of the realized result set in memory than your system can cope with. There is also currently no attempt to combine the reduced batches until the entire result set has been processed which may also add to this issue. +There is no back pressure here so if your reducing function is slow, you may end up with more of the realized result set in memory than your system can cope with. ## CLOB & BLOB SQL Types diff --git a/src/next/jdbc/result_set.clj b/src/next/jdbc/result_set.clj index 4bfcf88..23c4a4d 100644 --- a/src/next/jdbc/result_set.clj +++ b/src/next/jdbc/result_set.clj @@ -616,14 +616,26 @@ (let [rs-map (mapify-result-set rs opts) chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch))) realize (fn [row] (datafiable-row row connectable opts))] - (loop [batch [] tasks []] + (loop [batch [] task nil] (if (.next rs) (if (= n (count batch)) - (recur [(realize rs-map)] (conj tasks (#'r/fjfork (chunk batch)))) - (recur (conj batch (realize rs-map)) tasks)) - (#'r/fjinvoke - #(r/reduce combinef (combinef) - (map #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch))))))))) + (recur [(realize rs-map)] + (let [t (#'r/fjfork (chunk batch))] + (if task + (#'r/fjfork + (#'r/fjtask #(combinef (#'r/fjjoin task) + (#'r/fjjoin t)))) + t))) + (recur (conj batch (realize rs-map)) task)) + (if (seq batch) + (let [t (#'r/fjfork (chunk batch))] + (#'r/fjinvoke + #(combinef (if task (#'r/fjjoin task) (combinef)) + (#'r/fjjoin t)))) + (if task + (#'r/fjinvoke + #(combinef (combinef) (#'r/fjjoin task))) + (combinef)))))) (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)}))) (defn- stmt-sql->result-set @@ -671,14 +683,26 @@ (let [rs-map (mapify-result-set rs opts) chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch))) realize (fn [row] (datafiable-row row connectable opts))] - (loop [batch [] tasks []] + (loop [batch [] task nil] (if (.next rs) (if (= n (count batch)) - (recur [(realize rs-map)] (conj tasks (#'r/fjfork (chunk batch)))) - (recur (conj batch (realize rs-map)) tasks)) - (#'r/fjinvoke - #(r/reduce combinef (combinef) - (map #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch))))))))) + (recur [(realize rs-map)] + (let [t (#'r/fjfork (chunk batch))] + (if task + (#'r/fjfork + (#'r/fjtask #(combinef (#'r/fjjoin task) + (#'r/fjjoin t)))) + t))) + (recur (conj batch (realize rs-map)) task)) + (if (seq batch) + (let [t (#'r/fjfork (chunk batch))] + (#'r/fjinvoke + #(combinef (if task (#'r/fjjoin task) (combinef)) + (#'r/fjjoin t)))) + (if task + (#'r/fjinvoke + #(combinef (combinef) (#'r/fjjoin task))) + (combinef)))))) (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)}))) (extend-protocol p/Executable diff --git a/test/next/jdbc_test.clj b/test/next/jdbc_test.clj index 811b5ab..0fba792 100644 --- a/test/next/jdbc_test.clj +++ b/test/next/jdbc_test.clj @@ -314,6 +314,14 @@ VALUES ('Pear', 'green', 49, 47) (default-options))))] (is (= 4 (count result))) (is (= "Apple" (first result))) + (is (= "Orange" (last result)))) + (let [result + (r/fold 1 r/cat r/append! + (r/map (column :FRUIT/NAME) + (jdbc/plan (ds) ["select * from fruit order by id"] + (default-options))))] + (is (= 4 (count result))) + (is (= "Apple" (first result))) (is (= "Orange" (last result))))) (testing "from a PreparedStatement" (let [result