Clean up Preparable; add Transactable

This commit is contained in:
Sean Corfield 2019-03-30 20:36:53 -07:00
parent 93eb286bd0
commit a44f59a468

View file

@ -2,7 +2,6 @@
(ns next.jdbc (ns next.jdbc
"" ""
(:require [clojure.set :as set])
(:import (java.lang AutoCloseable) (:import (java.lang AutoCloseable)
(java.sql Connection DriverManager (java.sql Connection DriverManager
PreparedStatement PreparedStatement
@ -11,27 +10,6 @@
(javax.sql DataSource) (javax.sql DataSource)
(java.util Properties))) (java.util Properties)))
(comment
"Key areas of interaction:
1a. Making a DataSource -- turn everything connectable into a DataSource
1b. Given a DataSource, we can getConnection()
2. Preparing a Statement -- connection + SQL + params (+ options)
(multiple param groups means addBatch() calls)
3. Execute a (Prepared) Statement to produce a ResultSet (or update count)
(can execute batch of prepared statements and get multiple results)"
"Additional areas:
1. with-db-connection -- given 'something', get a connection, execute the
body, and close the connection (if we opened it).
2. with-db-transaction -- given 'something', get a connection, start a
transaction, execute the body, commit/rollback, and close the connection
(if we opened it else restore connection state)."
"Database metadata can tell us:
0. If get generated keys is supported!
1. If batch updates are supported
2. If save points are supported
3. If various concurrency/holdability/etc options are supported")
(set! *warn-on-reflection* true) (set! *warn-on-reflection* true)
(defprotocol Sourceable (defprotocol Sourceable
@ -41,16 +19,9 @@
(defprotocol Executable (defprotocol Executable
(-execute ^clojure.lang.IReduceInit [this sql-params opts])) (-execute ^clojure.lang.IReduceInit [this sql-params opts]))
(defprotocol Preparable (defprotocol Preparable
(prepare ^PreparedStatement [this sql-params opts]) (prepare ^PreparedStatement [this sql-params opts]))
(prepare-fn ^PreparedStatement [this sql params factory])) (defprotocol Transactable
(-transact [this body-fn opts]))
(defn execute!
"General SQL execution function.
Returns a reducible that, when reduced, runs the SQL and yields the result."
([stmt] (-execute stmt [] {}))
([connectable sql-params & [opts]]
(-execute connectable sql-params opts)))
(defn set-parameters (defn set-parameters
"" ""
@ -87,8 +58,9 @@
(into-array String return-keys)) (into-array String return-keys))
(defn- pre-prepare* (defn- pre-prepare*
"Given a some options, return a function that will accept a connection and a "Given a some options, return a statement factory -- a function that will
SQL string and parameters, and return a PreparedStatement representing that." accept a connection and a SQL string and parameters, and return a
PreparedStatement representing that."
[{:keys [return-keys result-type concurrency cursors [{:keys [return-keys result-type concurrency cursors
fetch-size max-rows timeout]}] fetch-size max-rows timeout]}]
(cond-> (cond->
@ -147,15 +119,10 @@
(fn [^Connection con ^String sql] (fn [^Connection con ^String sql]
(.setQueryTimeout ^PreparedStatement (f con sql) timeout))))) (.setQueryTimeout ^PreparedStatement (f con sql) timeout)))))
(defn- prepare*
"Given a connection, a SQL statement, its parameters, and some options,
return a PreparedStatement representing that."
[con [sql & params] opts]
(set-parameters ((pre-prepare* opts) con sql) params))
(defn- prepare-fn* (defn- prepare-fn*
"Given a connection, a SQL statement, its parameters, and some options, "Given a connection, a SQL statement, its parameters, and a statement factory,
return a PreparedStatement representing that." return a PreparedStatement representing that."
^PreparedStatement
[con sql params factory] [con sql params factory]
(set-parameters (factory con sql) params)) (set-parameters (factory con sql) params))
@ -167,81 +134,68 @@
:repeatable-read Connection/TRANSACTION_REPEATABLE_READ :repeatable-read Connection/TRANSACTION_REPEATABLE_READ
:serializable Connection/TRANSACTION_SERIALIZABLE}) :serializable Connection/TRANSACTION_SERIALIZABLE})
(def ^:private isolation-kws
"Map transaction isolation constants to our keywords."
(set/map-invert isolation-levels))
(defn get-isolation-level
"Given an actual JDBC connection, return the current transaction
isolation level, if known. Return :unknown if we do not recognize
the isolation level."
[^Connection jdbc]
(isolation-kws (.getTransactionIsolation jdbc) :unknown))
(defn committable! [con commit?]
(when-let [state (:transacted con)]
(reset! state commit?))
con)
(defn transact* (defn transact*
"" ""
[con transacted f opts] [^Connection con f opts]
(let [{:keys [isolation read-only? rollback-only?]} opts (let [{:keys [isolation read-only? rollback-only?]} opts
committable? (not rollback-only?)] old-autocommit (.getAutoCommit con)
(if transacted old-isolation (.getTransactionIsolation con)
;; should check isolation level; maybe implement save points? old-readonly (.isReadOnly con)]
(f con) (io!
(with-open [^AutoCloseable t-con (assoc (get-connection con opts) (when isolation
;; FIXME: not a record/map! (.setTransactionIsolation con (isolation isolation-levels)))
:transacted (atom committable?))] (when read-only?
(let [^Connection jdbc t-con (.setReadOnly con true))
old-autocommit (.getAutoCommit jdbc) (.setAutoCommit con false)
old-isolation (.getTransactionIsolation jdbc) (try
old-readonly (.isReadOnly jdbc)] (let [result (f con)]
(io! (if rollback-only?
(when isolation (.rollback con)
(.setTransactionIsolation jdbc (isolation isolation-levels))) (.commit con))
(when read-only? result)
(.setReadOnly jdbc true)) (catch Throwable t
(.setAutoCommit jdbc false) (try
(.rollback con)
(catch Throwable rb
;; combine both exceptions
(throw (ex-info (str "Rollback failed handling \""
(.getMessage t)
"\"")
{:rollback rb
:handling t}))))
(throw t))
(finally ; tear down
;; the following can throw SQLExceptions but we do not
;; want those to replace any exception currently being
;; handled -- and if the connection got closed, we just
;; want to ignore exceptions here anyway
(try
(.setAutoCommit con old-autocommit)
(catch Exception _))
(when isolation
(try (try
(let [result (f t-con)] (.setTransactionIsolation con old-isolation)
(if @(:transacted t-con) (catch Exception _)))
(.commit jdbc) (when read-only?
(.rollback jdbc)) (try
result) (.setReadOnly con old-readonly)
(catch Throwable t (catch Exception _))))))))
(try
(.rollback jdbc) (extend-protocol Transactable
(catch Throwable rb Connection
;; combine both exceptions (-transact [this body-fn opts]
(throw (ex-info (str "Rollback failed handling \"" (transact* this body-fn opts))
(.getMessage t) DataSource
"\"") (-transact [this body-fn opts]
{:rollback rb (with-open [con (get-connection this opts)]
:handling t})))) (transact* con body-fn opts)))
(throw t)) Object
(finally ; tear down (-transact [this body-fn opts]
(committable! t-con committable?) (-transact (get-datasource this) body-fn opts)))
;; the following can throw SQLExceptions but we do not
;; want those to replace any exception currently being
;; handled -- and if the connection got closed, we just
;; want to ignore exceptions here anyway
(try
(.setAutoCommit jdbc old-autocommit)
(catch Exception _))
(when isolation
(try
(.setTransactionIsolation jdbc old-isolation)
(catch Exception _)))
(when read-only?
(try
(.setReadOnly jdbc old-readonly)
(catch Exception _)))))))))))
(defmacro in-transaction (defmacro in-transaction
[[sym con opts] & body] [[sym connectable opts] & body]
`(transact* ~con (fn [~sym] ~@body) ~opts)) `(-transact ~connectable (fn [~sym] ~@body) ~opts))
(def ^:private classnames (def ^:private classnames
"Map of subprotocols to classnames. dbtype specifies one of these keys. "Map of subprotocols to classnames. dbtype specifies one of these keys.
@ -297,17 +251,6 @@
"sqlserver" ";DATABASENAME=" "sqlserver" ";DATABASENAME="
"oracle:sid" ":"}) "oracle:sid" ":"})
(defn- modify-connection
"Given a database connection and a map of options, update the connection
as specified by the options."
^Connection
[^Connection connection opts]
(when (and connection (contains? opts :auto-commit?))
(.setAutoCommit connection (boolean (:auto-commit? opts))))
(when (and connection (contains? opts :read-only?))
(.setReadOnly connection (boolean (:read-only? opts))))
connection)
(defn- ^Properties as-properties (defn- ^Properties as-properties
"Convert any seq of pairs to a java.utils.Properties instance. "Convert any seq of pairs to a java.utils.Properties instance.
Uses as-sql-name to convert both keys and values into strings." Uses as-sql-name to convert both keys and values into strings."
@ -379,6 +322,18 @@
:username username :username username
:password password))))) :password password)))))
(defn- make-connection
"Given a DataSource and a map of options, get a connection and update it
as specified by the options."
^Connection
[^DataSource datasource opts]
(let [^Connection connection (.getConnection datasource)]
(when (contains? opts :auto-commit?)
(.setAutoCommit connection (boolean (:auto-commit? opts))))
(when (contains? opts :read-only?)
(.setReadOnly connection (boolean (:read-only? opts))))
connection))
(extend-protocol Sourceable (extend-protocol Sourceable
clojure.lang.Associative clojure.lang.Associative
(get-datasource [this] (get-datasource [this]
@ -391,27 +346,16 @@
(extend-protocol Connectable (extend-protocol Connectable
DataSource DataSource
(get-connection [this opts] (-> (.getConnection this) (get-connection [this opts] (make-connection this opts))
(modify-connection opts)))
Object Object
(get-connection [this opts] (get-connection (get-datasource this) opts))) (get-connection [this opts] (get-connection (get-datasource this) opts)))
(extend-protocol Preparable (extend-protocol Preparable
Connection Connection
(prepare [this sql-params opts] (prepare [this sql-params opts]
(prepare* this sql-params opts)) (let [[sql & params] sql-params
(prepare-fn [this sql params factory] factory (pre-prepare* opts)]
(prepare-fn* this sql params factory)) (set-parameters (factory this sql) params))))
DataSource
(prepare [this sql-params opts]
(prepare (.getConnection this) sql-params opts))
(prepare-fn [this sql params factory]
(prepare-fn (.getConnection this) sql params factory))
Object
(prepare [this sql-params opts]
(prepare (get-datasource this) sql-params opts))
(prepare-fn [this sql params factory]
(prepare-fn (get-datasource this) sql params factory)))
(defn- get-column-names (defn- get-column-names
"" ""
@ -430,12 +374,10 @@
Supports ILookup (keywords are treated as strings). Supports ILookup (keywords are treated as strings).
Supports Associative (again, keywords are treated as strings). Supports Associative (again, keywords are treated as strings). If you assoc,
a full row will be realized (via seq/into).
Supports Seqable which realizes a full row of the data. Supports Seqable which realizes a full row of the data."
Later we may realize a new hash map when assoc (and other, future, operations
are performed on the result set row)."
[^ResultSet rs] [^ResultSet rs]
(let [cols (delay (get-column-names rs))] (let [cols (delay (get-column-names rs))]
(reify (reify
@ -493,7 +435,7 @@
(let [factory (pre-prepare* opts)] (let [factory (pre-prepare* opts)]
(reify clojure.lang.IReduceInit (reify clojure.lang.IReduceInit
(reduce [_ f init] (reduce [_ f init]
(with-open [stmt (prepare-fn this sql params factory)] (with-open [stmt (prepare-fn* this sql params factory)]
(reduce-stmt stmt f init)))))) (reduce-stmt stmt f init))))))
DataSource DataSource
(-execute [this [sql & params] opts] (-execute [this [sql & params] opts]
@ -501,7 +443,7 @@
(reify clojure.lang.IReduceInit (reify clojure.lang.IReduceInit
(reduce [_ f init] (reduce [_ f init]
(with-open [con (get-connection this opts)] (with-open [con (get-connection this opts)]
(with-open [stmt (prepare-fn con sql params factory)] (with-open [stmt (prepare-fn* con sql params factory)]
(reduce-stmt stmt f init))))))) (reduce-stmt stmt f init)))))))
PreparedStatement PreparedStatement
(-execute [this _ _] (-execute [this _ _]
@ -511,6 +453,14 @@
(-execute [this sql-params opts] (-execute [this sql-params opts]
(-execute (get-datasource this) sql-params opts))) (-execute (get-datasource this) sql-params opts)))
(defn execute!
"General SQL execution function.
Returns a reducible that, when reduced, runs the SQL and yields the result."
([stmt] (-execute stmt [] {}))
([connectable sql-params & [opts]]
(-execute connectable sql-params opts)))
(defn query (defn query
"" ""
[connectable sql-params & [opts]] [connectable sql-params & [opts]]
@ -609,4 +559,14 @@
;; test assoc works ;; test assoc works
(query con (query con
["select * from fruit where appearance = ?" "red"] ["select * from fruit where appearance = ?" "red"]
{:row-fn #(assoc % :test :value)})) {:row-fn #(assoc % :test :value)})
(in-transaction [t con {:rollback-only? true}]
(command! t ["INSERT INTO fruit (id,name,appearance,cost,grade) VALUES (5,'Pear','green',49,47)"])
(query t ["select * from fruit where name = ?" "Pear"]))
(query con ["select * from fruit where name = ?" "Pear"])
(in-transaction [t con {:rollback-only? false}]
(command! t ["INSERT INTO fruit (id,name,appearance,cost,grade) VALUES (5,'Pear','green',49,47)"])
(query t ["select * from fruit where name = ?" "Pear"]))
(query con ["select * from fruit where name = ?" "Pear"]))