Switch reduce to r/reduce in folding logic #125

This commit is contained in:
Sean Corfield 2020-06-26 19:32:48 -07:00
parent 8085acfcfc
commit 469eb0959a

View file

@ -614,7 +614,7 @@
[^PreparedStatement stmt n combinef reducef connectable opts] [^PreparedStatement stmt n combinef reducef connectable opts]
(if-let [rs (stmt->result-set stmt opts)] (if-let [rs (stmt->result-set stmt opts)]
(let [rs-map (mapify-result-set rs opts) (let [rs-map (mapify-result-set rs opts)
chunk (fn [batch] (#'r/fjtask #(reduce reducef (combinef) batch))) chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch)))
realize (fn [row] (datafiable-row row connectable opts))] realize (fn [row] (datafiable-row row connectable opts))]
(loop [batch [] tasks []] (loop [batch [] tasks []]
(if (.next rs) (if (.next rs)
@ -622,7 +622,7 @@
(recur [(realize rs-map)] (conj tasks (#'r/fjfork (chunk batch)))) (recur [(realize rs-map)] (conj tasks (#'r/fjfork (chunk batch))))
(recur (conj batch (realize rs-map)) tasks)) (recur (conj batch (realize rs-map)) tasks))
(#'r/fjinvoke (#'r/fjinvoke
#(reduce combinef (combinef) #(r/reduce combinef (combinef)
(mapv #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch))))))))) (mapv #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch)))))))))
(reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)}))) (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)})))
@ -669,7 +669,7 @@
[^Statement stmt sql n combinef reducef connectable opts] [^Statement stmt sql n combinef reducef connectable opts]
(if-let [rs (stmt-sql->result-set stmt sql opts)] (if-let [rs (stmt-sql->result-set stmt sql opts)]
(let [rs-map (mapify-result-set rs opts) (let [rs-map (mapify-result-set rs opts)
chunk (fn [batch] (#'r/fjtask #(reduce reducef (combinef) batch))) chunk (fn [batch] (#'r/fjtask #(r/reduce reducef (combinef) batch)))
realize (fn [row] (datafiable-row row connectable opts))] realize (fn [row] (datafiable-row row connectable opts))]
(loop [batch [] tasks []] (loop [batch [] tasks []]
(if (.next rs) (if (.next rs)
@ -677,7 +677,7 @@
(recur [(realize rs-map)] (conj tasks (#'r/fjfork (chunk batch)))) (recur [(realize rs-map)] (conj tasks (#'r/fjfork (chunk batch))))
(recur (conj batch (realize rs-map)) tasks)) (recur (conj batch (realize rs-map)) tasks))
(#'r/fjinvoke (#'r/fjinvoke
#(reduce combinef (combinef) #(r/reduce combinef (combinef)
(mapv #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch))))))))) (mapv #'r/fjjoin (conj tasks (#'r/fjfork (chunk batch)))))))))
(reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)}))) (reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)})))