diff --git a/src/mongo_driver_3/client.clj b/src/mongo_driver_3/client.clj index 09873a1..f4de099 100644 --- a/src/mongo_driver_3/client.clj +++ b/src/mongo_driver_3/client.clj @@ -145,6 +145,20 @@ (execute [_] (body))) (->TransactionOptions opts)))) + +(def ^:dynamic *session* nil) + +(defn with-implicit-transaction + "Automatically sets the session / transaction for all mongo operations + within the scope, using a dynamic binding" + [{:keys [^MongoClient client transaction-opts session-opts] :or {transaction-opts {} session-opts {}}} body] + (with-open [^ClientSession session (start-session client session-opts)] + (binding [*session* session] + (with-transaction + *session* + body + transaction-opts)))) + ;;; Utility (defn connect-to-db diff --git a/src/mongo_driver_3/collection.clj b/src/mongo_driver_3/collection.clj index 5717ff4..ec1592e 100644 --- a/src/mongo_driver_3/collection.clj +++ b/src/mongo_driver_3/collection.clj @@ -1,6 +1,7 @@ (ns mongo-driver-3.collection (:refer-clojure :exclude [find empty? drop]) - (:require [mongo-driver-3.model :refer :all]) + (:require [mongo-driver-3.model :refer :all] + [mongo-driver-3.client :refer [*session*]]) (:import (com.mongodb MongoNamespace) (com.mongodb.client MongoDatabase MongoCollection ClientSession) (com.mongodb.client.model IndexModel) @@ -67,12 +68,13 @@ (aggregate db coll pipeline {})) ([^MongoDatabase db coll pipeline opts] (let [{:keys [^ClientSession session allow-disk-use? ^Integer batch-size bypass-document-validation? keywordize? raw?] :or {keywordize? true raw? false}} opts - it (cond-> (if session - (.aggregate (collection db coll opts) session ^List (map document pipeline)) - (.aggregate (collection db coll opts) ^List (map document pipeline))) - (some? allow-disk-use?) (.allowDiskUse allow-disk-use?) - (some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?) - batch-size (.batchSize batch-size))] + ^ClientSession session (or session *session*) + it (cond-> (if session + (.aggregate (collection db coll opts) session ^List (map document pipeline)) + (.aggregate (collection db coll opts) ^List (map document pipeline))) + (some? allow-disk-use?) (.allowDiskUse allow-disk-use?) + (some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?) + batch-size (.batchSize batch-size))] (if-not raw? (map (fn [x] (from-document x keywordize?)) (seq it)) @@ -102,7 +104,7 @@ (bulk-write db coll operations {})) ([^MongoDatabase db coll operations opts] (let [opts' (->BulkWriteOptions opts)] - (if-let [session (:session opts)] + (if-let [session (or (:session opts) *session*)] (.bulkWrite (collection db coll opts') ^ClientSession session ^List (map write-model operations)) (.bulkWrite (collection db coll opts') (map write-model operations)))))) @@ -130,7 +132,7 @@ (count-documents db coll q {})) ([^MongoDatabase db coll q opts] (let [opts' (->CountOptions opts)] - (if-let [session (:session opts)] + (if-let [session (or (:session opts) *session*)] (.countDocuments (collection db coll opts) session (document q) opts') (.countDocuments (collection db coll opts) (document q) opts'))))) @@ -150,7 +152,7 @@ ([^MongoDatabase db coll q] (delete-one db coll q {})) ([^MongoDatabase db coll q opts] - (if-let [session (:session opts)] + (if-let [session (or (:session opts) *session*)] (.deleteOne (collection db coll opts) session (document q) (->DeleteOptions opts)) (.deleteOne (collection db coll opts) (document q) (->DeleteOptions opts))))) @@ -170,7 +172,7 @@ ([^MongoDatabase db coll q] (delete-many db coll q {})) ([^MongoDatabase db coll q opts] - (if-let [session (:session opts)] + (if-let [session (or (:session opts) *session*)] (.deleteMany (collection db coll opts) session (document q) (->DeleteOptions opts)) (.deleteMany (collection db coll opts) (document q) (->DeleteOptions opts))))) @@ -196,13 +198,14 @@ (find db coll q {})) ([^MongoDatabase db coll q opts] (let [{:keys [limit skip sort projection ^ClientSession session keywordize? raw?] :or {keywordize? true raw? false}} opts] - (let [it (cond-> (if session - (.find (collection db coll opts) session (document q)) - (.find (collection db coll opts) (document q))) - limit (.limit limit) - skip (.skip skip) - sort (.sort (document sort)) - projection (.projection (document projection)))] + (let [^ClientSession session (or session *session*) + it (cond-> (if session + (.find (collection db coll opts) session (document q)) + (.find (collection db coll opts) (document q))) + limit (.limit limit) + skip (.skip skip) + sort (.sort (document sort)) + projection (.projection (document projection)))] (if-not raw? (map (fn [x] (from-document x keywordize?)) (seq it)) @@ -241,7 +244,8 @@ (find-one-and-update db coll q update {})) ([^MongoDatabase db coll q update opts] (let [{:keys [keywordize? ^ClientSession session] :or {keywordize? true}} opts - opts' (->FindOneAndUpdateOptions opts)] + ^ClientSession session (or session *session*) + opts' (->FindOneAndUpdateOptions opts)] (-> (if session (.findOneAndUpdate (collection db coll opts) session (document q) (document update) opts') (.findOneAndUpdate (collection db coll opts) (document q) (document update) opts')) @@ -271,7 +275,8 @@ (find-one-and-replace db coll q doc {})) ([^MongoDatabase db coll q doc opts] (let [{:keys [keywordize? session] :or {keywordize? true}} opts - opts' (->FindOneAndReplaceOptions opts)] + session (or session *session*) + opts' (->FindOneAndReplaceOptions opts)] (-> (if session (.findOneAndReplace (collection db coll opts) session (document q) (document doc) opts') (.findOneAndReplace (collection db coll opts) (document q) (document doc) opts')) @@ -297,7 +302,7 @@ (insert-one db coll doc {})) ([^MongoDatabase db coll doc opts] (let [opts' (->InsertOneOptions opts)] - (if-let [session (:session opts)] + (if-let [session (or (:session opts) *session*)] (.insertOne (collection db coll opts) session (document doc) opts') (.insertOne (collection db coll opts) (document doc) opts'))))) @@ -322,7 +327,7 @@ (insert-many db coll docs {})) ([^MongoDatabase db coll docs opts] (let [opts' (->InsertManyOptions opts)] - (if-let [^ClientSession session (:session opts)] + (if-let [^ClientSession session (or (:session opts) *session*)] (.insertMany (collection db coll opts) session ^List (map document docs) opts') (.insertMany (collection db coll opts) ^List (map document docs) opts'))))) @@ -346,7 +351,7 @@ ([^MongoDatabase db coll q doc] (find-one-and-replace db coll q doc {})) ([^MongoDatabase db coll q doc opts] - (if-let [^ClientSession session (:session opts)] + (if-let [^ClientSession session (or (:session opts) *session*)] (.replaceOne (collection db coll opts) session (document q) (document doc) (->ReplaceOptions opts)) (.replaceOne (collection db coll opts) (document q) (document doc) (->ReplaceOptions opts))))) @@ -370,7 +375,7 @@ ([^MongoDatabase db coll q update] (update-one db coll q update {})) ([^MongoDatabase db coll q update opts] - (if-let [^ClientSession session (:session opts)] + (if-let [^ClientSession session (or (:session opts) *session*)] (.updateOne (collection db coll opts) session (document q) (document update) (->UpdateOptions opts)) (.updateOne (collection db coll opts) (document q) (document update) (->UpdateOptions opts))))) @@ -394,7 +399,7 @@ ([^MongoDatabase db coll q update] (update-many db coll q update {})) ([^MongoDatabase db coll q update opts] - (if-let [^ClientSession session (:session opts)] + (if-let [^ClientSession session (or (:session opts) *session*)] (.updateMany (collection db coll opts) session (document q) (document update) (->UpdateOptions opts)) (.updateMany (collection db coll opts) (document q) (document update) (->UpdateOptions opts))))) diff --git a/test/mongo_driver_3/client_test.clj b/test/mongo_driver_3/client_test.clj index ad64bab..cf77a98 100644 --- a/test/mongo_driver_3/client_test.clj +++ b/test/mongo_driver_3/client_test.clj @@ -1,8 +1,9 @@ (ns mongo-driver-3.client-test (:require [clojure.test :refer :all] [mongo-driver-3.client :as mg] - [mongo-driver-3.collection :as mc]) - (:import (com.mongodb.client MongoClient MongoDatabase MongoIterable ListCollectionsIterable) + [mongo-driver-3.collection :as mc] + [mongo-driver-3.model :as m]) + (:import (com.mongodb.client MongoClient MongoDatabase MongoIterable ListCollectionsIterable ClientSession) (java.util UUID) (com.mongodb ClientSessionOptions ReadConcern ReadPreference) (java.util.concurrent TimeUnit))) @@ -83,23 +84,44 @@ (deftest ^:integration test-with-transaction (let [db (new-db @client) - _ (mc/create db "test")] + _ (mc/create db "test")] (with-open [session (mg/start-session @client)] (is (= 2 (mg/with-transaction session + (fn [] + (mc/insert-one db "test" {:a 1} {:session session}) + (mc/insert-one db "test" {:a 2} {:session session}) + (mc/count-documents db "test" {} {:session session})))))))) + + (deftest ^:integration test-with-implicit-transaction + (testing "passing" + (let [db (new-db @client) + _ (mc/create db "test")] + (is (= 2 (mg/with-implicit-transaction + {:client @client} (fn [] - (mc/insert-one db "test" {:a 1} {:session session}) - (mc/insert-one db "test" {:a 2} {:session session}) - (mc/count-documents db "test" {} {:session session})))))))) + (mc/insert-one db "test" {:a 1}) + (mc/insert-one db "test" {:a 2}) + (mc/count-documents db "test" {}))))))) + + (testing "failing" + (let [db (new-db @client) + _ (mc/create db "test")] + (is (= 0 (try (mg/with-implicit-transaction + {:client @client} + (fn [] + (mc/insert-one db "test" {:a 1}) + (mc/insert-one db "test" {nil 2}))) + (catch Exception _ (mc/count-documents db "test" {})))))))) (deftest ^:integration test-with-transaction (testing "passing" (let [db (new-db @client) - _ (mc/create db "test")] + _ (mc/create db "test")] (with-open [session (mg/start-session @client)] (is (= 2 (mg/with-transaction session - (fn [] - (mc/insert-one db "test" {:a 1} {:session session}) - (mc/insert-one db "test" {:a 2} {:session session}) + (fn [] + (mc/insert-one db "test" {:a 1} {:session session}) + (mc/insert-one db "test" {:a 2} {:session session}) (mc/count-documents db "test" {} {:session session})))))))) (testing "failing"