Refactor and expose reducible/foldable ResultSet access
This commit is contained in:
parent
65ec940ccc
commit
75763ba86f
2 changed files with 108 additions and 72 deletions
|
|
@ -5,6 +5,8 @@ Only accretive/fixative changes will be made from now on.
|
|||
* 1.1.next in progress
|
||||
* Fix #140 by adding `"duckdb"` to `next.jdbc.connection/dbtypes`.
|
||||
* Change `next.jdbc.types/as-*` functions to use a thunk instead of a vector to convey metadata, so that wrapped values do not get unpacked by HoneySQL.
|
||||
* Refactor reducing and folding code around `ResultSet`, so that `reducible-result-set` and `foldable-result-set` can be exposed for folks who want more control over processing result sets obtained from database metadata.
|
||||
* `datafiable-result-set` can now be called without the `connectable` and/or `opts` arguments; a `nil` connectable now disables foreign key navigation in datafied results (rather than throwing an obscure exception).
|
||||
|
||||
## Stable Builds
|
||||
|
||||
|
|
|
|||
|
|
@ -615,16 +615,28 @@
|
|||
realized, datafiable result set per the `:builder-fn` option passed in.
|
||||
If no `:builder-fn` option is provided, `as-maps` is used as the default.
|
||||
|
||||
The connectable and the options can both be omitted. If connectable is
|
||||
omitted, `nil` is used and no foreign key navigation will be available
|
||||
for any datafied result. If you want to pass a hash map as the connectable,
|
||||
you must also pass an options hash map.
|
||||
|
||||
This can be used to process regular result sets or metadata result sets."
|
||||
[^java.sql.ResultSet rs connectable opts]
|
||||
(let [builder-fn (get opts :builder-fn as-maps)
|
||||
builder (builder-fn rs opts)]
|
||||
(loop [rs' (->rs builder) more? (.next rs)]
|
||||
(if more?
|
||||
(recur (with-row builder rs'
|
||||
(datafiable-row (row-builder builder) connectable opts))
|
||||
(.next rs))
|
||||
(rs! builder rs')))))
|
||||
([rs]
|
||||
(datafiable-result-set rs nil {}))
|
||||
([rs conn-or-opts]
|
||||
(datafiable-result-set
|
||||
rs
|
||||
(if (map? conn-or-opts) nil conn-or-opts)
|
||||
(if (map? conn-or-opts) conn-or-opts {})))
|
||||
([^java.sql.ResultSet rs connectable opts]
|
||||
(let [builder-fn (get opts :builder-fn as-maps)
|
||||
builder (builder-fn rs opts)]
|
||||
(loop [rs' (->rs builder) more? (.next rs)]
|
||||
(if more?
|
||||
(recur (with-row builder rs'
|
||||
(datafiable-row (row-builder builder) connectable opts))
|
||||
(.next rs))
|
||||
(rs! builder rs'))))))
|
||||
|
||||
(defn- stmt->result-set
|
||||
"Given a `PreparedStatement` and options, execute it and return a `ResultSet`
|
||||
|
|
@ -658,6 +670,20 @@
|
|||
(when-not (= -1 n)
|
||||
[{:next.jdbc/update-count n}]))))
|
||||
|
||||
(defn- reduce-result-set
|
||||
"Given a `ResultSet`, a reducing function, an initial value, and options,
|
||||
perform a reduction of the result set. This is used internally by higher
|
||||
level `reduce-*` functions."
|
||||
[^ResultSet rs f init opts]
|
||||
(let [rs-map (mapify-result-set rs opts)]
|
||||
(loop [init' init]
|
||||
(if (.next rs)
|
||||
(let [result (f init' rs-map)]
|
||||
(if (reduced? result)
|
||||
@result
|
||||
(recur result)))
|
||||
init'))))
|
||||
|
||||
(defn- reduce-stmt
|
||||
"Execute the `PreparedStatement`, attempt to get either its `ResultSet` or
|
||||
its generated keys (as a `ResultSet`), and reduce that using the supplied
|
||||
|
|
@ -668,14 +694,7 @@
|
|||
updated, with the supplied function and initial value applied."
|
||||
[^PreparedStatement stmt f init opts]
|
||||
(if-let [rs (stmt->result-set stmt opts)]
|
||||
(let [rs-map (mapify-result-set rs opts)]
|
||||
(loop [init' init]
|
||||
(if (.next rs)
|
||||
(let [result (f init' rs-map)]
|
||||
(if (reduced? result)
|
||||
@result
|
||||
(recur result)))
|
||||
init')))
|
||||
(reduce-result-set rs f init opts)
|
||||
(f init {:next.jdbc/update-count (.getUpdateCount stmt)})))
|
||||
|
||||
;; ForkJoinTask wrappers copied in from clojure.core.reducers to avoid
|
||||
|
|
@ -696,6 +715,72 @@
|
|||
|
||||
(defn- fjjoin [task] (.join ^ForkJoinTask task))
|
||||
|
||||
(defn- fold-result-set
|
||||
"Given a `ResultSet`, a batch size, combining and reducing functions,
|
||||
a connectable, and options, perform a fold of the result set. This is
|
||||
used internally by higher level `fold-*` functions."
|
||||
[^ResultSet rs n combinef reducef connectable opts]
|
||||
(let [rs-map (mapify-result-set rs opts)
|
||||
chunk (fn [batch] (fjtask #(r/reduce reducef (combinef) batch)))
|
||||
realize (fn [row] (datafiable-row row connectable opts))]
|
||||
(loop [batch [] task nil]
|
||||
(if (.next rs)
|
||||
(if (= n (count batch))
|
||||
(recur [(realize rs-map)]
|
||||
(let [t (fjfork (chunk batch))]
|
||||
(if task
|
||||
(fjfork
|
||||
(fjtask #(combinef (fjjoin task)
|
||||
(fjjoin t))))
|
||||
t)))
|
||||
(recur (conj batch (realize rs-map)) task))
|
||||
(if (seq batch)
|
||||
(let [t (fjfork (chunk batch))]
|
||||
(fjinvoke
|
||||
#(combinef (if task (fjjoin task) (combinef))
|
||||
(fjjoin t))))
|
||||
(if task
|
||||
(fjinvoke
|
||||
#(combinef (combinef) (fjjoin task)))
|
||||
(combinef)))))))
|
||||
|
||||
(defn reducible-result-set
|
||||
"Given a `ResultSet`, return an `IReduceInit` that can be reduced. An
|
||||
options hash map may be provided.
|
||||
|
||||
You are responsible for ensuring the `Connection` for this `ResultSet`
|
||||
remains open until the reduction is complete!"
|
||||
([rs]
|
||||
(reducible-result-set rs {}))
|
||||
([rs opts]
|
||||
(reify
|
||||
clojure.lang.IReduceInit
|
||||
(reduce [_ f init]
|
||||
(reduce-result-set rs f init opts)))))
|
||||
|
||||
(defn foldable-result-set
|
||||
"Given a `ResultSet` and an optional connectable, return an `r/CollFold`
|
||||
that can be folded. An options hash map may be provided.
|
||||
|
||||
You are responsible for ensuring the `Connection` for this `ResultSet`
|
||||
and the connectable both remain open until the fold is complete!
|
||||
|
||||
If the connectable is omitted, no foreign key navigation would be
|
||||
available in any datafied result. If you want to pass a hash map as the
|
||||
connectable, you must also pass an options hash map."
|
||||
([rs]
|
||||
(foldable-result-set rs nil {}))
|
||||
([rs conn-or-opts]
|
||||
(foldable-result-set
|
||||
rs
|
||||
(if (map? conn-or-opts) nil conn-or-opts)
|
||||
(if (map? conn-or-opts) conn-or-opts {})))
|
||||
([rs connectable opts]
|
||||
(reify
|
||||
r/CollFold
|
||||
(coll-fold [_ n combinef reducef]
|
||||
(fold-result-set rs n combinef reducef connectable opts)))))
|
||||
|
||||
(defn- fold-stmt
|
||||
"Execute the `PreparedStatement`, attempt to get either its `ResultSet` or
|
||||
its generated keys (as a `ResultSet`), and fold that using the supplied
|
||||
|
|
@ -706,29 +791,7 @@
|
|||
updated, and fold that as a single element collection."
|
||||
[^PreparedStatement stmt n combinef reducef connectable opts]
|
||||
(if-let [rs (stmt->result-set stmt opts)]
|
||||
(let [rs-map (mapify-result-set rs opts)
|
||||
chunk (fn [batch] (fjtask #(r/reduce reducef (combinef) batch)))
|
||||
realize (fn [row] (datafiable-row row connectable opts))]
|
||||
(loop [batch [] task nil]
|
||||
(if (.next rs)
|
||||
(if (= n (count batch))
|
||||
(recur [(realize rs-map)]
|
||||
(let [t (fjfork (chunk batch))]
|
||||
(if task
|
||||
(fjfork
|
||||
(fjtask #(combinef (fjjoin task)
|
||||
(fjjoin t))))
|
||||
t)))
|
||||
(recur (conj batch (realize rs-map)) task))
|
||||
(if (seq batch)
|
||||
(let [t (fjfork (chunk batch))]
|
||||
(fjinvoke
|
||||
#(combinef (if task (fjjoin task) (combinef))
|
||||
(fjjoin t))))
|
||||
(if task
|
||||
(fjinvoke
|
||||
#(combinef (combinef) (fjjoin task)))
|
||||
(combinef))))))
|
||||
(fold-result-set rs n combinef reducef connectable opts)
|
||||
(reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)})))
|
||||
|
||||
(defn- stmt-sql->result-set
|
||||
|
|
@ -752,14 +815,7 @@
|
|||
updated, with the supplied function and initial value applied."
|
||||
[^Statement stmt sql f init opts]
|
||||
(if-let [rs (stmt-sql->result-set stmt sql)]
|
||||
(let [rs-map (mapify-result-set rs opts)]
|
||||
(loop [init' init]
|
||||
(if (.next rs)
|
||||
(let [result (f init' rs-map)]
|
||||
(if (reduced? result)
|
||||
@result
|
||||
(recur result)))
|
||||
init')))
|
||||
(reduce-result-set rs f init opts)
|
||||
(f init {:next.jdbc/update-count (.getUpdateCount stmt)})))
|
||||
|
||||
(defn- fold-stmt-sql
|
||||
|
|
@ -772,29 +828,7 @@
|
|||
updated, and fold that as a single element collection."
|
||||
[^Statement stmt sql n combinef reducef connectable opts]
|
||||
(if-let [rs (stmt-sql->result-set stmt sql)]
|
||||
(let [rs-map (mapify-result-set rs opts)
|
||||
chunk (fn [batch] (fjtask #(r/reduce reducef (combinef) batch)))
|
||||
realize (fn [row] (datafiable-row row connectable opts))]
|
||||
(loop [batch [] task nil]
|
||||
(if (.next rs)
|
||||
(if (= n (count batch))
|
||||
(recur [(realize rs-map)]
|
||||
(let [t (fjfork (chunk batch))]
|
||||
(if task
|
||||
(fjfork
|
||||
(fjtask #(combinef (fjjoin task)
|
||||
(fjjoin t))))
|
||||
t)))
|
||||
(recur (conj batch (realize rs-map)) task))
|
||||
(if (seq batch)
|
||||
(let [t (fjfork (chunk batch))]
|
||||
(fjinvoke
|
||||
#(combinef (if task (fjjoin task) (combinef))
|
||||
(fjjoin t))))
|
||||
(if task
|
||||
(fjinvoke
|
||||
#(combinef (combinef) (fjjoin task)))
|
||||
(combinef))))))
|
||||
(fold-result-set rs n combinef reducef connectable opts)
|
||||
(reducef (combinef) {:next.jdbc/update-count (.getUpdateCount stmt)})))
|
||||
|
||||
(extend-protocol p/Executable
|
||||
|
|
@ -1040,7 +1074,7 @@
|
|||
(let [[table fk cardinality]
|
||||
(expand-schema k (or (get-in opts [:schema k])
|
||||
(default-schema k)))]
|
||||
(if fk
|
||||
(if (and fk connectable)
|
||||
(let [entity-fn (:table-fn opts identity)
|
||||
exec-fn! (if (= :many cardinality)
|
||||
p/-execute-all
|
||||
|
|
|
|||
Loading…
Reference in a new issue