Interleave combine with batch reductions #125
This commit is contained in:
parent
68d8f98d26
commit
77ebb31026
3 changed files with 45 additions and 13 deletions
|
|
@ -10,7 +10,7 @@ One of the benefits of reducing over `plan` is that you can stream very large re
|
|||
|
||||
The result of `plan` is also foldable in the [clojure.core.reducers](https://clojure.org/reference/reducers) sense. While you could use `execute!` to produce a vector of fully-realized rows as hash maps and then fold that vector (Clojure's vectors support fork-join parallel reduce-combine), that wouldn't be possible for very large result sets. If you fold the result of `plan`, the result set will be partitioned and processed using fork-join parallel reduce-combine. Unlike reducing over `plan`, each row **is** realized into a Clojure data structure and each batch is forked for reduction as soon as that many rows have been realized. By default, `fold`'s batch size is 512 but you can specify a different value in the 4-arity call. Once the entire result set has been read, the last (partial) batch is forked for reduction and then all of the reduced batches are combined.
|
||||
|
||||
There is no back pressure here so if your reducing function is slow, you may end up with more of the realized result set in memory than your system can cope with. There is also currently no attempt to combine the reduced batches until the entire result set has been processed which may also add to this issue.
|
||||
There is no back pressure here so if your reducing function is slow, you may end up with more of the realized result set in memory than your system can cope with.
|
||||
|
||||
## CLOB & BLOB SQL Types
|
||||
|
||||
|
|
|
|||
|
|
@ -616,14 +616,26 @@
|
|||
(let [rs-map (mapify-result-set rs opts)
|
||||
chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch)))
|
||||
realize (fn [row] (datafiable-row row connectable opts))]
|
||||
(loop [batch [] tasks []]
|
||||
(loop [batch [] task nil]
|
||||
(if (.next rs)
|
||||
(if (= n (count batch))
|
||||
(recur [(realize rs-map)] (conj tasks (#'r/fjfork (chunk batch))))
|
||||
(recur (conj batch (realize rs-map)) tasks))
|
||||
(recur [(realize rs-map)]
|
||||
(let [t (#'r/fjfork (chunk batch))]
|
||||
(if task
|
||||
(#'r/fjfork
|
||||
(#'r/fjtask #(combinef (#'r/fjjoin task)
|
||||
(#'r/fjjoin t))))
|
||||
t)))
|
||||
(recur (conj batch (realize rs-map)) task))
|
||||
(if (seq batch)
|
||||
(let [t (#'r/fjfork (chunk batch))]
|
||||
(#'r/fjinvoke
|
||||
#(r/reduce combinef (combinef)
|
||||
(map #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch)))))))))
|
||||
#(combinef (if task (#'r/fjjoin task) (combinef))
|
||||
(#'r/fjjoin t))))
|
||||
(if task
|
||||
(#'r/fjinvoke
|
||||
#(combinef (combinef) (#'r/fjjoin task)))
|
||||
(combinef))))))
|
||||
(reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)})))
|
||||
|
||||
(defn- stmt-sql->result-set
|
||||
|
|
@ -671,14 +683,26 @@
|
|||
(let [rs-map (mapify-result-set rs opts)
|
||||
chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch)))
|
||||
realize (fn [row] (datafiable-row row connectable opts))]
|
||||
(loop [batch [] tasks []]
|
||||
(loop [batch [] task nil]
|
||||
(if (.next rs)
|
||||
(if (= n (count batch))
|
||||
(recur [(realize rs-map)] (conj tasks (#'r/fjfork (chunk batch))))
|
||||
(recur (conj batch (realize rs-map)) tasks))
|
||||
(recur [(realize rs-map)]
|
||||
(let [t (#'r/fjfork (chunk batch))]
|
||||
(if task
|
||||
(#'r/fjfork
|
||||
(#'r/fjtask #(combinef (#'r/fjjoin task)
|
||||
(#'r/fjjoin t))))
|
||||
t)))
|
||||
(recur (conj batch (realize rs-map)) task))
|
||||
(if (seq batch)
|
||||
(let [t (#'r/fjfork (chunk batch))]
|
||||
(#'r/fjinvoke
|
||||
#(r/reduce combinef (combinef)
|
||||
(map #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch)))))))))
|
||||
#(combinef (if task (#'r/fjjoin task) (combinef))
|
||||
(#'r/fjjoin t))))
|
||||
(if task
|
||||
(#'r/fjinvoke
|
||||
#(combinef (combinef) (#'r/fjjoin task)))
|
||||
(combinef))))))
|
||||
(reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)})))
|
||||
|
||||
(extend-protocol p/Executable
|
||||
|
|
|
|||
|
|
@ -314,6 +314,14 @@ VALUES ('Pear', 'green', 49, 47)
|
|||
(default-options))))]
|
||||
(is (= 4 (count result)))
|
||||
(is (= "Apple" (first result)))
|
||||
(is (= "Orange" (last result))))
|
||||
(let [result
|
||||
(r/fold 1 r/cat r/append!
|
||||
(r/map (column :FRUIT/NAME)
|
||||
(jdbc/plan (ds) ["select * from fruit order by id"]
|
||||
(default-options))))]
|
||||
(is (= 4 (count result)))
|
||||
(is (= "Apple" (first result)))
|
||||
(is (= "Orange" (last result)))))
|
||||
(testing "from a PreparedStatement"
|
||||
(let [result
|
||||
|
|
|
|||
Loading…
Reference in a new issue