Work in progress
Basic machinery is working: spec -> connection -> prepared statement -> execute -> reduce. Everything is still very much in flux at this point.
This commit is contained in:
parent
8a44a8cbca
commit
e7d0065ced
1 changed files with 120 additions and 75 deletions
|
|
@ -2,27 +2,55 @@
|
||||||
|
|
||||||
(ns next.jdbc
|
(ns next.jdbc
|
||||||
""
|
""
|
||||||
(:require [clojure.set :as set])
|
(:require [clojure.set :as set]
|
||||||
(:import (java.sql Connection DriverManager ResultSet ResultSetMetaData
|
[clojure.string :as str])
|
||||||
SQLException)
|
(:import (java.lang AutoCloseable)
|
||||||
|
(java.sql Connection DriverManager
|
||||||
|
PreparedStatement
|
||||||
|
ResultSet ResultSetMetaData
|
||||||
|
SQLException Statement)
|
||||||
(javax.sql DataSource)
|
(javax.sql DataSource)
|
||||||
(java.util Properties)))
|
(java.util Properties)))
|
||||||
|
|
||||||
|
(comment
|
||||||
|
"Key areas of interaction:
|
||||||
|
1a. Making a DataSource -- turn everything connectable into a DataSource
|
||||||
|
1b. Given a DataSource, we can getConnection()
|
||||||
|
2. Preparing a Statement -- connection + SQL + params (+ options)
|
||||||
|
(multiple param groups means addBatch() calls)
|
||||||
|
3. Execute a (Prepared) Statement to produce a ResultSet (or update count)
|
||||||
|
(can execute batch of prepared statements and get multiple results)"
|
||||||
|
|
||||||
|
"Additional areas:
|
||||||
|
1. with-db-connection -- given 'something', get a connection, execute the
|
||||||
|
body, and close the connection (if we opened it).
|
||||||
|
2. with-db-transaction -- given 'something', get a connection, start a
|
||||||
|
transaction, execute the body, commit/rollback, and close the connection
|
||||||
|
(if we opened it else restore connection state)."
|
||||||
|
"Database metadata can tell us:
|
||||||
|
0. If get generated keys is supported!
|
||||||
|
1. If batch updates are supported
|
||||||
|
2. If save points are supported
|
||||||
|
3. If various concurrency/holdability/etc options are supported")
|
||||||
|
|
||||||
(set! *warn-on-reflection* true)
|
(set! *warn-on-reflection* true)
|
||||||
|
|
||||||
(defprotocol Closeable
|
|
||||||
(close [this]))
|
|
||||||
(defprotocol Connectable
|
(defprotocol Connectable
|
||||||
(get-connection [this opts]))
|
(get-connection ^AutoCloseable [this]))
|
||||||
(defprotocol Preparable
|
(defprotocol Preparable
|
||||||
(->jdbc-connection [this])
|
(->jdbc-connection ^Connection [this])
|
||||||
(get-statement [this db-spec sql-params opts]))
|
(prepare ^PreparedStatement [this sql-params opts]))
|
||||||
(defprotocol Transactable
|
(defprotocol Transactable
|
||||||
(transact [this f opts]))
|
(transact [this f opts]))
|
||||||
|
|
||||||
(defn- get-statement*
|
(defn- prepare*
|
||||||
[db-spec con sql-params opts]
|
"Given a connection, a SQL statement, its parameters, and some options,
|
||||||
(.prepareStatement con (first sql-params)))
|
return a PreparedStatement representing that."
|
||||||
|
[^Connection con [sql & params] opts]
|
||||||
|
(let [^PreparedStatement s (.prepareStatement con sql)]
|
||||||
|
(doseq [p params]
|
||||||
|
(.setObject s 1 p))
|
||||||
|
s))
|
||||||
|
|
||||||
(def ^:private isolation-levels
|
(def ^:private isolation-levels
|
||||||
"Transaction isolation levels."
|
"Transaction isolation levels."
|
||||||
|
|
@ -50,14 +78,13 @@
|
||||||
|
|
||||||
(defn- transact*
|
(defn- transact*
|
||||||
""
|
""
|
||||||
[db-spec con transacted f opts]
|
[con transacted f opts]
|
||||||
(let [{:keys [isolation read-only? rollback-only?]}
|
(let [{:keys [isolation read-only? rollback-only?]} opts
|
||||||
(merge (when (map? db-spec) db-spec) opts)
|
|
||||||
committable? (not rollback-only?)]
|
committable? (not rollback-only?)]
|
||||||
(if transacted
|
(if transacted
|
||||||
;; should check isolation level; maybe implement save points?
|
;; should check isolation level; maybe implement save points?
|
||||||
(f con)
|
(f con)
|
||||||
(with-open [t-con (assoc (get-connection con opts)
|
(with-open [^AutoCloseable t-con (assoc (get-connection con)
|
||||||
:transacted (atom committable?))]
|
:transacted (atom committable?))]
|
||||||
(let [^Connection jdbc (->jdbc-connection t-con)
|
(let [^Connection jdbc (->jdbc-connection t-con)
|
||||||
old-autocommit (.getAutoCommit jdbc)
|
old-autocommit (.getAutoCommit jdbc)
|
||||||
|
|
@ -104,33 +131,27 @@
|
||||||
(.setReadOnly jdbc old-readonly)
|
(.setReadOnly jdbc old-readonly)
|
||||||
(catch Exception _)))))))))))
|
(catch Exception _)))))))))))
|
||||||
|
|
||||||
(defrecord NestedConnection [db-spec con transacted opts]
|
(defrecord NestedConnection [con transacted]
|
||||||
Connectable
|
Connectable
|
||||||
(get-connection [this opts']
|
(get-connection [this] (->NestedConnection con transacted))
|
||||||
(->NestedConnection db-spec con transacted (merge opts opts')))
|
|
||||||
Preparable
|
Preparable
|
||||||
(->jdbc-connection [this] con)
|
(->jdbc-connection [this] con)
|
||||||
(get-statement [this db-spec sql-params opts']
|
(prepare [this sql-params opts] (prepare* con sql-params opts))
|
||||||
(get-statement* db-spec con sql-params (merge opts opts')))
|
AutoCloseable
|
||||||
Closeable
|
|
||||||
(close [this])
|
(close [this])
|
||||||
Transactable
|
Transactable
|
||||||
(transact [this f opts']
|
(transact [this f opts] (transact* con transacted f opts)))
|
||||||
(transact* db-spec con transacted f (merge opts opts'))))
|
|
||||||
|
|
||||||
(defrecord Connected [db-spec con transacted opts]
|
(defrecord Connected [con transacted]
|
||||||
Connectable
|
Connectable
|
||||||
(get-connection [this opts']
|
(get-connection [this] (->NestedConnection con transacted))
|
||||||
(->NestedConnection db-spec con transacted (merge opts opts')))
|
|
||||||
Preparable
|
Preparable
|
||||||
(->jdbc-connection [this] con)
|
(->jdbc-connection [this] con)
|
||||||
(get-statement [this db-spec sql-params opts']
|
(prepare [this sql-params opts] (prepare* con sql-params opts))
|
||||||
(get-statement* db-spec con sql-params (merge opts opts')))
|
AutoCloseable
|
||||||
Closeable
|
|
||||||
(close [this] (.close ^Connection con))
|
(close [this] (.close ^Connection con))
|
||||||
Transactable
|
Transactable
|
||||||
(transact [this f opts']
|
(transact [this f opts] (transact* con transacted f opts)))
|
||||||
(transact* db-spec con transacted f (merge opts opts'))))
|
|
||||||
|
|
||||||
(defmacro in-transaction
|
(defmacro in-transaction
|
||||||
[[sym con opts] & body]
|
[[sym con opts] & body]
|
||||||
|
|
@ -213,7 +234,7 @@
|
||||||
(defn- get-driver-connection
|
(defn- get-driver-connection
|
||||||
"Common logic for loading the DriverManager and the designed JDBC driver
|
"Common logic for loading the DriverManager and the designed JDBC driver
|
||||||
class and obtaining the appropriate Connection object."
|
class and obtaining the appropriate Connection object."
|
||||||
[classname subprotocol db-spec url etc opts error-msg]
|
[classname subprotocol db-spec url etc error-msg]
|
||||||
(if-let [class-name (or classname (classnames subprotocol))]
|
(if-let [class-name (or classname (classnames subprotocol))]
|
||||||
(do
|
(do
|
||||||
;; force DriverManager to be loaded
|
;; force DriverManager to be loaded
|
||||||
|
|
@ -232,11 +253,11 @@
|
||||||
(throw load-failure))))))
|
(throw load-failure))))))
|
||||||
(throw (ex-info error-msg db-spec)))
|
(throw (ex-info error-msg db-spec)))
|
||||||
(-> (DriverManager/getConnection url (as-properties etc))
|
(-> (DriverManager/getConnection url (as-properties etc))
|
||||||
(modify-connection opts)))
|
(modify-connection etc)))
|
||||||
|
|
||||||
(defn- spec->connection
|
(defn- spec->connection
|
||||||
""
|
""
|
||||||
[{:keys [dbtype dbname host port classname] :as db-spec} opts]
|
[{:keys [dbtype dbname host port classname] :as db-spec}]
|
||||||
(let [;; allow aliases for dbtype
|
(let [;; allow aliases for dbtype
|
||||||
subprotocol (aliases dbtype dbtype)
|
subprotocol (aliases dbtype dbtype)
|
||||||
host (or host "127.0.0.1")
|
host (or host "127.0.0.1")
|
||||||
|
|
@ -254,7 +275,7 @@
|
||||||
db-sep dbname))
|
db-sep dbname))
|
||||||
etc (dissoc db-spec :dbtype :dbname)]
|
etc (dissoc db-spec :dbtype :dbname)]
|
||||||
(get-driver-connection classname subprotocol db-spec
|
(get-driver-connection classname subprotocol db-spec
|
||||||
url etc opts
|
url etc
|
||||||
(str "Unknown dbtype: " dbtype))))
|
(str "Unknown dbtype: " dbtype))))
|
||||||
|
|
||||||
(defn- string->spec
|
(defn- string->spec
|
||||||
|
|
@ -262,19 +283,20 @@
|
||||||
[s]
|
[s]
|
||||||
{})
|
{})
|
||||||
|
|
||||||
(extend-protocol Connectable
|
(extend-protocol
|
||||||
|
Connectable
|
||||||
clojure.lang.Associative
|
clojure.lang.Associative
|
||||||
(get-connection [this opts]
|
(get-connection [this]
|
||||||
(->Connected this (spec->connection this opts) nil opts))
|
(->Connected (spec->connection this) nil))
|
||||||
Connection
|
Connection
|
||||||
(get-connection [this opts]
|
(get-connection [this]
|
||||||
(->Connected {} this nil opts))
|
(->Connected this nil))
|
||||||
DataSource
|
DataSource
|
||||||
(get-connection [this opts]
|
(get-connection [this]
|
||||||
(->Connected {} (modify-connection (.getConnection this) opts) nil opts))
|
(->Connected (.getConnection this) nil))
|
||||||
String
|
String
|
||||||
(get-connection [this opts]
|
(get-connection [this]
|
||||||
(get-connection (string->spec this) opts)))
|
(get-connection (string->spec this))))
|
||||||
|
|
||||||
(comment
|
(comment
|
||||||
(get-connection {:dbtype "derby" :dbname "clojure_test" :create true} {})
|
(get-connection {:dbtype "derby" :dbname "clojure_test" :create true} {})
|
||||||
|
|
@ -286,20 +308,27 @@
|
||||||
(let [^ResultSetMetaData rsmeta (.getMetaData rs)
|
(let [^ResultSetMetaData rsmeta (.getMetaData rs)
|
||||||
idxs (range 1 (inc (.getColumnCount rsmeta)))]
|
idxs (range 1 (inc (.getColumnCount rsmeta)))]
|
||||||
(mapv (fn [^Integer i]
|
(mapv (fn [^Integer i]
|
||||||
(keyword (.getTableName rsmeta i) (.getColumnLabel rsmeta i)))
|
(keyword (str/lower-case (.getTableName rsmeta i))
|
||||||
|
(str/lower-case (.getColumnLabel rsmeta i))))
|
||||||
idxs)))
|
idxs)))
|
||||||
|
|
||||||
(defn- mapify-result-set
|
(defn- mapify-result-set
|
||||||
"Given a result set, return an object that wraps the current row as a hash
|
"Given a result set, return an object that wraps the current row as a hash
|
||||||
map. Note that a result set is mutable and the current row will change behind
|
map. Note that a result set is mutable and the current row will change behind
|
||||||
this wrapper so operations need to be eager (and fairly limited).
|
this wrapper so operations need to be eager (and fairly limited).
|
||||||
|
|
||||||
Supports ILookup (keywords are treated as strings).
|
Supports ILookup (keywords are treated as strings).
|
||||||
|
|
||||||
Supports Associative for lookup only (again, keywords are treated as strings).
|
Supports Associative for lookup only (again, keywords are treated as strings).
|
||||||
|
|
||||||
|
Supports Seqable which realizes a full row of the data.
|
||||||
|
|
||||||
Later we may realize a new hash map when assoc (and other, future, operations
|
Later we may realize a new hash map when assoc (and other, future, operations
|
||||||
are performed on the result set row)."
|
are performed on the result set row)."
|
||||||
[^ResultSet rs]
|
[^ResultSet rs opts]
|
||||||
(let [cols (delay (get-column-names rs))]
|
(let [cols (delay (get-column-names rs))]
|
||||||
(reify
|
(reify
|
||||||
|
|
||||||
clojure.lang.ILookup
|
clojure.lang.ILookup
|
||||||
(valAt [this k]
|
(valAt [this k]
|
||||||
(try
|
(try
|
||||||
|
|
@ -310,6 +339,7 @@
|
||||||
(.getObject rs (name k))
|
(.getObject rs (name k))
|
||||||
(catch SQLException _
|
(catch SQLException _
|
||||||
not-found)))
|
not-found)))
|
||||||
|
|
||||||
clojure.lang.Associative
|
clojure.lang.Associative
|
||||||
(containsKey [this k]
|
(containsKey [this k]
|
||||||
(try
|
(try
|
||||||
|
|
@ -323,24 +353,26 @@
|
||||||
(catch SQLException _)))
|
(catch SQLException _)))
|
||||||
(assoc [this _ _]
|
(assoc [this _ _]
|
||||||
(throw (ex-info "assoc not supported on raw result set" {})))
|
(throw (ex-info "assoc not supported on raw result set" {})))
|
||||||
|
|
||||||
clojure.lang.Seqable
|
clojure.lang.Seqable
|
||||||
(seq [this]
|
(seq [this]
|
||||||
(seq (mapv (fn [^Integer i]
|
(seq (mapv (fn [^Integer i]
|
||||||
(clojure.lang.MapEntry. (nth @cols i)
|
(clojure.lang.MapEntry. (nth @cols (dec i))
|
||||||
(.getObject rs (inc i))))
|
(.getObject rs i)))
|
||||||
(range (count @cols))))))))
|
(range 1 (inc (count @cols)))))))))
|
||||||
|
|
||||||
(defn execute!
|
(defn execute!
|
||||||
"General SQL execution function. Returns a reducible that, when reduced,
|
"General SQL execution function. Returns a reducible that, when reduced,
|
||||||
runs the SQL and yields the result."
|
runs the SQL and yields the result."
|
||||||
[db-spec sql-params opts]
|
[db-spec sql-params & [opts]]
|
||||||
|
(let [opts (merge (when (map? db-spec) db-spec) opts)]
|
||||||
(reify clojure.lang.IReduceInit
|
(reify clojure.lang.IReduceInit
|
||||||
(reduce [this f init]
|
(reduce [this f init]
|
||||||
(with-open [con (get-connection db-spec opts)]
|
(with-open [con (get-connection db-spec)]
|
||||||
(with-open [stmt (get-statement con db-spec sql-params opts)]
|
(with-open [stmt (prepare con sql-params opts)]
|
||||||
(if (.execute stmt)
|
(if (.execute stmt)
|
||||||
(let [rs (.getResultSet stmt)
|
(let [rs (.getResultSet stmt)
|
||||||
rs-map (mapify-result-set rs)]
|
rs-map (mapify-result-set rs opts)]
|
||||||
(loop [init' init]
|
(loop [init' init]
|
||||||
(if (.next rs)
|
(if (.next rs)
|
||||||
(let [result (f init' rs-map)]
|
(let [result (f init' rs-map)]
|
||||||
|
|
@ -348,10 +380,23 @@
|
||||||
@result
|
@result
|
||||||
(recur result)))
|
(recur result)))
|
||||||
init')))
|
init')))
|
||||||
(.getUpdateCount stmt)))))))
|
(.getUpdateCount stmt))))))))
|
||||||
|
|
||||||
(comment
|
(comment
|
||||||
(def db-spec {:dbtype "mysql" :dbname "worldsingles" :user "root" :password "visual"})
|
(def db-spec {:dbtype "mysql" :dbname "worldsingles" :user "root" :password "visual" :useSSL false})
|
||||||
(reduce (fn [rs m] (conj rs (into {} m)))
|
(def db-spec {:dbtype "h2:mem" :dbname "perf"})
|
||||||
[]
|
(def con (get-connection db-spec))
|
||||||
(execute! db-spec ["select * from status"] {})))
|
(reduce + 0 (execute! con ["DROP TABLE fruit"]))
|
||||||
|
(reduce + 0 (execute! con ["CREATE TABLE fruit (id int default 0, name varchar(32) primary key, appearance varchar(32), cost int, grade real)"]))
|
||||||
|
(reduce + 0 (execute! con ["INSERT INTO fruit (id,name,appearance,cost,grade) VALUES (1,'Apple','red',59,87), (2,'Banana','yellow',29,92.2), (3,'Peach','fuzzy',139,90.0), (4,'Orange','juicy',89,88.6)"]))
|
||||||
|
(close con)
|
||||||
|
(require '[criterium.core :refer [bench quick-bench]])
|
||||||
|
(quick-bench (reduce + (take 10e6 (range))))
|
||||||
|
(quick-bench
|
||||||
|
(reduce (fn [_ row] (reduced (:name row)))
|
||||||
|
nil
|
||||||
|
(execute! con ["select * from fruit where appearance = ?" "red"])))
|
||||||
|
(quick-bench
|
||||||
|
(reduce (fn [rs m] (reduced (into {} m)))
|
||||||
|
nil
|
||||||
|
(execute! con ["select * from fruit where appearance = ?" "red"]))))
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue