support transactions

This commit is contained in:
George Narroway 2019-11-22 17:55:03 +08:00
parent c723f2a36f
commit f737c6ffb6
5 changed files with 99 additions and 33 deletions

View file

@ -1,6 +1,10 @@
# Change Log
All notable changes to this project will be documented in this file. This change log follows the conventions of [keepachangelog.com](http://keepachangelog.com/).
## [Unreleased]
### Added
- with-transaction
## [0.4.0]
### Added
- list collections

View file

@ -23,7 +23,7 @@ It was developed with the following goals:
## Status
mongo-driver-3 is under active development and the API may change.
mongo-driver-3 is used in production, but is also under development and the API may change slightly.
Please try it out and raise any issues you may find.
## Usage
@ -124,7 +124,7 @@ for full API documentation.
Many mongo queries take operators like `$eq` and `$gt`. These are exposed in the `mongo-driver-3.operator` namespace.
``` clojure
```clojure
(ns my.app
(:require [mongo-driver-3.collection :as mc]
[mongo-driver-3.operator :refer [$gt]))
@ -135,6 +135,23 @@ Many mongo queries take operators like `$eq` and `$gt`. These are exposed in the
(mc/find db "test" {:a {"$gt" 3}})
```
### Using transactions
You can create a session to perform multi-document transactions, where all operations either
succeed or none are persisted.
It is important to
use `with-open` so the session is closed after both successful and failed transactions.
```clojure
;; Inserts 2 documents into a collection
(with-open [s (mg/start-session client)]
(mg/with-transaction s
(fn []
(mc/insert-one my-db "coll" {:name "hello"} {:session s})
(mc/insert-one my-db "coll" {:name "world"} {:session s}))))
```
## License
Released under the MIT License: http://www.opensource.org/licenses/mit-license.php

View file

@ -1,10 +1,12 @@
(ns mongo-driver-3.client
(:refer-clojure :exclude [find])
(:require [mongo-driver-3.collection :as mc])
(:import (com.mongodb.client MongoClients MongoClient ClientSession MongoDatabase)
(:import (com.mongodb.client MongoClients MongoClient ClientSession MongoDatabase TransactionBody)
(com.mongodb ConnectionString ClientSessionOptions TransactionOptions)
(java.util.concurrent TimeUnit)))
(set! *warn-on-reflection* true)
;;; Core
(defn create
@ -75,13 +77,12 @@
rc (mc/->ReadConcern opts)
wc (mc/->WriteConcern opts)]
(when (some some? [max-commit-time-ms rp rc wc])
(cond-> (TransactionOptions/builder)
max-commit-time-ms (.maxCommitTime max-commit-time-ms (TimeUnit/MILLISECONDS))
rp (.readPreference rp)
rc (.readConcern rc)
wc (.writeConcern wc)
true (.build)))))
(cond-> (TransactionOptions/builder)
max-commit-time-ms (.maxCommitTime max-commit-time-ms (TimeUnit/MILLISECONDS))
rp (.readPreference rp)
rc (.readConcern rc)
wc (.writeConcern wc)
true (.build))))
(defn ->ClientSessionOptions
"Coerces an options map into a ClientSessionOptions See `start-session` for usage.
@ -122,6 +123,25 @@
([^MongoClient client opts]
(.startSession client (->ClientSessionOptions opts))))
(defn with-transaction
"Executes `body` in a transaction.
`body` should be a fn with one or more mongo operations in it.
Ensure `session` is passed as an option to each operation.
e.g.
(with-open [s (start-session client)]
(with-transaction s
(fn []
(insert-one my-db \"coll\" {:name \"hello\"} {:session s})
(insert-one my-db \"coll\" {:name \"world\"} {:session s}))))"
([^ClientSession session body] (with-transaction session body {}))
([^ClientSession session body opts]
(.withTransaction session
(reify TransactionBody
(execute [_] (body)))
(->TransactionOptions opts))))
;;; Utility
(defn connect-to-db

View file

@ -2,13 +2,15 @@
(:refer-clojure :exclude [find empty? drop])
(:import (clojure.lang Ratio Keyword Named IPersistentMap)
(com.mongodb ReadConcern ReadPreference WriteConcern MongoNamespace)
(com.mongodb.client MongoDatabase MongoCollection TransactionBody ClientSession)
(com.mongodb.client MongoDatabase MongoCollection ClientSession)
(com.mongodb.client.model InsertOneOptions InsertManyOptions DeleteOptions FindOneAndUpdateOptions ReturnDocument FindOneAndReplaceOptions CountOptions CreateCollectionOptions RenameCollectionOptions IndexOptions IndexModel UpdateOptions ReplaceOptions)
(java.util List Collection)
(java.util.concurrent TimeUnit)
(org.bson Document)
(org.bson.types Decimal128)))
(set! *warn-on-reflection* true)
;;; Conversions
(defprotocol ConvertToDocument
@ -661,22 +663,4 @@
(list-indexes db coll {}))
([^MongoDatabase db coll opts]
(->> (.listIndexes (collection db coll opts))
(map #(from-document % true)))))
;;; Utility functions
(defn- with-transaction
"Executes `body` in a transaction.
`body` should be a fn with one or more mongo operations in it.
Ensure `session` is passed as an option to each operation.
e.g.
(def s (start-session client))
(with-transaction s
(fn []
(insert-one my-db \"coll\" {:name \"hello\"} {:session s})
(insert-one my-db \"coll\" {:name \"world\"} {:session s})))"
[^ClientSession session body]
(.withTransaction session (reify TransactionBody
(execute [_] body))))
(map #(from-document % true)))))

View file

@ -55,7 +55,7 @@
[client]
(mg/get-db client (.toString (UUID/randomUUID))))
(deftest ^:integration test-list-collections
(deftest ^:integration test-list-collections
(let [db (new-db @client)
_ (mc/create db "test")]
(is (= ["test"] (map :name (mg/list-collections db))))
@ -68,5 +68,46 @@
(is (= ["test"] (mg/list-collection-names db)))
(is (instance? MongoIterable (mg/list-collection-names db {:raw? true})))))
#_(deftest ^:integration test-start-session
(is (instance? ClientSession (mg/start-session @client))))
(comment
;; Currently in a comment because it is troublesome to set up a replset in the CI
;; docker run -it --rm -p 27017:27017 mongo:4.2 --replSet rs1
;; Ensure we have a replica set so we can run session tests
(let [client (mg/create mongo-host)
admin-db (mg/get-db client "admin")]
(try (.runCommand admin-db (mc/document {:replSetInitiate {}}))
(catch Exception _ "already initialized")))
(deftest ^:integration test-start-session
(is (instance? ClientSession (mg/start-session @client))))
(deftest ^:integration test-with-transaction
(let [db (new-db @client)
_ (mc/create db "test")]
(with-open [session (mg/start-session @client)]
(is (= 2 (mg/with-transaction session
(fn []
(mc/insert-one db "test" {:a 1} {:session session})
(mc/insert-one db "test" {:a 2} {:session session})
(mc/count-documents db "test" {} {:session session}))))))))
(deftest ^:integration test-with-transaction
(testing "passing"
(let [db (new-db @client)
_ (mc/create db "test")]
(with-open [session (mg/start-session @client)]
(is (= 2 (mg/with-transaction session
(fn []
(mc/insert-one db "test" {:a 1} {:session session})
(mc/insert-one db "test" {:a 2} {:session session})
(mc/count-documents db "test" {} {:session session}))))))))
(testing "failing"
(let [db (new-db @client)
_ (mc/create db "test")]
(with-open [session (mg/start-session @client)]
(is (= 0 (try (mg/with-transaction session
(fn []
(mc/insert-one db "test" {:a 1} {:session session})
(mc/insert-one db "test" {nil 2} {:session session})))
(catch Exception _ (mc/count-documents db "test" {}))))))))))