support bulk-write

This commit is contained in:
George Narroway 2020-01-09 23:40:08 +08:00
parent cb684136e5
commit e7c18692a1
5 changed files with 185 additions and 46 deletions

View file

@ -1,4 +1,4 @@
(defproject mongo-driver-3 "0.5.0"
(defproject mongo-driver-3 "0.6.0-SNAPSHOT"
:description "A Clojure wrapper for the Java MongoDB driver 3.11+."
:url "https://github.com/gnarroway/mongo-driver-3"
:license {:name "The MIT License"

View file

@ -41,9 +41,9 @@
rc (->ReadConcern opts)
wc (->WriteConcern opts)]
(cond-> ^MongoCollection coll'
rp (.withReadPreference rp)
rc (.withReadConcern rc)
wc (.withWriteConcern wc)))))
rp (.withReadPreference rp)
rc (.withReadConcern rc)
wc (.withWriteConcern wc)))))
;;; CRUD functions
@ -70,14 +70,42 @@
it (cond-> (if session
(.aggregate (collection db coll opts) session ^List (map document pipeline))
(.aggregate (collection db coll opts) ^List (map document pipeline)))
(some? allow-disk-use?) (.allowDiskUse allow-disk-use?)
(some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?)
batch-size (.batchSize batch-size))]
(some? allow-disk-use?) (.allowDiskUse allow-disk-use?)
(some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?)
batch-size (.batchSize batch-size))]
(if-not raw?
(map (fn [x] (from-document x keywordize?)) (seq it))
it))))
(defn bulk-write
"Executes a mix of inserts, updates, replaces, and deletes.
- `db` is a MongoDatabase
- `coll` is a collection name
- `operations` a list of 2-tuples in the form `[op config]`,
- `op` is one of :insert-one :update-one :update-many :delete-one :delete-many :replace-one
- `config` the configuration map for the operation
- `insert` takes `:document`
- `update` takes `:filter`, `:update`, and any options in the corresponding update function
- `delete` takes `:filter`, and any options in the corresponding delete function
- `replace` takes `:filter`, `:replacement`, and any options in the corresponding replace function
- `opts` (optional), a map of:
- `:bypass-document-validation?` Boolean
- `:ordered?` Boolean whether serve should insert documents in order provided (default true)
- `:bulk-write-options` A BulkWriteOptions for configuring directly. If specified,
any other [preceding] query options will be applied to it.
- `:session` A ClientSession
Additionally takes options specified in `collection`"
([^MongoDatabase db coll operations]
(bulk-write db coll operations {}))
([^MongoDatabase db coll operations opts]
(let [opts' (->BulkWriteOptions opts)]
(if-let [session (:session opts)]
(.bulkWrite (collection db coll opts') ^ClientSession session ^List (map write-model operations))
(.bulkWrite (collection db coll opts') (map write-model operations))))))
(defn count-documents
"Count documents in a collection, optionally matching a filter query `q`.
@ -171,10 +199,10 @@
(let [it (cond-> (if session
(.find (collection db coll opts) session (document q))
(.find (collection db coll opts) (document q)))
limit (.limit limit)
skip (.skip skip)
sort (.sort (document sort))
projection (.projection (document projection)))]
limit (.limit limit)
skip (.skip skip)
sort (.sort (document sort))
projection (.projection (document projection)))]
(if-not raw?
(map (fn [x] (from-document x keywordize?)) (seq it))

View file

@ -1,5 +1,5 @@
(ns mongo-driver-3.model
(:import (com.mongodb.client.model CountOptions DeleteOptions ReturnDocument FindOneAndUpdateOptions InsertOneOptions ReplaceOptions UpdateOptions CreateCollectionOptions RenameCollectionOptions InsertManyOptions FindOneAndReplaceOptions IndexOptions)
(:import (com.mongodb.client.model CountOptions DeleteOptions ReturnDocument FindOneAndUpdateOptions InsertOneOptions ReplaceOptions UpdateOptions CreateCollectionOptions RenameCollectionOptions InsertManyOptions FindOneAndReplaceOptions IndexOptions BulkWriteOptions DeleteManyModel DeleteOneModel InsertOneModel ReplaceOneModel UpdateManyModel UpdateOneModel)
(org.bson Document)
(java.util.concurrent TimeUnit)
(com.mongodb WriteConcern ReadPreference ReadConcern)
@ -91,7 +91,7 @@
(if (instance? ReadConcern read-concern)
read-concern
(or (kw->ReadConcern read-concern) (throw (IllegalArgumentException.
(str "No match for read concern of " (name read-concern))))))))
(str "No match for read concern of " (name read-concern))))))))
(defn ->ReadPreference
"Coerce `rp` into a ReadPreference if not nil. See `collection` for usage."
@ -110,19 +110,27 @@
write-concern
(WriteConcern/valueOf (name write-concern))))]
(cond-> (or wc (WriteConcern/ACKNOWLEDGED))
w (.withW w)
w-timeout-ms (.withWTimeout w-timeout-ms (TimeUnit/MILLISECONDS))
(some? journal?) (.withJournal journal?)))))
w (.withW w)
w-timeout-ms (.withWTimeout w-timeout-ms (TimeUnit/MILLISECONDS))
(some? journal?) (.withJournal journal?)))))
(defn ^BulkWriteOptions ->BulkWriteOptions
"Coerce options map into BulkWriteOptions. See `bulk-write` for usage."
[{:keys [bulk-write-options bypass-document-validation? ordered?]}]
(let [^BulkWriteOptions opts (or bulk-write-options (BulkWriteOptions.))]
(cond-> opts
(some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?)
(some? ordered?) (.ordered ordered?))))
(defn ^CountOptions ->CountOptions
"Coerce options map into CountOptions. See `count-documents` for usage."
[{:keys [count-options hint limit max-time-ms skip]}]
(let [^CountOptions opts (or count-options (CountOptions.))]
(cond-> opts
hint (.hint (document hint))
limit (.limit limit)
max-time-ms (.maxTime max-time-ms (TimeUnit/MILLISECONDS))
skip (.skip skip))))
hint (.hint (document hint))
limit (.limit limit)
max-time-ms (.maxTime max-time-ms (TimeUnit/MILLISECONDS))
skip (.skip skip))))
(defn ^DeleteOptions ->DeleteOptions
"Coerce options map into DeleteOptions. See `delete-one` and `delete-many` for usage."
@ -135,20 +143,20 @@
[{:keys [find-one-and-replace-options upsert? return-new? sort projection]}]
(let [^FindOneAndReplaceOptions opts (or find-one-and-replace-options (FindOneAndReplaceOptions.))]
(cond-> opts
(some? upsert?) (.upsert upsert?)
return-new? (.returnDocument (ReturnDocument/AFTER))
sort (.sort (document sort))
projection (.projection (document projection)))))
(some? upsert?) (.upsert upsert?)
return-new? (.returnDocument (ReturnDocument/AFTER))
sort (.sort (document sort))
projection (.projection (document projection)))))
(defn ^FindOneAndUpdateOptions ->FindOneAndUpdateOptions
"Coerce options map into FindOneAndUpdateOptions. See `find-one-and-update` for usage."
[{:keys [find-one-and-update-options upsert? return-new? sort projection]}]
(let [^FindOneAndUpdateOptions opts (or find-one-and-update-options (FindOneAndUpdateOptions.))]
(cond-> opts
(some? upsert?) (.upsert upsert?)
return-new? (.returnDocument (ReturnDocument/AFTER))
sort (.sort (document sort))
projection (.projection (document projection)))))
(some? upsert?) (.upsert upsert?)
return-new? (.returnDocument (ReturnDocument/AFTER))
sort (.sort (document sort))
projection (.projection (document projection)))))
(defn ^IndexOptions ->IndexOptions
"Coerces an options map into an IndexOptions.
@ -157,55 +165,80 @@
[{:keys [index-options name sparse? unique?]}]
(let [^IndexOptions opts (or index-options (IndexOptions.))]
(cond-> opts
name (.name name)
(some? sparse?) (.sparse sparse?)
(some? unique?) (.unique unique?))))
name (.name name)
(some? sparse?) (.sparse sparse?)
(some? unique?) (.unique unique?))))
(defn ^InsertManyOptions ->InsertManyOptions
"Coerce options map into InsertManyOptions. See `insert-many` for usage."
[{:keys [insert-many-options bypass-document-validation? ordered?]}]
(let [^InsertManyOptions opts (or insert-many-options (InsertManyOptions.))]
(cond-> opts
(some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?)
(some? ordered?) (.ordered ordered?))))
(some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?)
(some? ordered?) (.ordered ordered?))))
(defn ^InsertOneOptions ->InsertOneOptions
"Coerce options map into InsertOneOptions. See `insert-one` for usage."
[{:keys [insert-one-options bypass-document-validation?]}]
(let [^InsertOneOptions opts (or insert-one-options (InsertOneOptions.))]
(cond-> opts
(some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?))))
(some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?))))
(defn ^ReplaceOptions ->ReplaceOptions
"Coerce options map into ReplaceOptions. See `replace-one` and `replace-many` for usage."
[{:keys [replace-options upsert? bypass-document-validation?]}]
(let [^ReplaceOptions opts (or replace-options (ReplaceOptions.))]
(cond-> opts
(some? upsert?) (.upsert upsert?)
(some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?))))
(some? upsert?) (.upsert upsert?)
(some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?))))
(defn ^UpdateOptions ->UpdateOptions
"Coerce options map into UpdateOptions. See `update-one` and `update-many` for usage."
[{:keys [update-options upsert? bypass-document-validation?]}]
(let [^UpdateOptions opts (or update-options (UpdateOptions.))]
(cond-> opts
(some? upsert?) (.upsert upsert?)
(some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?))))
(some? upsert?) (.upsert upsert?)
(some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?))))
(defn ^CreateCollectionOptions ->CreateCollectionOptions
"Coerce options map into CreateCollectionOptions. See `create` usage."
[{:keys [create-collection-options capped? max-documents max-size-bytes]}]
(let [^CreateCollectionOptions opts (or create-collection-options (CreateCollectionOptions.))]
(cond-> opts
(some? capped?) (.capped capped?)
max-documents (.maxDocuments max-documents)
max-size-bytes (.sizeInBytes max-size-bytes))))
(some? capped?) (.capped capped?)
max-documents (.maxDocuments max-documents)
max-size-bytes (.sizeInBytes max-size-bytes))))
(defn ^RenameCollectionOptions ->RenameCollectionOptions
"Coerce options map into RenameCollectionOptions. See `rename` usage."
[{:keys [rename-collection-options drop-target?]}]
(let [^RenameCollectionOptions opts (or rename-collection-options (RenameCollectionOptions.))]
(cond-> opts
(some? drop-target?) (.dropTarget drop-target?))))
(some? drop-target?) (.dropTarget drop-target?))))
(defmulti write-model
(fn [[type _]] type))
(defmethod write-model :delete-many
[[_ opts]]
(DeleteManyModel. (document (:filter opts)) (->DeleteOptions opts)))
(defmethod write-model :delete-one
[[_ opts]]
(DeleteOneModel. (document (:filter opts)) (->DeleteOptions opts)))
(defmethod write-model :insert-one
[[_ opts]]
(InsertOneModel. (document (:document opts))))
(defmethod write-model :replace-one
[[_ opts]]
(ReplaceOneModel. (document (:filter opts)) (document (:replacement opts)) (->ReplaceOptions opts)))
(defmethod write-model :update-many
[[_ opts]]
(UpdateManyModel. (document (:filter opts)) (document (:update opts)) (->UpdateOptions opts)))
(defmethod write-model :update-one
[[_ opts]]
(UpdateOneModel. (document (:filter opts)) (document (:update opts)) (->UpdateOptions opts)))

View file

@ -232,6 +232,28 @@
(is (nil? (dissoc (mc/find-one-and-replace db "test" {:id 1} {:id 1 :v 2} {:return-new? true}) :_id)))
(is (= {:id 1 :v 2} (dissoc (mc/find-one-and-replace db "test" {:id 1} {:id 1 :v 2} {:return-new? true :upsert? true}) :_id))))))
(deftest ^:integration test-bulk-write
(testing "existing docs"
(let [db (new-db @client)
_ (mc/insert-many db "test" [{:id 1} {:id 2} {:id 3} {:id 4}])
_ (mc/bulk-write db "test" [[:replace-one {:filter {:id 2} :replacement {:id 2.1}}]
[:update-many {:filter {:id 3} :update {:$set {:a "b"}}}]
[:update-one {:filter {:id 4} :update {:$set {:a "b"}}}]])]
(is (= [{:id 1} {:id 2.1} {:id 3 :a "b"} {:id 4 :a "b"}]
(mc/find db "test" {} {:projection {:_id 0}})))))
(testing "upsert"
(let [db (new-db @client)
res (mc/bulk-write db "test" [[:insert-one {:document {:id 1}}]
[:replace-one {:filter {:id 2} :replacement {:id 2.1} :upsert? true}]
[:update-many {:filter {:id 3} :update {:$set {:a "b"}} :upsert? true}]
[:update-one {:filter {:id 4} :update {:$set {:a "b"}} :upsert? true}]])]
(is (= 4 (mc/count-documents db "test")))
(is (= 1 (.getInsertedCount res)))
(is (= 3 (count (.getUpserts res)))))))
(deftest ^:integration test-replace-one
(testing "existing doc"
(let [db (new-db @client)

View file

@ -3,7 +3,7 @@
[mongo-driver-3.model :as m])
(:import (com.mongodb ReadConcern ReadPreference WriteConcern)
(java.util.concurrent TimeUnit)
(com.mongodb.client.model InsertOneOptions InsertManyOptions DeleteOptions FindOneAndUpdateOptions ReturnDocument FindOneAndReplaceOptions CountOptions UpdateOptions ReplaceOptions IndexOptions CreateCollectionOptions RenameCollectionOptions)))
(com.mongodb.client.model InsertOneOptions InsertManyOptions DeleteOptions FindOneAndUpdateOptions ReturnDocument FindOneAndReplaceOptions CountOptions UpdateOptions ReplaceOptions IndexOptions CreateCollectionOptions RenameCollectionOptions BulkWriteOptions DeleteManyModel DeleteOneModel InsertOneModel ReplaceOneModel UpdateManyModel UpdateOneModel)))
;;; Unit
@ -44,7 +44,8 @@
(= expected (.isUpsert (m/->ReplaceOptions {:upsert? arg})))
true true
false false
false nil) (is (true? (.getBypassDocumentValidation (m/->ReplaceOptions {:bypass-document-validation? true}))))
false nil)
(is (true? (.getBypassDocumentValidation (m/->ReplaceOptions {:bypass-document-validation? true}))))
(is (true? (.getBypassDocumentValidation (m/->ReplaceOptions
{:replace-options (.bypassDocumentValidation (ReplaceOptions.) true)})))
"configure directly")
@ -85,7 +86,7 @@
{:insert-many-options (.bypassDocumentValidation (InsertManyOptions.) true)})))
"configure directly")
(is (false? (.getBypassDocumentValidation (m/->InsertManyOptions
{:insert-one-options (.bypassDocumentValidation (InsertManyOptions.) true)
{:insert-many-options (.bypassDocumentValidation (InsertManyOptions.) true)
:bypass-document-validation? false})))
"can override"))
@ -176,4 +177,59 @@
(is (= opts (m/->CreateCollectionOptions {:create-collection-options opts})) "configure directly")
(is (= 5 (.getMaxDocuments (m/->CreateCollectionOptions {:create-collection-options opts}))))
(is (= 7 (.getMaxDocuments (m/->CreateCollectionOptions {:create-collection-options opts :max-documents 7})))
"can override")))
"can override")))
(deftest test->BulkWriteOptions
(is (instance? BulkWriteOptions (m/->BulkWriteOptions {})))
(are [expected arg]
(= expected (.getBypassDocumentValidation (m/->BulkWriteOptions {:bypass-document-validation? arg})))
true true
false false
nil nil)
(are [expected arg]
(= expected (.isOrdered (m/->BulkWriteOptions {:ordered? arg})))
true true
false false
true nil)
(is (true? (.getBypassDocumentValidation (m/->BulkWriteOptions
{:bulk-write-options (.bypassDocumentValidation (BulkWriteOptions.) true)})))
"configure directly")
(is (false? (.getBypassDocumentValidation (m/->BulkWriteOptions
{:bulk-write-options (.bypassDocumentValidation (BulkWriteOptions.) true)
:bypass-document-validation? false})))
"can override"))
(deftest test-write-model
(testing "delete many"
(is (instance? DeleteManyModel (m/write-model [:delete-many {:filter {:a "b"}}]))))
(testing "delete one"
(is (instance? DeleteOneModel (m/write-model [:delete-one {:filter {:a "b"}}]))))
(testing "insert one"
(is (instance? InsertOneModel (m/write-model [:insert-one {:document {:a "b"}}]))))
(testing "replace one"
(is (instance? ReplaceOneModel (m/write-model [:replace-one {:filter {:a "b"} :replacement {:a "c"}}])))
(are [expected arg]
(= expected (.isUpsert (.getOptions (m/write-model [:replace-one {:filter {:a "b"} :replacement {:a "c"} :upsert? arg}]))))
true true
false false
false nil))
(testing "update many"
(is (instance? UpdateManyModel (m/write-model [:update-many {:filter {:a "b"} :update {"$set" {:a "c"}}}])))
(are [expected arg]
(= expected (.isUpsert (.getOptions (m/write-model [:update-many {:filter {:a "b"} :update {"$set" {:a "c"}} :upsert? arg}]))))
true true
false false
false nil))
(testing "update one"
(is (instance? UpdateOneModel (m/write-model [:update-one {:filter {:a "b"} :update {"$set" {:a "c"}}}])))
(are [expected arg]
(= expected (.isUpsert (.getOptions (m/write-model [:update-one {:filter {:a "b"} :update {"$set" {:a "c"}} :upsert? arg}]))))
true true
false false
false nil)))