Add a helper method for implicit transactions

This commit is contained in:
David Williams 2021-03-23 19:07:54 +00:00
parent bcc547ebf7
commit aa5a7118fd
3 changed files with 76 additions and 35 deletions

View file

@ -145,6 +145,20 @@
(execute [_] (body)))
(->TransactionOptions opts))))
(def ^:dynamic *session* nil)
(defn with-implicit-transaction
"Automatically sets the session / transaction for all mongo operations
within the scope, using a dynamic binding"
[{:keys [^MongoClient client transaction-opts session-opts] :or {transaction-opts {} session-opts {}}} body]
(with-open [^ClientSession session (start-session client session-opts)]
(binding [*session* session]
(with-transaction
*session*
body
transaction-opts))))
;;; Utility
(defn connect-to-db

View file

@ -1,6 +1,7 @@
(ns mongo-driver-3.collection
(:refer-clojure :exclude [find empty? drop])
(:require [mongo-driver-3.model :refer :all])
(:require [mongo-driver-3.model :refer :all]
[mongo-driver-3.client :refer [*session*]])
(:import (com.mongodb MongoNamespace)
(com.mongodb.client MongoDatabase MongoCollection ClientSession)
(com.mongodb.client.model IndexModel)
@ -67,12 +68,13 @@
(aggregate db coll pipeline {}))
([^MongoDatabase db coll pipeline opts]
(let [{:keys [^ClientSession session allow-disk-use? ^Integer batch-size bypass-document-validation? keywordize? raw?] :or {keywordize? true raw? false}} opts
it (cond-> (if session
(.aggregate (collection db coll opts) session ^List (map document pipeline))
(.aggregate (collection db coll opts) ^List (map document pipeline)))
(some? allow-disk-use?) (.allowDiskUse allow-disk-use?)
(some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?)
batch-size (.batchSize batch-size))]
^ClientSession session (or session *session*)
it (cond-> (if session
(.aggregate (collection db coll opts) session ^List (map document pipeline))
(.aggregate (collection db coll opts) ^List (map document pipeline)))
(some? allow-disk-use?) (.allowDiskUse allow-disk-use?)
(some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?)
batch-size (.batchSize batch-size))]
(if-not raw?
(map (fn [x] (from-document x keywordize?)) (seq it))
@ -102,7 +104,7 @@
(bulk-write db coll operations {}))
([^MongoDatabase db coll operations opts]
(let [opts' (->BulkWriteOptions opts)]
(if-let [session (:session opts)]
(if-let [session (or (:session opts) *session*)]
(.bulkWrite (collection db coll opts') ^ClientSession session ^List (map write-model operations))
(.bulkWrite (collection db coll opts') (map write-model operations))))))
@ -130,7 +132,7 @@
(count-documents db coll q {}))
([^MongoDatabase db coll q opts]
(let [opts' (->CountOptions opts)]
(if-let [session (:session opts)]
(if-let [session (or (:session opts) *session*)]
(.countDocuments (collection db coll opts) session (document q) opts')
(.countDocuments (collection db coll opts) (document q) opts')))))
@ -150,7 +152,7 @@
([^MongoDatabase db coll q]
(delete-one db coll q {}))
([^MongoDatabase db coll q opts]
(if-let [session (:session opts)]
(if-let [session (or (:session opts) *session*)]
(.deleteOne (collection db coll opts) session (document q) (->DeleteOptions opts))
(.deleteOne (collection db coll opts) (document q) (->DeleteOptions opts)))))
@ -170,7 +172,7 @@
([^MongoDatabase db coll q]
(delete-many db coll q {}))
([^MongoDatabase db coll q opts]
(if-let [session (:session opts)]
(if-let [session (or (:session opts) *session*)]
(.deleteMany (collection db coll opts) session (document q) (->DeleteOptions opts))
(.deleteMany (collection db coll opts) (document q) (->DeleteOptions opts)))))
@ -196,13 +198,14 @@
(find db coll q {}))
([^MongoDatabase db coll q opts]
(let [{:keys [limit skip sort projection ^ClientSession session keywordize? raw?] :or {keywordize? true raw? false}} opts]
(let [it (cond-> (if session
(.find (collection db coll opts) session (document q))
(.find (collection db coll opts) (document q)))
limit (.limit limit)
skip (.skip skip)
sort (.sort (document sort))
projection (.projection (document projection)))]
(let [^ClientSession session (or session *session*)
it (cond-> (if session
(.find (collection db coll opts) session (document q))
(.find (collection db coll opts) (document q)))
limit (.limit limit)
skip (.skip skip)
sort (.sort (document sort))
projection (.projection (document projection)))]
(if-not raw?
(map (fn [x] (from-document x keywordize?)) (seq it))
@ -241,7 +244,8 @@
(find-one-and-update db coll q update {}))
([^MongoDatabase db coll q update opts]
(let [{:keys [keywordize? ^ClientSession session] :or {keywordize? true}} opts
opts' (->FindOneAndUpdateOptions opts)]
^ClientSession session (or session *session*)
opts' (->FindOneAndUpdateOptions opts)]
(-> (if session
(.findOneAndUpdate (collection db coll opts) session (document q) (document update) opts')
(.findOneAndUpdate (collection db coll opts) (document q) (document update) opts'))
@ -271,7 +275,8 @@
(find-one-and-replace db coll q doc {}))
([^MongoDatabase db coll q doc opts]
(let [{:keys [keywordize? session] :or {keywordize? true}} opts
opts' (->FindOneAndReplaceOptions opts)]
session (or session *session*)
opts' (->FindOneAndReplaceOptions opts)]
(-> (if session
(.findOneAndReplace (collection db coll opts) session (document q) (document doc) opts')
(.findOneAndReplace (collection db coll opts) (document q) (document doc) opts'))
@ -297,7 +302,7 @@
(insert-one db coll doc {}))
([^MongoDatabase db coll doc opts]
(let [opts' (->InsertOneOptions opts)]
(if-let [session (:session opts)]
(if-let [session (or (:session opts) *session*)]
(.insertOne (collection db coll opts) session (document doc) opts')
(.insertOne (collection db coll opts) (document doc) opts')))))
@ -322,7 +327,7 @@
(insert-many db coll docs {}))
([^MongoDatabase db coll docs opts]
(let [opts' (->InsertManyOptions opts)]
(if-let [^ClientSession session (:session opts)]
(if-let [^ClientSession session (or (:session opts) *session*)]
(.insertMany (collection db coll opts) session ^List (map document docs) opts')
(.insertMany (collection db coll opts) ^List (map document docs) opts')))))
@ -346,7 +351,7 @@
([^MongoDatabase db coll q doc]
(find-one-and-replace db coll q doc {}))
([^MongoDatabase db coll q doc opts]
(if-let [^ClientSession session (:session opts)]
(if-let [^ClientSession session (or (:session opts) *session*)]
(.replaceOne (collection db coll opts) session (document q) (document doc) (->ReplaceOptions opts))
(.replaceOne (collection db coll opts) (document q) (document doc) (->ReplaceOptions opts)))))
@ -370,7 +375,7 @@
([^MongoDatabase db coll q update]
(update-one db coll q update {}))
([^MongoDatabase db coll q update opts]
(if-let [^ClientSession session (:session opts)]
(if-let [^ClientSession session (or (:session opts) *session*)]
(.updateOne (collection db coll opts) session (document q) (document update) (->UpdateOptions opts))
(.updateOne (collection db coll opts) (document q) (document update) (->UpdateOptions opts)))))
@ -394,7 +399,7 @@
([^MongoDatabase db coll q update]
(update-many db coll q update {}))
([^MongoDatabase db coll q update opts]
(if-let [^ClientSession session (:session opts)]
(if-let [^ClientSession session (or (:session opts) *session*)]
(.updateMany (collection db coll opts) session (document q) (document update) (->UpdateOptions opts))
(.updateMany (collection db coll opts) (document q) (document update) (->UpdateOptions opts)))))

View file

@ -1,8 +1,9 @@
(ns mongo-driver-3.client-test
(:require [clojure.test :refer :all]
[mongo-driver-3.client :as mg]
[mongo-driver-3.collection :as mc])
(:import (com.mongodb.client MongoClient MongoDatabase MongoIterable ListCollectionsIterable)
[mongo-driver-3.collection :as mc]
[mongo-driver-3.model :as m])
(:import (com.mongodb.client MongoClient MongoDatabase MongoIterable ListCollectionsIterable ClientSession)
(java.util UUID)
(com.mongodb ClientSessionOptions ReadConcern ReadPreference)
(java.util.concurrent TimeUnit)))
@ -83,23 +84,44 @@
(deftest ^:integration test-with-transaction
(let [db (new-db @client)
_ (mc/create db "test")]
_ (mc/create db "test")]
(with-open [session (mg/start-session @client)]
(is (= 2 (mg/with-transaction session
(fn []
(mc/insert-one db "test" {:a 1} {:session session})
(mc/insert-one db "test" {:a 2} {:session session})
(mc/count-documents db "test" {} {:session session}))))))))
(deftest ^:integration test-with-implicit-transaction
(testing "passing"
(let [db (new-db @client)
_ (mc/create db "test")]
(is (= 2 (mg/with-implicit-transaction
{:client @client}
(fn []
(mc/insert-one db "test" {:a 1} {:session session})
(mc/insert-one db "test" {:a 2} {:session session})
(mc/count-documents db "test" {} {:session session}))))))))
(mc/insert-one db "test" {:a 1})
(mc/insert-one db "test" {:a 2})
(mc/count-documents db "test" {})))))))
(testing "failing"
(let [db (new-db @client)
_ (mc/create db "test")]
(is (= 0 (try (mg/with-implicit-transaction
{:client @client}
(fn []
(mc/insert-one db "test" {:a 1})
(mc/insert-one db "test" {nil 2})))
(catch Exception _ (mc/count-documents db "test" {}))))))))
(deftest ^:integration test-with-transaction
(testing "passing"
(let [db (new-db @client)
_ (mc/create db "test")]
_ (mc/create db "test")]
(with-open [session (mg/start-session @client)]
(is (= 2 (mg/with-transaction session
(fn []
(mc/insert-one db "test" {:a 1} {:session session})
(mc/insert-one db "test" {:a 2} {:session session})
(fn []
(mc/insert-one db "test" {:a 1} {:session session})
(mc/insert-one db "test" {:a 2} {:session session})
(mc/count-documents db "test" {} {:session session}))))))))
(testing "failing"