Reorganize into multiple namespaces
This commit is contained in:
parent
4d41faa8cd
commit
ac95379bce
6 changed files with 598 additions and 483 deletions
|
|
@ -2,502 +2,49 @@
|
|||
|
||||
(ns next.jdbc
|
||||
""
|
||||
(:import (java.lang AutoCloseable)
|
||||
(java.sql Connection DriverManager
|
||||
PreparedStatement
|
||||
ResultSet ResultSetMetaData
|
||||
SQLException Statement)
|
||||
(javax.sql DataSource)
|
||||
(java.util Properties)))
|
||||
(:require [next.jdbc.connection] ; used to extend protocols
|
||||
[next.jdbc.prepare :as prepare] ; used to extend protocols
|
||||
[next.jdbc.protocols :as p]
|
||||
[next.jdbc.result-set :as rs]
|
||||
[next.jdbc.transaction :as tx])) ; used to extend protocols
|
||||
|
||||
(set! *warn-on-reflection* true)
|
||||
|
||||
(defprotocol Sourceable
|
||||
(get-datasource ^DataSource [this]))
|
||||
(defprotocol Connectable
|
||||
(get-connection ^AutoCloseable [this opts]))
|
||||
(defprotocol Executable
|
||||
(-execute ^clojure.lang.IReduceInit [this sql-params opts]))
|
||||
(defprotocol Preparable
|
||||
(prepare ^PreparedStatement [this sql-params opts]))
|
||||
(defprotocol Transactable
|
||||
(-transact [this body-fn opts]))
|
||||
(defn get-datasource [spec] (p/get-datasource spec))
|
||||
|
||||
(defn set-parameters
|
||||
""
|
||||
^PreparedStatement
|
||||
[^PreparedStatement ps params]
|
||||
(when (seq params)
|
||||
(loop [[p & more] params i 1]
|
||||
(.setObject ps i p)
|
||||
(when more
|
||||
(recur more (inc i)))))
|
||||
ps)
|
||||
(defn get-connection [spec opts] (p/get-connection spec opts))
|
||||
|
||||
(def ^{:private true
|
||||
:doc "Map friendly :concurrency values to ResultSet constants."}
|
||||
result-set-concurrency
|
||||
{:read-only ResultSet/CONCUR_READ_ONLY
|
||||
:updatable ResultSet/CONCUR_UPDATABLE})
|
||||
|
||||
(def ^{:private true
|
||||
:doc "Map friendly :cursors values to ResultSet constants."}
|
||||
result-set-holdability
|
||||
{:hold ResultSet/HOLD_CURSORS_OVER_COMMIT
|
||||
:close ResultSet/CLOSE_CURSORS_AT_COMMIT})
|
||||
|
||||
(def ^{:private true
|
||||
:doc "Map friendly :type values to ResultSet constants."}
|
||||
result-set-type
|
||||
{:forward-only ResultSet/TYPE_FORWARD_ONLY
|
||||
:scroll-insensitive ResultSet/TYPE_SCROLL_INSENSITIVE
|
||||
:scroll-sensitive ResultSet/TYPE_SCROLL_SENSITIVE})
|
||||
|
||||
(defn- ^{:tag (class (into-array String []))} string-array
|
||||
[return-keys]
|
||||
(into-array String return-keys))
|
||||
|
||||
(defn- pre-prepare*
|
||||
"Given a some options, return a statement factory -- a function that will
|
||||
accept a connection and a SQL string and parameters, and return a
|
||||
PreparedStatement representing that."
|
||||
[{:keys [return-keys result-type concurrency cursors
|
||||
fetch-size max-rows timeout]}]
|
||||
(cond->
|
||||
(cond
|
||||
return-keys
|
||||
(do
|
||||
(when (or result-type concurrency cursors)
|
||||
(throw (IllegalArgumentException.
|
||||
(str ":concurrency, :cursors, and :result-type "
|
||||
"may not be specified with :return-keys."))))
|
||||
(if (vector? return-keys)
|
||||
(let [key-names (string-array return-keys)]
|
||||
(fn [^Connection con ^String sql]
|
||||
(try
|
||||
(try
|
||||
(.prepareStatement con sql key-names)
|
||||
(catch Exception _
|
||||
;; assume it is unsupported and try regular generated keys:
|
||||
(.prepareStatement con sql java.sql.Statement/RETURN_GENERATED_KEYS)))
|
||||
(catch Exception _
|
||||
;; assume it is unsupported and try basic PreparedStatement:
|
||||
(.prepareStatement con sql)))))
|
||||
(fn [^Connection con ^String sql]
|
||||
(try
|
||||
(.prepareStatement con sql java.sql.Statement/RETURN_GENERATED_KEYS)
|
||||
(catch Exception _
|
||||
;; assume it is unsupported and try basic PreparedStatement:
|
||||
(.prepareStatement con sql))))))
|
||||
|
||||
(and result-type concurrency)
|
||||
(if cursors
|
||||
(fn [^Connection con ^String sql]
|
||||
(.prepareStatement con sql
|
||||
(get result-set-type result-type result-type)
|
||||
(get result-set-concurrency concurrency concurrency)
|
||||
(get result-set-holdability cursors cursors)))
|
||||
(fn [^Connection con ^String sql]
|
||||
(.prepareStatement con sql
|
||||
(get result-set-type result-type result-type)
|
||||
(get result-set-concurrency concurrency concurrency))))
|
||||
|
||||
(or result-type concurrency cursors)
|
||||
(throw (IllegalArgumentException.
|
||||
(str ":concurrency, :cursors, and :result-type "
|
||||
"may not be specified independently.")))
|
||||
:else
|
||||
(fn [^Connection con ^String sql]
|
||||
(.prepareStatement con sql)))
|
||||
fetch-size (as-> f
|
||||
(fn [^Connection con ^String sql]
|
||||
(.setFetchSize ^PreparedStatement (f con sql) fetch-size)))
|
||||
max-rows (as-> f
|
||||
(fn [^Connection con ^String sql]
|
||||
(.setMaxRows ^PreparedStatement (f con sql) max-rows)))
|
||||
timeout (as-> f
|
||||
(fn [^Connection con ^String sql]
|
||||
(.setQueryTimeout ^PreparedStatement (f con sql) timeout)))))
|
||||
|
||||
(defn- prepare-fn*
|
||||
"Given a connection, a SQL statement, its parameters, and a statement factory,
|
||||
return a PreparedStatement representing that."
|
||||
^PreparedStatement
|
||||
[con sql params factory]
|
||||
(set-parameters (factory con sql) params))
|
||||
|
||||
(def ^:private isolation-levels
|
||||
"Transaction isolation levels."
|
||||
{:none Connection/TRANSACTION_NONE
|
||||
:read-committed Connection/TRANSACTION_READ_COMMITTED
|
||||
:read-uncommitted Connection/TRANSACTION_READ_UNCOMMITTED
|
||||
:repeatable-read Connection/TRANSACTION_REPEATABLE_READ
|
||||
:serializable Connection/TRANSACTION_SERIALIZABLE})
|
||||
|
||||
(defn transact*
|
||||
""
|
||||
[^Connection con f opts]
|
||||
(let [{:keys [isolation read-only? rollback-only?]} opts
|
||||
old-autocommit (.getAutoCommit con)
|
||||
old-isolation (.getTransactionIsolation con)
|
||||
old-readonly (.isReadOnly con)]
|
||||
(io!
|
||||
(when isolation
|
||||
(.setTransactionIsolation con (isolation isolation-levels)))
|
||||
(when read-only?
|
||||
(.setReadOnly con true))
|
||||
(.setAutoCommit con false)
|
||||
(try
|
||||
(let [result (f con)]
|
||||
(if rollback-only?
|
||||
(.rollback con)
|
||||
(.commit con))
|
||||
result)
|
||||
(catch Throwable t
|
||||
(try
|
||||
(.rollback con)
|
||||
(catch Throwable rb
|
||||
;; combine both exceptions
|
||||
(throw (ex-info (str "Rollback failed handling \""
|
||||
(.getMessage t)
|
||||
"\"")
|
||||
{:rollback rb
|
||||
:handling t}))))
|
||||
(throw t))
|
||||
(finally ; tear down
|
||||
;; the following can throw SQLExceptions but we do not
|
||||
;; want those to replace any exception currently being
|
||||
;; handled -- and if the connection got closed, we just
|
||||
;; want to ignore exceptions here anyway
|
||||
(try
|
||||
(.setAutoCommit con old-autocommit)
|
||||
(catch Exception _))
|
||||
(when isolation
|
||||
(try
|
||||
(.setTransactionIsolation con old-isolation)
|
||||
(catch Exception _)))
|
||||
(when read-only?
|
||||
(try
|
||||
(.setReadOnly con old-readonly)
|
||||
(catch Exception _))))))))
|
||||
|
||||
(extend-protocol Transactable
|
||||
Connection
|
||||
(-transact [this body-fn opts]
|
||||
(transact* this body-fn opts))
|
||||
DataSource
|
||||
(-transact [this body-fn opts]
|
||||
(with-open [con (get-connection this opts)]
|
||||
(transact* con body-fn opts)))
|
||||
Object
|
||||
(-transact [this body-fn opts]
|
||||
(-transact (get-datasource this) body-fn opts)))
|
||||
|
||||
(defmacro with-transaction
|
||||
[[sym connectable opts] & body]
|
||||
`(-transact ~connectable (fn [~sym] ~@body) ~opts))
|
||||
|
||||
(def ^:private classnames
|
||||
"Map of subprotocols to classnames. dbtype specifies one of these keys.
|
||||
|
||||
The subprotocols map below provides aliases for dbtype.
|
||||
|
||||
Most databases have just a single class name for their driver but we
|
||||
support a sequence of class names to try in order to allow for drivers
|
||||
that change their names over time (e.g., MySQL)."
|
||||
{"derby" "org.apache.derby.jdbc.EmbeddedDriver"
|
||||
"h2" "org.h2.Driver"
|
||||
"h2:mem" "org.h2.Driver"
|
||||
"hsqldb" "org.hsqldb.jdbcDriver"
|
||||
"jtds:sqlserver" "net.sourceforge.jtds.jdbc.Driver"
|
||||
"mysql" ["com.mysql.cj.jdbc.Driver"
|
||||
"com.mysql.jdbc.Driver"]
|
||||
"oracle:oci" "oracle.jdbc.OracleDriver"
|
||||
"oracle:thin" "oracle.jdbc.OracleDriver"
|
||||
"postgresql" "org.postgresql.Driver"
|
||||
"pgsql" "com.impossibl.postgres.jdbc.PGDriver"
|
||||
"redshift" "com.amazon.redshift.jdbc.Driver"
|
||||
"sqlite" "org.sqlite.JDBC"
|
||||
"sqlserver" "com.microsoft.sqlserver.jdbc.SQLServerDriver"})
|
||||
|
||||
(def ^:private aliases
|
||||
"Map of schemes to subprotocols. Used to provide aliases for dbtype."
|
||||
{"hsql" "hsqldb"
|
||||
"jtds" "jtds:sqlserver"
|
||||
"mssql" "sqlserver"
|
||||
"oracle" "oracle:thin"
|
||||
"oracle:sid" "oracle:thin"
|
||||
"postgres" "postgresql"})
|
||||
|
||||
(def ^:private host-prefixes
|
||||
"Map of subprotocols to non-standard host-prefixes.
|
||||
Anything not listed is assumed to use //."
|
||||
{"oracle:oci" "@"
|
||||
"oracle:thin" "@"})
|
||||
|
||||
(def ^:private ports
|
||||
"Map of subprotocols to ports."
|
||||
{"jtds:sqlserver" 1433
|
||||
"mysql" 3306
|
||||
"oracle:oci" 1521
|
||||
"oracle:sid" 1521
|
||||
"oracle:thin" 1521
|
||||
"postgresql" 5432
|
||||
"sqlserver" 1433})
|
||||
|
||||
(def ^:private dbname-separators
|
||||
"Map of schemes to separators. The default is / but a couple are different."
|
||||
{"mssql" ";DATABASENAME="
|
||||
"sqlserver" ";DATABASENAME="
|
||||
"oracle:sid" ":"})
|
||||
|
||||
(defn- ^Properties as-properties
|
||||
"Convert any seq of pairs to a java.utils.Properties instance.
|
||||
Uses as-sql-name to convert both keys and values into strings."
|
||||
[m]
|
||||
(let [p (Properties.)]
|
||||
(doseq [[k v] m]
|
||||
(.setProperty p (name k) (str v)))
|
||||
p))
|
||||
|
||||
(defn- get-driver-connection
|
||||
"Common logic for loading the DriverManager and the designed JDBC driver
|
||||
class and obtaining the appropriate Connection object."
|
||||
[url etc]
|
||||
;; force DriverManager to be loaded
|
||||
(DriverManager/getLoginTimeout)
|
||||
(DriverManager/getConnection url (as-properties etc)))
|
||||
|
||||
(defn- spec->url+etc
|
||||
""
|
||||
[{:keys [dbtype dbname host port classname] :as db-spec}]
|
||||
(let [;; allow aliases for dbtype
|
||||
subprotocol (aliases dbtype dbtype)
|
||||
host (or host "127.0.0.1")
|
||||
port (or port (ports subprotocol))
|
||||
db-sep (dbname-separators dbtype "/")
|
||||
url (cond (= "h2:mem" dbtype)
|
||||
(str "jdbc:" subprotocol ":" dbname ";DB_CLOSE_DELAY=-1")
|
||||
(#{"derby" "h2" "hsqldb" "sqlite"} subprotocol)
|
||||
(str "jdbc:" subprotocol ":" dbname)
|
||||
:else
|
||||
(str "jdbc:" subprotocol ":"
|
||||
(host-prefixes subprotocol "//")
|
||||
host
|
||||
(when port (str ":" port))
|
||||
db-sep dbname))
|
||||
etc (dissoc db-spec :dbtype :dbname)]
|
||||
;; verify the datasource is loadable
|
||||
(if-let [class-name (or classname (classnames subprotocol))]
|
||||
(do
|
||||
(if (string? class-name)
|
||||
(clojure.lang.RT/loadClassForName class-name)
|
||||
(loop [[clazz & more] class-name]
|
||||
(when-let [load-failure
|
||||
(try
|
||||
(clojure.lang.RT/loadClassForName clazz)
|
||||
nil
|
||||
(catch Exception e
|
||||
e))]
|
||||
(if (seq more)
|
||||
(recur more)
|
||||
(throw load-failure))))))
|
||||
(throw (ex-info (str "Unknown dbtype: " dbtype) db-spec)))
|
||||
[url etc]))
|
||||
|
||||
(defn- string->url+etc
|
||||
""
|
||||
[s]
|
||||
[s {}])
|
||||
|
||||
(defn- url+etc->datasource
|
||||
""
|
||||
[[url etc]]
|
||||
(reify DataSource
|
||||
(getConnection [_]
|
||||
(get-driver-connection url etc))
|
||||
(getConnection [_ username password]
|
||||
(get-driver-connection url
|
||||
(assoc etc
|
||||
:username username
|
||||
:password password)))))
|
||||
|
||||
(defn- make-connection
|
||||
"Given a DataSource and a map of options, get a connection and update it
|
||||
as specified by the options."
|
||||
^Connection
|
||||
[^DataSource datasource opts]
|
||||
(let [^Connection connection (.getConnection datasource)]
|
||||
(when (contains? opts :auto-commit?)
|
||||
(.setAutoCommit connection (boolean (:auto-commit? opts))))
|
||||
(when (contains? opts :read-only?)
|
||||
(.setReadOnly connection (boolean (:read-only? opts))))
|
||||
connection))
|
||||
|
||||
(extend-protocol Sourceable
|
||||
clojure.lang.Associative
|
||||
(get-datasource [this]
|
||||
(url+etc->datasource (spec->url+etc this)))
|
||||
DataSource
|
||||
(get-datasource [this] this)
|
||||
String
|
||||
(get-datasource [this]
|
||||
(url+etc->datasource (string->url+etc this))))
|
||||
|
||||
(extend-protocol Connectable
|
||||
DataSource
|
||||
(get-connection [this opts] (make-connection this opts))
|
||||
Object
|
||||
(get-connection [this opts] (get-connection (get-datasource this) opts)))
|
||||
|
||||
(extend-protocol Preparable
|
||||
Connection
|
||||
(prepare [this sql-params opts]
|
||||
(let [[sql & params] sql-params
|
||||
factory (pre-prepare* opts)]
|
||||
(set-parameters (factory this sql) params))))
|
||||
|
||||
(defn- get-column-names
|
||||
""
|
||||
[^ResultSet rs opts]
|
||||
(let [^ResultSetMetaData rsmeta (.getMetaData rs)
|
||||
idxs (range 1 (inc (.getColumnCount rsmeta)))]
|
||||
(if-let [ident-fn (:identifiers opts)]
|
||||
(mapv (fn [^Integer i]
|
||||
(keyword (when-let [qualifier (not-empty (.getTableName rsmeta i))]
|
||||
(ident-fn qualifier))
|
||||
(ident-fn (.getColumnLabel rsmeta i))))
|
||||
idxs)
|
||||
(mapv (fn [^Integer i]
|
||||
(keyword (not-empty (.getTableName rsmeta i))
|
||||
(.getColumnLabel rsmeta i)))
|
||||
idxs))))
|
||||
|
||||
(defn- mapify-result-set
|
||||
"Given a result set, return an object that wraps the current row as a hash
|
||||
map. Note that a result set is mutable and the current row will change behind
|
||||
this wrapper so operations need to be eager (and fairly limited).
|
||||
|
||||
Supports ILookup (keywords are treated as strings).
|
||||
|
||||
Supports Associative (again, keywords are treated as strings). If you assoc,
|
||||
a full row will be realized (via seq/into).
|
||||
|
||||
Supports Seqable which realizes a full row of the data."
|
||||
[^ResultSet rs opts]
|
||||
(let [cols (delay (get-column-names rs opts))]
|
||||
(reify
|
||||
|
||||
clojure.lang.ILookup
|
||||
(valAt [this k]
|
||||
(try
|
||||
(.getObject rs (name k))
|
||||
(catch SQLException _)))
|
||||
(valAt [this k not-found]
|
||||
(try
|
||||
(.getObject rs (name k))
|
||||
(catch SQLException _
|
||||
not-found)))
|
||||
|
||||
clojure.lang.Associative
|
||||
(containsKey [this k]
|
||||
(try
|
||||
(.getObject rs (name k))
|
||||
true
|
||||
(catch SQLException _
|
||||
false)))
|
||||
(entryAt [this k]
|
||||
(try
|
||||
(clojure.lang.MapEntry. k (.getObject rs (name k)))
|
||||
(catch SQLException _)))
|
||||
(assoc [this k v]
|
||||
(assoc (into {} (seq this)) k v))
|
||||
|
||||
clojure.lang.Seqable
|
||||
(seq [this]
|
||||
(seq (mapv (fn [^Integer i]
|
||||
(clojure.lang.MapEntry. (nth @cols (dec i))
|
||||
(.getObject rs i)))
|
||||
(range 1 (inc (count @cols)))))))))
|
||||
|
||||
(defn- reduce-stmt
|
||||
""
|
||||
[^PreparedStatement stmt f init opts]
|
||||
(if-let [^ResultSet rs (if (.execute stmt)
|
||||
(.getResultSet stmt)
|
||||
(when (:return-keys opts)
|
||||
(try
|
||||
(.getGeneratedKeys stmt)
|
||||
(catch Exception _))))]
|
||||
(let [rs-map (mapify-result-set rs opts)]
|
||||
(loop [init' init]
|
||||
(if (.next rs)
|
||||
(let [result (f init' rs-map)]
|
||||
(if (reduced? result)
|
||||
@result
|
||||
(recur result)))
|
||||
init')))
|
||||
(f init {::update-count (.getUpdateCount stmt)})))
|
||||
|
||||
(extend-protocol Executable
|
||||
Connection
|
||||
(-execute [this sql-params opts]
|
||||
(let [factory (pre-prepare* opts)]
|
||||
(reify clojure.lang.IReduceInit
|
||||
(reduce [_ f init]
|
||||
(with-open [stmt (prepare-fn* this
|
||||
(first sql-params)
|
||||
(rest sql-params)
|
||||
factory)]
|
||||
(reduce-stmt stmt f init opts))))))
|
||||
DataSource
|
||||
(-execute [this sql-params opts]
|
||||
(let [factory (pre-prepare* opts)]
|
||||
(reify clojure.lang.IReduceInit
|
||||
(reduce [_ f init]
|
||||
(with-open [con (get-connection this opts)]
|
||||
(with-open [stmt (prepare-fn* con
|
||||
(first sql-params)
|
||||
(rest sql-params)
|
||||
factory)]
|
||||
(reduce-stmt stmt f init opts)))))))
|
||||
PreparedStatement
|
||||
(-execute [this _ opts]
|
||||
(reify clojure.lang.IReduceInit
|
||||
;; we can't tell if this PreparedStatement will return generated
|
||||
;; keys so we pass a truthy value to at least attempt it if we
|
||||
;; do not get a ResultSet back from the execute call
|
||||
(reduce [_ f init]
|
||||
(reduce-stmt this f init (assoc opts :return-keys true)))))
|
||||
Object
|
||||
(-execute [this sql-params opts]
|
||||
(-execute (get-datasource this) sql-params opts)))
|
||||
(defn prepare [spec sql-params opts] (p/prepare spec sql-params opts))
|
||||
|
||||
(defn reducible!
|
||||
"General SQL execution function.
|
||||
|
||||
Returns a reducible that, when reduced, runs the SQL and yields the result."
|
||||
([stmt] (-execute stmt [] {}))
|
||||
([stmt] (p/-execute stmt [] {}))
|
||||
([connectable sql-params & [opts]]
|
||||
(-execute connectable sql-params opts)))
|
||||
|
||||
(defn- into-map [row] (into {} row))
|
||||
(p/-execute connectable sql-params opts)))
|
||||
|
||||
(defn execute!
|
||||
""
|
||||
([connectable sql-params] (execute! connectable sql-params {}))
|
||||
([stmt]
|
||||
(rs/execute! stmt [] {}))
|
||||
([connectable sql-params]
|
||||
(rs/execute! connectable sql-params {}))
|
||||
([connectable sql-params opts]
|
||||
(into []
|
||||
(map (:row-fn opts into-map))
|
||||
(reducible! connectable sql-params opts))))
|
||||
(rs/execute! connectable sql-params opts)))
|
||||
|
||||
(defn execute-one!
|
||||
""
|
||||
([connectable sql-params] (execute-one! connectable sql-params {}))
|
||||
([stmt]
|
||||
(rs/execute-one! stmt [] {}))
|
||||
([connectable sql-params]
|
||||
(rs/execute-one! connectable sql-params {}))
|
||||
([connectable sql-params opts]
|
||||
(reduce (fn [_ row] (reduced ((:row-fn opts into-map) row)))
|
||||
nil
|
||||
(reducible! connectable sql-params opts))))
|
||||
(rs/execute-one! connectable sql-params opts)))
|
||||
|
||||
(defmacro with-transaction
|
||||
[[sym connectable opts] & body]
|
||||
`(p/-transact ~connectable (fn [~sym] ~@body) ~opts))
|
||||
|
||||
(comment
|
||||
(def db-spec {:dbtype "h2:mem" :dbname "perf"})
|
||||
|
|
@ -511,9 +58,9 @@
|
|||
(execute! con ["DROP TABLE fruit"])
|
||||
;; h2
|
||||
(execute! con ["CREATE TABLE fruit (id int default 0, name varchar(32) primary key, appearance varchar(32), cost int, grade real)"])
|
||||
(execute! con ["INSERT INTO fruit (id,name,appearance,cost,grade) VALUES (1,'Apple','red',59,87), (2,'Banana','yellow',29,92.2), (3,'Peach','fuzzy',139,90.0), (4,'Orange','juicy',89,88.6)"])
|
||||
;; mysql
|
||||
(execute! con ["CREATE TABLE fruit (id int auto_increment, name varchar(32), appearance varchar(32), cost int, grade real, primary key (id))"])
|
||||
(execute! con ["INSERT INTO fruit (id,name,appearance,cost,grade) VALUES (1,'Apple','red',59,87), (2,'Banana','yellow',29,92.2), (3,'Peach','fuzzy',139,90.0), (4,'Orange','juicy',89,88.6)"])
|
||||
(execute! con ["INSERT INTO fruit (id,name,appearance,cost,grade) VALUES (1,'Apple','red',59,87), (2,'Banana','yellow',29,92.2), (3,'Peach','fuzzy',139,90.0), (4,'Orange','juicy',89,88.6)"]
|
||||
{:return-keys true})
|
||||
|
||||
|
|
@ -525,7 +72,7 @@
|
|||
(quick-bench (reduce + (take 10e6 (range))))
|
||||
|
||||
;; raw java
|
||||
(defn select* [^Connection con]
|
||||
(defn select* [^java.sql.Connection con]
|
||||
(let [ps (doto (.prepareStatement con "SELECT * FROM fruit WHERE appearance = ?")
|
||||
(.setObject 1 "red"))
|
||||
rs (.executeQuery ps)
|
||||
|
|
@ -561,17 +108,17 @@
|
|||
(quick-bench
|
||||
[(reduce (fn [_ row] (reduced (:name row)))
|
||||
nil
|
||||
(reducible! (set-parameters ps ["red"])))]))
|
||||
(reducible! (prepare/set-parameters ps ["red"])))]))
|
||||
|
||||
;; this takes more than twice the time of the one above which seems strange
|
||||
(with-open [ps (prepare con ["select * from fruit where appearance = ?"] {})]
|
||||
(quick-bench
|
||||
[(reduce (fn [_ row] (reduced (:name row)))
|
||||
nil
|
||||
(reducible! (set-parameters ps ["red"])))
|
||||
(reducible! (prepare/set-parameters ps ["red"])))
|
||||
(reduce (fn [_ row] (reduced (:name row)))
|
||||
nil
|
||||
(reducible! (set-parameters ps ["fuzzy"])))]))
|
||||
(reducible! (prepare/set-parameters ps ["fuzzy"])))]))
|
||||
|
||||
;; full first row
|
||||
(quick-bench
|
||||
|
|
@ -590,4 +137,6 @@
|
|||
(with-transaction [t con {:rollback-only? true}]
|
||||
(execute! t ["INSERT INTO fruit (id,name,appearance,cost,grade) VALUES (5,'Pear','green',49,47)"])
|
||||
(execute! t ["select * from fruit where name = ?" "Pear"]))
|
||||
(execute! con ["select * from fruit where name = ?" "Pear"]))
|
||||
(execute! con ["select * from fruit where name = ?" "Pear"])
|
||||
|
||||
(execute! con ["select * from membership"]))
|
||||
|
|
|
|||
163
src/next/jdbc/connection.clj
Normal file
163
src/next/jdbc/connection.clj
Normal file
|
|
@ -0,0 +1,163 @@
|
|||
;; copyright (c) 2018-2019 Sean Corfield, all rights reserved
|
||||
|
||||
(ns next.jdbc.connection
|
||||
""
|
||||
(:require [next.jdbc.protocols :as p])
|
||||
(:import (java.sql Connection DriverManager)
|
||||
(javax.sql DataSource)
|
||||
(java.util Properties)))
|
||||
|
||||
(set! *warn-on-reflection* true)
|
||||
|
||||
(def ^:private classnames
|
||||
"Map of subprotocols to classnames. dbtype specifies one of these keys.
|
||||
|
||||
The subprotocols map below provides aliases for dbtype.
|
||||
|
||||
Most databases have just a single class name for their driver but we
|
||||
support a sequence of class names to try in order to allow for drivers
|
||||
that change their names over time (e.g., MySQL)."
|
||||
{"derby" "org.apache.derby.jdbc.EmbeddedDriver"
|
||||
"h2" "org.h2.Driver"
|
||||
"h2:mem" "org.h2.Driver"
|
||||
"hsqldb" "org.hsqldb.jdbcDriver"
|
||||
"jtds:sqlserver" "net.sourceforge.jtds.jdbc.Driver"
|
||||
"mysql" ["com.mysql.cj.jdbc.Driver"
|
||||
"com.mysql.jdbc.Driver"]
|
||||
"oracle:oci" "oracle.jdbc.OracleDriver"
|
||||
"oracle:thin" "oracle.jdbc.OracleDriver"
|
||||
"postgresql" "org.postgresql.Driver"
|
||||
"pgsql" "com.impossibl.postgres.jdbc.PGDriver"
|
||||
"redshift" "com.amazon.redshift.jdbc.Driver"
|
||||
"sqlite" "org.sqlite.JDBC"
|
||||
"sqlserver" "com.microsoft.sqlserver.jdbc.SQLServerDriver"})
|
||||
|
||||
(def ^:private aliases
|
||||
"Map of schemes to subprotocols. Used to provide aliases for dbtype."
|
||||
{"hsql" "hsqldb"
|
||||
"jtds" "jtds:sqlserver"
|
||||
"mssql" "sqlserver"
|
||||
"oracle" "oracle:thin"
|
||||
"oracle:sid" "oracle:thin"
|
||||
"postgres" "postgresql"})
|
||||
|
||||
(def ^:private host-prefixes
|
||||
"Map of subprotocols to non-standard host-prefixes.
|
||||
Anything not listed is assumed to use //."
|
||||
{"oracle:oci" "@"
|
||||
"oracle:thin" "@"})
|
||||
|
||||
(def ^:private ports
|
||||
"Map of subprotocols to ports."
|
||||
{"jtds:sqlserver" 1433
|
||||
"mysql" 3306
|
||||
"oracle:oci" 1521
|
||||
"oracle:sid" 1521
|
||||
"oracle:thin" 1521
|
||||
"postgresql" 5432
|
||||
"sqlserver" 1433})
|
||||
|
||||
(def ^:private dbname-separators
|
||||
"Map of schemes to separators. The default is / but a couple are different."
|
||||
{"mssql" ";DATABASENAME="
|
||||
"sqlserver" ";DATABASENAME="
|
||||
"oracle:sid" ":"})
|
||||
|
||||
(defn- ^Properties as-properties
|
||||
"Convert any seq of pairs to a java.utils.Properties instance.
|
||||
Uses as-sql-name to convert both keys and values into strings."
|
||||
[m]
|
||||
(let [p (Properties.)]
|
||||
(doseq [[k v] m]
|
||||
(.setProperty p (name k) (str v)))
|
||||
p))
|
||||
|
||||
(defn- get-driver-connection
|
||||
"Common logic for loading the DriverManager and the designed JDBC driver
|
||||
class and obtaining the appropriate Connection object."
|
||||
[url etc]
|
||||
;; force DriverManager to be loaded
|
||||
(DriverManager/getLoginTimeout)
|
||||
(DriverManager/getConnection url (as-properties etc)))
|
||||
|
||||
(defn- spec->url+etc
|
||||
""
|
||||
[{:keys [dbtype dbname host port classname] :as db-spec}]
|
||||
(let [;; allow aliases for dbtype
|
||||
subprotocol (aliases dbtype dbtype)
|
||||
host (or host "127.0.0.1")
|
||||
port (or port (ports subprotocol))
|
||||
db-sep (dbname-separators dbtype "/")
|
||||
url (cond (= "h2:mem" dbtype)
|
||||
(str "jdbc:" subprotocol ":" dbname ";DB_CLOSE_DELAY=-1")
|
||||
(#{"derby" "h2" "hsqldb" "sqlite"} subprotocol)
|
||||
(str "jdbc:" subprotocol ":" dbname)
|
||||
:else
|
||||
(str "jdbc:" subprotocol ":"
|
||||
(host-prefixes subprotocol "//")
|
||||
host
|
||||
(when port (str ":" port))
|
||||
db-sep dbname))
|
||||
etc (dissoc db-spec :dbtype :dbname)]
|
||||
;; verify the datasource is loadable
|
||||
(if-let [class-name (or classname (classnames subprotocol))]
|
||||
(do
|
||||
(if (string? class-name)
|
||||
(clojure.lang.RT/loadClassForName class-name)
|
||||
(loop [[clazz & more] class-name]
|
||||
(when-let [load-failure
|
||||
(try
|
||||
(clojure.lang.RT/loadClassForName clazz)
|
||||
nil
|
||||
(catch Exception e
|
||||
e))]
|
||||
(if (seq more)
|
||||
(recur more)
|
||||
(throw load-failure))))))
|
||||
(throw (ex-info (str "Unknown dbtype: " dbtype) db-spec)))
|
||||
[url etc]))
|
||||
|
||||
(defn- string->url+etc
|
||||
""
|
||||
[s]
|
||||
[s {}])
|
||||
|
||||
(defn- url+etc->datasource
|
||||
""
|
||||
[[url etc]]
|
||||
(reify DataSource
|
||||
(getConnection [_]
|
||||
(get-driver-connection url etc))
|
||||
(getConnection [_ username password]
|
||||
(get-driver-connection url
|
||||
(assoc etc
|
||||
:username username
|
||||
:password password)))))
|
||||
|
||||
(defn- make-connection
|
||||
"Given a DataSource and a map of options, get a connection and update it
|
||||
as specified by the options."
|
||||
^Connection
|
||||
[^DataSource datasource opts]
|
||||
(let [^Connection connection (.getConnection datasource)]
|
||||
(when (contains? opts :auto-commit?)
|
||||
(.setAutoCommit connection (boolean (:auto-commit? opts))))
|
||||
(when (contains? opts :read-only?)
|
||||
(.setReadOnly connection (boolean (:read-only? opts))))
|
||||
connection))
|
||||
|
||||
(extend-protocol p/Sourceable
|
||||
clojure.lang.Associative
|
||||
(get-datasource [this]
|
||||
(url+etc->datasource (spec->url+etc this)))
|
||||
javax.sql.DataSource
|
||||
(get-datasource [this] this)
|
||||
String
|
||||
(get-datasource [this]
|
||||
(url+etc->datasource (string->url+etc this))))
|
||||
|
||||
(extend-protocol p/Connectable
|
||||
javax.sql.DataSource
|
||||
(get-connection [this opts] (make-connection this opts))
|
||||
Object
|
||||
(get-connection [this opts] (p/get-connection (p/get-datasource this) opts)))
|
||||
121
src/next/jdbc/prepare.clj
Normal file
121
src/next/jdbc/prepare.clj
Normal file
|
|
@ -0,0 +1,121 @@
|
|||
;; copyright (c) 2018-2019 Sean Corfield, all rights reserved
|
||||
|
||||
(ns next.jdbc.prepare
|
||||
""
|
||||
(:require [next.jdbc.protocols :as p])
|
||||
(:import (java.sql Connection
|
||||
PreparedStatement
|
||||
ResultSet
|
||||
Statement)))
|
||||
|
||||
(set! *warn-on-reflection* true)
|
||||
|
||||
(defn set-parameters
|
||||
""
|
||||
^java.sql.PreparedStatement
|
||||
[^PreparedStatement ps params]
|
||||
(when (seq params)
|
||||
(loop [[p & more] params i 1]
|
||||
(.setObject ps i p)
|
||||
(when more
|
||||
(recur more (inc i)))))
|
||||
ps)
|
||||
|
||||
(def ^{:private true
|
||||
:doc "Map friendly :concurrency values to ResultSet constants."}
|
||||
result-set-concurrency
|
||||
{:read-only ResultSet/CONCUR_READ_ONLY
|
||||
:updatable ResultSet/CONCUR_UPDATABLE})
|
||||
|
||||
(def ^{:private true
|
||||
:doc "Map friendly :cursors values to ResultSet constants."}
|
||||
result-set-holdability
|
||||
{:hold ResultSet/HOLD_CURSORS_OVER_COMMIT
|
||||
:close ResultSet/CLOSE_CURSORS_AT_COMMIT})
|
||||
|
||||
(def ^{:private true
|
||||
:doc "Map friendly :type values to ResultSet constants."}
|
||||
result-set-type
|
||||
{:forward-only ResultSet/TYPE_FORWARD_ONLY
|
||||
:scroll-insensitive ResultSet/TYPE_SCROLL_INSENSITIVE
|
||||
:scroll-sensitive ResultSet/TYPE_SCROLL_SENSITIVE})
|
||||
|
||||
(defn- ^{:tag (class (into-array String []))} string-array
|
||||
[return-keys]
|
||||
(into-array String return-keys))
|
||||
|
||||
(defn pre-prepare*
|
||||
"Given a some options, return a statement factory -- a function that will
|
||||
accept a connection and a SQL string and parameters, and return a
|
||||
PreparedStatement representing that."
|
||||
[{:keys [return-keys result-type concurrency cursors
|
||||
fetch-size max-rows timeout]}]
|
||||
(cond->
|
||||
(cond
|
||||
return-keys
|
||||
(do
|
||||
(when (or result-type concurrency cursors)
|
||||
(throw (IllegalArgumentException.
|
||||
(str ":concurrency, :cursors, and :result-type "
|
||||
"may not be specified with :return-keys."))))
|
||||
(if (vector? return-keys)
|
||||
(let [key-names (string-array return-keys)]
|
||||
(fn [^Connection con ^String sql]
|
||||
(try
|
||||
(try
|
||||
(.prepareStatement con sql key-names)
|
||||
(catch Exception _
|
||||
;; assume it is unsupported and try regular generated keys:
|
||||
(.prepareStatement con sql Statement/RETURN_GENERATED_KEYS)))
|
||||
(catch Exception _
|
||||
;; assume it is unsupported and try basic PreparedStatement:
|
||||
(.prepareStatement con sql)))))
|
||||
(fn [^Connection con ^String sql]
|
||||
(try
|
||||
(.prepareStatement con sql Statement/RETURN_GENERATED_KEYS)
|
||||
(catch Exception _
|
||||
;; assume it is unsupported and try basic PreparedStatement:
|
||||
(.prepareStatement con sql))))))
|
||||
|
||||
(and result-type concurrency)
|
||||
(if cursors
|
||||
(fn [^Connection con ^String sql]
|
||||
(.prepareStatement con sql
|
||||
(get result-set-type result-type result-type)
|
||||
(get result-set-concurrency concurrency concurrency)
|
||||
(get result-set-holdability cursors cursors)))
|
||||
(fn [^Connection con ^String sql]
|
||||
(.prepareStatement con sql
|
||||
(get result-set-type result-type result-type)
|
||||
(get result-set-concurrency concurrency concurrency))))
|
||||
|
||||
(or result-type concurrency cursors)
|
||||
(throw (IllegalArgumentException.
|
||||
(str ":concurrency, :cursors, and :result-type "
|
||||
"may not be specified independently.")))
|
||||
:else
|
||||
(fn [^Connection con ^String sql]
|
||||
(.prepareStatement con sql)))
|
||||
fetch-size (as-> f
|
||||
(fn [^Connection con ^String sql]
|
||||
(.setFetchSize ^PreparedStatement (f con sql) fetch-size)))
|
||||
max-rows (as-> f
|
||||
(fn [^Connection con ^String sql]
|
||||
(.setMaxRows ^PreparedStatement (f con sql) max-rows)))
|
||||
timeout (as-> f
|
||||
(fn [^Connection con ^String sql]
|
||||
(.setQueryTimeout ^PreparedStatement (f con sql) timeout)))))
|
||||
|
||||
(defn prepare-fn*
|
||||
"Given a connection, a SQL statement, its parameters, and a statement factory,
|
||||
return a PreparedStatement representing that."
|
||||
^java.sql.PreparedStatement
|
||||
[con sql params factory]
|
||||
(set-parameters (factory con sql) params))
|
||||
|
||||
(extend-protocol p/Preparable
|
||||
java.sql.Connection
|
||||
(prepare [this sql-params opts]
|
||||
(let [[sql & params] sql-params
|
||||
factory (pre-prepare* opts)]
|
||||
(set-parameters (factory this sql) params))))
|
||||
17
src/next/jdbc/protocols.clj
Normal file
17
src/next/jdbc/protocols.clj
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
;; copyright (c) 2018-2019 Sean Corfield, all rights reserved
|
||||
|
||||
(ns next.jdbc.protocols
|
||||
"")
|
||||
|
||||
(set! *warn-on-reflection* true)
|
||||
|
||||
(defprotocol Sourceable
|
||||
(get-datasource ^javax.sql.DataSource [this]))
|
||||
(defprotocol Connectable
|
||||
(get-connection ^java.lang.AutoCloseable [this opts]))
|
||||
(defprotocol Executable
|
||||
(-execute ^clojure.lang.IReduceInit [this sql-params opts]))
|
||||
(defprotocol Preparable
|
||||
(prepare ^java.sql.PreparedStatement [this sql-params opts]))
|
||||
(defprotocol Transactable
|
||||
(-transact [this body-fn opts]))
|
||||
189
src/next/jdbc/result_set.clj
Normal file
189
src/next/jdbc/result_set.clj
Normal file
|
|
@ -0,0 +1,189 @@
|
|||
;; copyright (c) 2018-2019 Sean Corfield, all rights reserved
|
||||
|
||||
(ns next.jdbc.result-set
|
||||
""
|
||||
(:require [clojure.core.protocols :as core-p]
|
||||
[next.jdbc.prepare :as prepare]
|
||||
[next.jdbc.protocols :as p])
|
||||
(:import (java.sql PreparedStatement
|
||||
ResultSet ResultSetMetaData
|
||||
SQLException)))
|
||||
|
||||
(set! *warn-on-reflection* true)
|
||||
|
||||
(defn- get-column-names
|
||||
""
|
||||
[^ResultSet rs opts]
|
||||
(let [^ResultSetMetaData rsmeta (.getMetaData rs)
|
||||
idxs (range 1 (inc (.getColumnCount rsmeta)))]
|
||||
(if-let [ident-fn (:identifiers opts)]
|
||||
(mapv (fn [^Integer i]
|
||||
(keyword (when-let [qualifier (not-empty (.getTableName rsmeta i))]
|
||||
(ident-fn qualifier))
|
||||
(ident-fn (.getColumnLabel rsmeta i))))
|
||||
idxs)
|
||||
(mapv (fn [^Integer i]
|
||||
(keyword (not-empty (.getTableName rsmeta i))
|
||||
(.getColumnLabel rsmeta i)))
|
||||
idxs))))
|
||||
|
||||
(defn- mapify-result-set
|
||||
"Given a result set, return an object that wraps the current row as a hash
|
||||
map. Note that a result set is mutable and the current row will change behind
|
||||
this wrapper so operations need to be eager (and fairly limited).
|
||||
|
||||
Supports ILookup (keywords are treated as strings).
|
||||
|
||||
Supports Associative (again, keywords are treated as strings). If you assoc,
|
||||
a full row will be realized (via seq/into).
|
||||
|
||||
Supports Seqable which realizes a full row of the data."
|
||||
[^ResultSet rs opts]
|
||||
(let [cols (delay (get-column-names rs opts))]
|
||||
(reify
|
||||
|
||||
clojure.lang.ILookup
|
||||
(valAt [this k]
|
||||
(try
|
||||
(.getObject rs (name k))
|
||||
(catch SQLException _)))
|
||||
(valAt [this k not-found]
|
||||
(try
|
||||
(.getObject rs (name k))
|
||||
(catch SQLException _
|
||||
not-found)))
|
||||
|
||||
clojure.lang.Associative
|
||||
(containsKey [this k]
|
||||
(try
|
||||
(.getObject rs (name k))
|
||||
true
|
||||
(catch SQLException _
|
||||
false)))
|
||||
(entryAt [this k]
|
||||
(try
|
||||
(clojure.lang.MapEntry. k (.getObject rs (name k)))
|
||||
(catch SQLException _)))
|
||||
(assoc [this k v]
|
||||
(assoc (into {} (seq this)) k v))
|
||||
|
||||
clojure.lang.Seqable
|
||||
(seq [this]
|
||||
(seq (mapv (fn [^Integer i]
|
||||
(clojure.lang.MapEntry. (nth @cols (dec i))
|
||||
(.getObject rs i)))
|
||||
(range 1 (inc (count @cols)))))))))
|
||||
|
||||
(defn- reduce-stmt
|
||||
""
|
||||
[^PreparedStatement stmt f init opts]
|
||||
(if-let [^ResultSet rs (if (.execute stmt)
|
||||
(.getResultSet stmt)
|
||||
(when (:return-keys opts)
|
||||
(try
|
||||
(.getGeneratedKeys stmt)
|
||||
(catch Exception _))))]
|
||||
(let [rs-map (mapify-result-set rs opts)]
|
||||
(loop [init' init]
|
||||
(if (.next rs)
|
||||
(let [result (f init' rs-map)]
|
||||
(if (reduced? result)
|
||||
@result
|
||||
(recur result)))
|
||||
init')))
|
||||
(f init {::update-count (.getUpdateCount stmt)})))
|
||||
|
||||
(extend-protocol p/Executable
|
||||
java.sql.Connection
|
||||
(-execute [this sql-params opts]
|
||||
(let [factory (prepare/pre-prepare* opts)]
|
||||
(reify clojure.lang.IReduceInit
|
||||
(reduce [_ f init]
|
||||
(with-open [stmt (prepare/prepare-fn* this
|
||||
(first sql-params)
|
||||
(rest sql-params)
|
||||
factory)]
|
||||
(reduce-stmt stmt f init opts))))))
|
||||
javax.sql.DataSource
|
||||
(-execute [this sql-params opts]
|
||||
(let [factory (prepare/pre-prepare* opts)]
|
||||
(reify clojure.lang.IReduceInit
|
||||
(reduce [_ f init]
|
||||
(with-open [con (p/get-connection this opts)]
|
||||
(with-open [stmt (prepare/prepare-fn* con
|
||||
(first sql-params)
|
||||
(rest sql-params)
|
||||
factory)]
|
||||
(reduce-stmt stmt f init opts)))))))
|
||||
java.sql.PreparedStatement
|
||||
(-execute [this _ opts]
|
||||
(reify clojure.lang.IReduceInit
|
||||
;; we can't tell if this PreparedStatement will return generated
|
||||
;; keys so we pass a truthy value to at least attempt it if we
|
||||
;; do not get a ResultSet back from the execute call
|
||||
(reduce [_ f init]
|
||||
(reduce-stmt this f init (assoc opts :return-keys true)))))
|
||||
Object
|
||||
(-execute [this sql-params opts]
|
||||
(p/-execute (p/get-datasource this) sql-params opts)))
|
||||
|
||||
(declare navize-row)
|
||||
|
||||
(defn- row-into-map
|
||||
[connectable opts]
|
||||
(fn [row]
|
||||
(into (with-meta {} {`core-p/datafy (navize-row connectable opts)}) row)))
|
||||
|
||||
(defn execute!
|
||||
""
|
||||
[connectable sql-params opts]
|
||||
(into []
|
||||
(map (or (:row-fn opts) (row-into-map connectable opts)))
|
||||
(p/-execute connectable sql-params opts)))
|
||||
|
||||
(defn execute-one!
|
||||
""
|
||||
[connectable sql-params opts]
|
||||
(let [row-fn (or (:row-fn opts) (row-into-map connectable opts))]
|
||||
(reduce (fn [_ row]
|
||||
(reduced (row-fn row)))
|
||||
nil
|
||||
(p/-execute connectable sql-params opts))))
|
||||
|
||||
(defn- default-schema
|
||||
"The default schema lookup rule for column names.
|
||||
|
||||
If a column name ends with _id or id, it is assumed to be a foreign key
|
||||
into the table identified by the first part of the column name."
|
||||
[col]
|
||||
(let [[_ table] (re-find #"(?i)^(.+)_?id$" (name col))]
|
||||
(when table
|
||||
[(keyword table) :id])))
|
||||
|
||||
(defn- navize-row
|
||||
""
|
||||
[connectable opts]
|
||||
(fn [row]
|
||||
(with-meta row
|
||||
{`core-p/nav (fn [coll k v]
|
||||
(let [[table fk cardinality] (or (get-in opts [:schema k])
|
||||
(default-schema k))]
|
||||
(if fk
|
||||
(try
|
||||
(let [entity-fn (:entities opts identity)
|
||||
exec-fn! (if (= :many cardinality)
|
||||
execute!
|
||||
execute-one!)]
|
||||
(exec-fn! connectable
|
||||
[(str "SELECT * FROM "
|
||||
(entity-fn (name table))
|
||||
" WHERE "
|
||||
(entity-fn (name fk))
|
||||
" = ?")
|
||||
v]
|
||||
opts))
|
||||
(catch Exception _
|
||||
;; assume an exception means we just cannot
|
||||
;; navigate anywhere, so return just the value
|
||||
v))
|
||||
v)))})))
|
||||
76
src/next/jdbc/transaction.clj
Normal file
76
src/next/jdbc/transaction.clj
Normal file
|
|
@ -0,0 +1,76 @@
|
|||
;; copyright (c) 2018-2019 Sean Corfield, all rights reserved
|
||||
|
||||
(ns next.jdbc.transaction
|
||||
""
|
||||
(:require [next.jdbc.protocols :as p])
|
||||
(:import (java.sql Connection
|
||||
SQLException)))
|
||||
|
||||
(set! *warn-on-reflection* true)
|
||||
|
||||
(def ^:private isolation-levels
|
||||
"Transaction isolation levels."
|
||||
{:none Connection/TRANSACTION_NONE
|
||||
:read-committed Connection/TRANSACTION_READ_COMMITTED
|
||||
:read-uncommitted Connection/TRANSACTION_READ_UNCOMMITTED
|
||||
:repeatable-read Connection/TRANSACTION_REPEATABLE_READ
|
||||
:serializable Connection/TRANSACTION_SERIALIZABLE})
|
||||
|
||||
(defn- transact*
|
||||
""
|
||||
[^Connection con f opts]
|
||||
(let [{:keys [isolation read-only? rollback-only?]} opts
|
||||
old-autocommit (.getAutoCommit con)
|
||||
old-isolation (.getTransactionIsolation con)
|
||||
old-readonly (.isReadOnly con)]
|
||||
(io!
|
||||
(when isolation
|
||||
(.setTransactionIsolation con (isolation isolation-levels)))
|
||||
(when read-only?
|
||||
(.setReadOnly con true))
|
||||
(.setAutoCommit con false)
|
||||
(try
|
||||
(let [result (f con)]
|
||||
(if rollback-only?
|
||||
(.rollback con)
|
||||
(.commit con))
|
||||
result)
|
||||
(catch Throwable t
|
||||
(try
|
||||
(.rollback con)
|
||||
(catch Throwable rb
|
||||
;; combine both exceptions
|
||||
(throw (ex-info (str "Rollback failed handling \""
|
||||
(.getMessage t)
|
||||
"\"")
|
||||
{:rollback rb
|
||||
:handling t}))))
|
||||
(throw t))
|
||||
(finally ; tear down
|
||||
;; the following can throw SQLExceptions but we do not
|
||||
;; want those to replace any exception currently being
|
||||
;; handled -- and if the connection got closed, we just
|
||||
;; want to ignore exceptions here anyway
|
||||
(try
|
||||
(.setAutoCommit con old-autocommit)
|
||||
(catch Exception _))
|
||||
(when isolation
|
||||
(try
|
||||
(.setTransactionIsolation con old-isolation)
|
||||
(catch Exception _)))
|
||||
(when read-only?
|
||||
(try
|
||||
(.setReadOnly con old-readonly)
|
||||
(catch Exception _))))))))
|
||||
|
||||
(extend-protocol p/Transactable
|
||||
java.sql.Connection
|
||||
(-transact [this body-fn opts]
|
||||
(transact* this body-fn opts))
|
||||
javax.sql.DataSource
|
||||
(-transact [this body-fn opts]
|
||||
(with-open [con (p/get-connection this opts)]
|
||||
(transact* con body-fn opts)))
|
||||
Object
|
||||
(-transact [this body-fn opts]
|
||||
(p/-transact (p/get-datasource this) body-fn opts)))
|
||||
Loading…
Reference in a new issue