Merge pull request #6 from eva-healthtech/implicit-transaction
Add a helper method for implicit transactions
This commit is contained in:
commit
97ea4ea2b5
3 changed files with 76 additions and 35 deletions
|
|
@ -145,6 +145,20 @@
|
|||
(execute [_] (body)))
|
||||
(->TransactionOptions opts))))
|
||||
|
||||
|
||||
(def ^:dynamic *session* nil)
|
||||
|
||||
(defn with-implicit-transaction
|
||||
"Automatically sets the session / transaction for all mongo operations
|
||||
within the scope, using a dynamic binding"
|
||||
[{:keys [^MongoClient client transaction-opts session-opts] :or {transaction-opts {} session-opts {}}} body]
|
||||
(with-open [^ClientSession session (start-session client session-opts)]
|
||||
(binding [*session* session]
|
||||
(with-transaction
|
||||
*session*
|
||||
body
|
||||
transaction-opts))))
|
||||
|
||||
;;; Utility
|
||||
|
||||
(defn connect-to-db
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
(ns mongo-driver-3.collection
|
||||
(:refer-clojure :exclude [find empty? drop])
|
||||
(:require [mongo-driver-3.model :refer :all])
|
||||
(:require [mongo-driver-3.model :refer :all]
|
||||
[mongo-driver-3.client :refer [*session*]])
|
||||
(:import (com.mongodb MongoNamespace)
|
||||
(com.mongodb.client MongoDatabase MongoCollection ClientSession)
|
||||
(com.mongodb.client.model IndexModel)
|
||||
|
|
@ -67,12 +68,13 @@
|
|||
(aggregate db coll pipeline {}))
|
||||
([^MongoDatabase db coll pipeline opts]
|
||||
(let [{:keys [^ClientSession session allow-disk-use? ^Integer batch-size bypass-document-validation? keywordize? raw?] :or {keywordize? true raw? false}} opts
|
||||
it (cond-> (if session
|
||||
(.aggregate (collection db coll opts) session ^List (map document pipeline))
|
||||
(.aggregate (collection db coll opts) ^List (map document pipeline)))
|
||||
(some? allow-disk-use?) (.allowDiskUse allow-disk-use?)
|
||||
(some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?)
|
||||
batch-size (.batchSize batch-size))]
|
||||
^ClientSession session (or session *session*)
|
||||
it (cond-> (if session
|
||||
(.aggregate (collection db coll opts) session ^List (map document pipeline))
|
||||
(.aggregate (collection db coll opts) ^List (map document pipeline)))
|
||||
(some? allow-disk-use?) (.allowDiskUse allow-disk-use?)
|
||||
(some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?)
|
||||
batch-size (.batchSize batch-size))]
|
||||
|
||||
(if-not raw?
|
||||
(map (fn [x] (from-document x keywordize?)) (seq it))
|
||||
|
|
@ -102,7 +104,7 @@
|
|||
(bulk-write db coll operations {}))
|
||||
([^MongoDatabase db coll operations opts]
|
||||
(let [opts' (->BulkWriteOptions opts)]
|
||||
(if-let [session (:session opts)]
|
||||
(if-let [session (or (:session opts) *session*)]
|
||||
(.bulkWrite (collection db coll opts') ^ClientSession session ^List (map write-model operations))
|
||||
(.bulkWrite (collection db coll opts') (map write-model operations))))))
|
||||
|
||||
|
|
@ -130,7 +132,7 @@
|
|||
(count-documents db coll q {}))
|
||||
([^MongoDatabase db coll q opts]
|
||||
(let [opts' (->CountOptions opts)]
|
||||
(if-let [session (:session opts)]
|
||||
(if-let [session (or (:session opts) *session*)]
|
||||
(.countDocuments (collection db coll opts) session (document q) opts')
|
||||
(.countDocuments (collection db coll opts) (document q) opts')))))
|
||||
|
||||
|
|
@ -150,7 +152,7 @@
|
|||
([^MongoDatabase db coll q]
|
||||
(delete-one db coll q {}))
|
||||
([^MongoDatabase db coll q opts]
|
||||
(if-let [session (:session opts)]
|
||||
(if-let [session (or (:session opts) *session*)]
|
||||
(.deleteOne (collection db coll opts) session (document q) (->DeleteOptions opts))
|
||||
(.deleteOne (collection db coll opts) (document q) (->DeleteOptions opts)))))
|
||||
|
||||
|
|
@ -170,7 +172,7 @@
|
|||
([^MongoDatabase db coll q]
|
||||
(delete-many db coll q {}))
|
||||
([^MongoDatabase db coll q opts]
|
||||
(if-let [session (:session opts)]
|
||||
(if-let [session (or (:session opts) *session*)]
|
||||
(.deleteMany (collection db coll opts) session (document q) (->DeleteOptions opts))
|
||||
(.deleteMany (collection db coll opts) (document q) (->DeleteOptions opts)))))
|
||||
|
||||
|
|
@ -196,13 +198,14 @@
|
|||
(find db coll q {}))
|
||||
([^MongoDatabase db coll q opts]
|
||||
(let [{:keys [limit skip sort projection ^ClientSession session keywordize? raw?] :or {keywordize? true raw? false}} opts]
|
||||
(let [it (cond-> (if session
|
||||
(.find (collection db coll opts) session (document q))
|
||||
(.find (collection db coll opts) (document q)))
|
||||
limit (.limit limit)
|
||||
skip (.skip skip)
|
||||
sort (.sort (document sort))
|
||||
projection (.projection (document projection)))]
|
||||
(let [^ClientSession session (or session *session*)
|
||||
it (cond-> (if session
|
||||
(.find (collection db coll opts) session (document q))
|
||||
(.find (collection db coll opts) (document q)))
|
||||
limit (.limit limit)
|
||||
skip (.skip skip)
|
||||
sort (.sort (document sort))
|
||||
projection (.projection (document projection)))]
|
||||
|
||||
(if-not raw?
|
||||
(map (fn [x] (from-document x keywordize?)) (seq it))
|
||||
|
|
@ -241,7 +244,8 @@
|
|||
(find-one-and-update db coll q update {}))
|
||||
([^MongoDatabase db coll q update opts]
|
||||
(let [{:keys [keywordize? ^ClientSession session] :or {keywordize? true}} opts
|
||||
opts' (->FindOneAndUpdateOptions opts)]
|
||||
^ClientSession session (or session *session*)
|
||||
opts' (->FindOneAndUpdateOptions opts)]
|
||||
(-> (if session
|
||||
(.findOneAndUpdate (collection db coll opts) session (document q) (document update) opts')
|
||||
(.findOneAndUpdate (collection db coll opts) (document q) (document update) opts'))
|
||||
|
|
@ -271,7 +275,8 @@
|
|||
(find-one-and-replace db coll q doc {}))
|
||||
([^MongoDatabase db coll q doc opts]
|
||||
(let [{:keys [keywordize? session] :or {keywordize? true}} opts
|
||||
opts' (->FindOneAndReplaceOptions opts)]
|
||||
session (or session *session*)
|
||||
opts' (->FindOneAndReplaceOptions opts)]
|
||||
(-> (if session
|
||||
(.findOneAndReplace (collection db coll opts) session (document q) (document doc) opts')
|
||||
(.findOneAndReplace (collection db coll opts) (document q) (document doc) opts'))
|
||||
|
|
@ -297,7 +302,7 @@
|
|||
(insert-one db coll doc {}))
|
||||
([^MongoDatabase db coll doc opts]
|
||||
(let [opts' (->InsertOneOptions opts)]
|
||||
(if-let [session (:session opts)]
|
||||
(if-let [session (or (:session opts) *session*)]
|
||||
(.insertOne (collection db coll opts) session (document doc) opts')
|
||||
(.insertOne (collection db coll opts) (document doc) opts')))))
|
||||
|
||||
|
|
@ -322,7 +327,7 @@
|
|||
(insert-many db coll docs {}))
|
||||
([^MongoDatabase db coll docs opts]
|
||||
(let [opts' (->InsertManyOptions opts)]
|
||||
(if-let [^ClientSession session (:session opts)]
|
||||
(if-let [^ClientSession session (or (:session opts) *session*)]
|
||||
(.insertMany (collection db coll opts) session ^List (map document docs) opts')
|
||||
(.insertMany (collection db coll opts) ^List (map document docs) opts')))))
|
||||
|
||||
|
|
@ -346,7 +351,7 @@
|
|||
([^MongoDatabase db coll q doc]
|
||||
(find-one-and-replace db coll q doc {}))
|
||||
([^MongoDatabase db coll q doc opts]
|
||||
(if-let [^ClientSession session (:session opts)]
|
||||
(if-let [^ClientSession session (or (:session opts) *session*)]
|
||||
(.replaceOne (collection db coll opts) session (document q) (document doc) (->ReplaceOptions opts))
|
||||
(.replaceOne (collection db coll opts) (document q) (document doc) (->ReplaceOptions opts)))))
|
||||
|
||||
|
|
@ -370,7 +375,7 @@
|
|||
([^MongoDatabase db coll q update]
|
||||
(update-one db coll q update {}))
|
||||
([^MongoDatabase db coll q update opts]
|
||||
(if-let [^ClientSession session (:session opts)]
|
||||
(if-let [^ClientSession session (or (:session opts) *session*)]
|
||||
(.updateOne (collection db coll opts) session (document q) (document update) (->UpdateOptions opts))
|
||||
(.updateOne (collection db coll opts) (document q) (document update) (->UpdateOptions opts)))))
|
||||
|
||||
|
|
@ -394,7 +399,7 @@
|
|||
([^MongoDatabase db coll q update]
|
||||
(update-many db coll q update {}))
|
||||
([^MongoDatabase db coll q update opts]
|
||||
(if-let [^ClientSession session (:session opts)]
|
||||
(if-let [^ClientSession session (or (:session opts) *session*)]
|
||||
(.updateMany (collection db coll opts) session (document q) (document update) (->UpdateOptions opts))
|
||||
(.updateMany (collection db coll opts) (document q) (document update) (->UpdateOptions opts)))))
|
||||
|
||||
|
|
|
|||
|
|
@ -1,8 +1,9 @@
|
|||
(ns mongo-driver-3.client-test
|
||||
(:require [clojure.test :refer :all]
|
||||
[mongo-driver-3.client :as mg]
|
||||
[mongo-driver-3.collection :as mc])
|
||||
(:import (com.mongodb.client MongoClient MongoDatabase MongoIterable ListCollectionsIterable)
|
||||
[mongo-driver-3.collection :as mc]
|
||||
[mongo-driver-3.model :as m])
|
||||
(:import (com.mongodb.client MongoClient MongoDatabase MongoIterable ListCollectionsIterable ClientSession)
|
||||
(java.util UUID)
|
||||
(com.mongodb ClientSessionOptions ReadConcern ReadPreference)
|
||||
(java.util.concurrent TimeUnit)))
|
||||
|
|
@ -83,23 +84,44 @@
|
|||
|
||||
(deftest ^:integration test-with-transaction
|
||||
(let [db (new-db @client)
|
||||
_ (mc/create db "test")]
|
||||
_ (mc/create db "test")]
|
||||
(with-open [session (mg/start-session @client)]
|
||||
(is (= 2 (mg/with-transaction session
|
||||
(fn []
|
||||
(mc/insert-one db "test" {:a 1} {:session session})
|
||||
(mc/insert-one db "test" {:a 2} {:session session})
|
||||
(mc/count-documents db "test" {} {:session session}))))))))
|
||||
|
||||
(deftest ^:integration test-with-implicit-transaction
|
||||
(testing "passing"
|
||||
(let [db (new-db @client)
|
||||
_ (mc/create db "test")]
|
||||
(is (= 2 (mg/with-implicit-transaction
|
||||
{:client @client}
|
||||
(fn []
|
||||
(mc/insert-one db "test" {:a 1} {:session session})
|
||||
(mc/insert-one db "test" {:a 2} {:session session})
|
||||
(mc/count-documents db "test" {} {:session session}))))))))
|
||||
(mc/insert-one db "test" {:a 1})
|
||||
(mc/insert-one db "test" {:a 2})
|
||||
(mc/count-documents db "test" {})))))))
|
||||
|
||||
(testing "failing"
|
||||
(let [db (new-db @client)
|
||||
_ (mc/create db "test")]
|
||||
(is (= 0 (try (mg/with-implicit-transaction
|
||||
{:client @client}
|
||||
(fn []
|
||||
(mc/insert-one db "test" {:a 1})
|
||||
(mc/insert-one db "test" {nil 2})))
|
||||
(catch Exception _ (mc/count-documents db "test" {}))))))))
|
||||
|
||||
(deftest ^:integration test-with-transaction
|
||||
(testing "passing"
|
||||
(let [db (new-db @client)
|
||||
_ (mc/create db "test")]
|
||||
_ (mc/create db "test")]
|
||||
(with-open [session (mg/start-session @client)]
|
||||
(is (= 2 (mg/with-transaction session
|
||||
(fn []
|
||||
(mc/insert-one db "test" {:a 1} {:session session})
|
||||
(mc/insert-one db "test" {:a 2} {:session session})
|
||||
(fn []
|
||||
(mc/insert-one db "test" {:a 1} {:session session})
|
||||
(mc/insert-one db "test" {:a 2} {:session session})
|
||||
(mc/count-documents db "test" {} {:session session}))))))))
|
||||
|
||||
(testing "failing"
|
||||
|
|
|
|||
Loading…
Reference in a new issue