From e7c18692a1127d5661bd274ecc1982e0de71baee Mon Sep 17 00:00:00 2001 From: George Narroway Date: Thu, 9 Jan 2020 23:40:08 +0800 Subject: [PATCH] support bulk-write --- project.clj | 2 +- src/mongo_driver_3/collection.clj | 48 ++++++++++--- src/mongo_driver_3/model.clj | 95 +++++++++++++++++-------- test/mongo_driver_3/collection_test.clj | 22 ++++++ test/mongo_driver_3/model_test.clj | 64 +++++++++++++++-- 5 files changed, 185 insertions(+), 46 deletions(-) diff --git a/project.clj b/project.clj index b6cd4e9..d26fa18 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject mongo-driver-3 "0.5.0" +(defproject mongo-driver-3 "0.6.0-SNAPSHOT" :description "A Clojure wrapper for the Java MongoDB driver 3.11+." :url "https://github.com/gnarroway/mongo-driver-3" :license {:name "The MIT License" diff --git a/src/mongo_driver_3/collection.clj b/src/mongo_driver_3/collection.clj index b6b6f88..5717ff4 100644 --- a/src/mongo_driver_3/collection.clj +++ b/src/mongo_driver_3/collection.clj @@ -41,9 +41,9 @@ rc (->ReadConcern opts) wc (->WriteConcern opts)] (cond-> ^MongoCollection coll' - rp (.withReadPreference rp) - rc (.withReadConcern rc) - wc (.withWriteConcern wc))))) + rp (.withReadPreference rp) + rc (.withReadConcern rc) + wc (.withWriteConcern wc))))) ;;; CRUD functions @@ -70,14 +70,42 @@ it (cond-> (if session (.aggregate (collection db coll opts) session ^List (map document pipeline)) (.aggregate (collection db coll opts) ^List (map document pipeline))) - (some? allow-disk-use?) (.allowDiskUse allow-disk-use?) - (some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?) - batch-size (.batchSize batch-size))] + (some? allow-disk-use?) (.allowDiskUse allow-disk-use?) + (some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?) + batch-size (.batchSize batch-size))] (if-not raw? (map (fn [x] (from-document x keywordize?)) (seq it)) it)))) +(defn bulk-write + "Executes a mix of inserts, updates, replaces, and deletes. + + - `db` is a MongoDatabase + - `coll` is a collection name + - `operations` a list of 2-tuples in the form `[op config]`, + - `op` is one of :insert-one :update-one :update-many :delete-one :delete-many :replace-one + - `config` the configuration map for the operation + - `insert` takes `:document` + - `update` takes `:filter`, `:update`, and any options in the corresponding update function + - `delete` takes `:filter`, and any options in the corresponding delete function + - `replace` takes `:filter`, `:replacement`, and any options in the corresponding replace function + - `opts` (optional), a map of: + - `:bypass-document-validation?` Boolean + - `:ordered?` Boolean whether serve should insert documents in order provided (default true) + - `:bulk-write-options` A BulkWriteOptions for configuring directly. If specified, + any other [preceding] query options will be applied to it. + - `:session` A ClientSession + + Additionally takes options specified in `collection`" + ([^MongoDatabase db coll operations] + (bulk-write db coll operations {})) + ([^MongoDatabase db coll operations opts] + (let [opts' (->BulkWriteOptions opts)] + (if-let [session (:session opts)] + (.bulkWrite (collection db coll opts') ^ClientSession session ^List (map write-model operations)) + (.bulkWrite (collection db coll opts') (map write-model operations)))))) + (defn count-documents "Count documents in a collection, optionally matching a filter query `q`. @@ -171,10 +199,10 @@ (let [it (cond-> (if session (.find (collection db coll opts) session (document q)) (.find (collection db coll opts) (document q))) - limit (.limit limit) - skip (.skip skip) - sort (.sort (document sort)) - projection (.projection (document projection)))] + limit (.limit limit) + skip (.skip skip) + sort (.sort (document sort)) + projection (.projection (document projection)))] (if-not raw? (map (fn [x] (from-document x keywordize?)) (seq it)) diff --git a/src/mongo_driver_3/model.clj b/src/mongo_driver_3/model.clj index 798304b..6b82c93 100644 --- a/src/mongo_driver_3/model.clj +++ b/src/mongo_driver_3/model.clj @@ -1,5 +1,5 @@ (ns mongo-driver-3.model - (:import (com.mongodb.client.model CountOptions DeleteOptions ReturnDocument FindOneAndUpdateOptions InsertOneOptions ReplaceOptions UpdateOptions CreateCollectionOptions RenameCollectionOptions InsertManyOptions FindOneAndReplaceOptions IndexOptions) + (:import (com.mongodb.client.model CountOptions DeleteOptions ReturnDocument FindOneAndUpdateOptions InsertOneOptions ReplaceOptions UpdateOptions CreateCollectionOptions RenameCollectionOptions InsertManyOptions FindOneAndReplaceOptions IndexOptions BulkWriteOptions DeleteManyModel DeleteOneModel InsertOneModel ReplaceOneModel UpdateManyModel UpdateOneModel) (org.bson Document) (java.util.concurrent TimeUnit) (com.mongodb WriteConcern ReadPreference ReadConcern) @@ -91,7 +91,7 @@ (if (instance? ReadConcern read-concern) read-concern (or (kw->ReadConcern read-concern) (throw (IllegalArgumentException. - (str "No match for read concern of " (name read-concern)))))))) + (str "No match for read concern of " (name read-concern)))))))) (defn ->ReadPreference "Coerce `rp` into a ReadPreference if not nil. See `collection` for usage." @@ -110,19 +110,27 @@ write-concern (WriteConcern/valueOf (name write-concern))))] (cond-> (or wc (WriteConcern/ACKNOWLEDGED)) - w (.withW w) - w-timeout-ms (.withWTimeout w-timeout-ms (TimeUnit/MILLISECONDS)) - (some? journal?) (.withJournal journal?))))) + w (.withW w) + w-timeout-ms (.withWTimeout w-timeout-ms (TimeUnit/MILLISECONDS)) + (some? journal?) (.withJournal journal?))))) + +(defn ^BulkWriteOptions ->BulkWriteOptions + "Coerce options map into BulkWriteOptions. See `bulk-write` for usage." + [{:keys [bulk-write-options bypass-document-validation? ordered?]}] + (let [^BulkWriteOptions opts (or bulk-write-options (BulkWriteOptions.))] + (cond-> opts + (some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?) + (some? ordered?) (.ordered ordered?)))) (defn ^CountOptions ->CountOptions "Coerce options map into CountOptions. See `count-documents` for usage." [{:keys [count-options hint limit max-time-ms skip]}] (let [^CountOptions opts (or count-options (CountOptions.))] (cond-> opts - hint (.hint (document hint)) - limit (.limit limit) - max-time-ms (.maxTime max-time-ms (TimeUnit/MILLISECONDS)) - skip (.skip skip)))) + hint (.hint (document hint)) + limit (.limit limit) + max-time-ms (.maxTime max-time-ms (TimeUnit/MILLISECONDS)) + skip (.skip skip)))) (defn ^DeleteOptions ->DeleteOptions "Coerce options map into DeleteOptions. See `delete-one` and `delete-many` for usage." @@ -135,20 +143,20 @@ [{:keys [find-one-and-replace-options upsert? return-new? sort projection]}] (let [^FindOneAndReplaceOptions opts (or find-one-and-replace-options (FindOneAndReplaceOptions.))] (cond-> opts - (some? upsert?) (.upsert upsert?) - return-new? (.returnDocument (ReturnDocument/AFTER)) - sort (.sort (document sort)) - projection (.projection (document projection))))) + (some? upsert?) (.upsert upsert?) + return-new? (.returnDocument (ReturnDocument/AFTER)) + sort (.sort (document sort)) + projection (.projection (document projection))))) (defn ^FindOneAndUpdateOptions ->FindOneAndUpdateOptions "Coerce options map into FindOneAndUpdateOptions. See `find-one-and-update` for usage." [{:keys [find-one-and-update-options upsert? return-new? sort projection]}] (let [^FindOneAndUpdateOptions opts (or find-one-and-update-options (FindOneAndUpdateOptions.))] (cond-> opts - (some? upsert?) (.upsert upsert?) - return-new? (.returnDocument (ReturnDocument/AFTER)) - sort (.sort (document sort)) - projection (.projection (document projection))))) + (some? upsert?) (.upsert upsert?) + return-new? (.returnDocument (ReturnDocument/AFTER)) + sort (.sort (document sort)) + projection (.projection (document projection))))) (defn ^IndexOptions ->IndexOptions "Coerces an options map into an IndexOptions. @@ -157,55 +165,80 @@ [{:keys [index-options name sparse? unique?]}] (let [^IndexOptions opts (or index-options (IndexOptions.))] (cond-> opts - name (.name name) - (some? sparse?) (.sparse sparse?) - (some? unique?) (.unique unique?)))) + name (.name name) + (some? sparse?) (.sparse sparse?) + (some? unique?) (.unique unique?)))) (defn ^InsertManyOptions ->InsertManyOptions "Coerce options map into InsertManyOptions. See `insert-many` for usage." [{:keys [insert-many-options bypass-document-validation? ordered?]}] (let [^InsertManyOptions opts (or insert-many-options (InsertManyOptions.))] (cond-> opts - (some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?) - (some? ordered?) (.ordered ordered?)))) + (some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?) + (some? ordered?) (.ordered ordered?)))) (defn ^InsertOneOptions ->InsertOneOptions "Coerce options map into InsertOneOptions. See `insert-one` for usage." [{:keys [insert-one-options bypass-document-validation?]}] (let [^InsertOneOptions opts (or insert-one-options (InsertOneOptions.))] (cond-> opts - (some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?)))) + (some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?)))) (defn ^ReplaceOptions ->ReplaceOptions "Coerce options map into ReplaceOptions. See `replace-one` and `replace-many` for usage." [{:keys [replace-options upsert? bypass-document-validation?]}] (let [^ReplaceOptions opts (or replace-options (ReplaceOptions.))] (cond-> opts - (some? upsert?) (.upsert upsert?) - (some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?)))) + (some? upsert?) (.upsert upsert?) + (some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?)))) (defn ^UpdateOptions ->UpdateOptions "Coerce options map into UpdateOptions. See `update-one` and `update-many` for usage." [{:keys [update-options upsert? bypass-document-validation?]}] (let [^UpdateOptions opts (or update-options (UpdateOptions.))] (cond-> opts - (some? upsert?) (.upsert upsert?) - (some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?)))) + (some? upsert?) (.upsert upsert?) + (some? bypass-document-validation?) (.bypassDocumentValidation bypass-document-validation?)))) (defn ^CreateCollectionOptions ->CreateCollectionOptions "Coerce options map into CreateCollectionOptions. See `create` usage." [{:keys [create-collection-options capped? max-documents max-size-bytes]}] (let [^CreateCollectionOptions opts (or create-collection-options (CreateCollectionOptions.))] (cond-> opts - (some? capped?) (.capped capped?) - max-documents (.maxDocuments max-documents) - max-size-bytes (.sizeInBytes max-size-bytes)))) + (some? capped?) (.capped capped?) + max-documents (.maxDocuments max-documents) + max-size-bytes (.sizeInBytes max-size-bytes)))) (defn ^RenameCollectionOptions ->RenameCollectionOptions "Coerce options map into RenameCollectionOptions. See `rename` usage." [{:keys [rename-collection-options drop-target?]}] (let [^RenameCollectionOptions opts (or rename-collection-options (RenameCollectionOptions.))] (cond-> opts - (some? drop-target?) (.dropTarget drop-target?)))) + (some? drop-target?) (.dropTarget drop-target?)))) +(defmulti write-model + (fn [[type _]] type)) +(defmethod write-model :delete-many + [[_ opts]] + (DeleteManyModel. (document (:filter opts)) (->DeleteOptions opts))) + +(defmethod write-model :delete-one + [[_ opts]] + (DeleteOneModel. (document (:filter opts)) (->DeleteOptions opts))) + +(defmethod write-model :insert-one + [[_ opts]] + (InsertOneModel. (document (:document opts)))) + +(defmethod write-model :replace-one + [[_ opts]] + (ReplaceOneModel. (document (:filter opts)) (document (:replacement opts)) (->ReplaceOptions opts))) + +(defmethod write-model :update-many + [[_ opts]] + (UpdateManyModel. (document (:filter opts)) (document (:update opts)) (->UpdateOptions opts))) + +(defmethod write-model :update-one + [[_ opts]] + (UpdateOneModel. (document (:filter opts)) (document (:update opts)) (->UpdateOptions opts))) diff --git a/test/mongo_driver_3/collection_test.clj b/test/mongo_driver_3/collection_test.clj index ffa8932..7092f4b 100644 --- a/test/mongo_driver_3/collection_test.clj +++ b/test/mongo_driver_3/collection_test.clj @@ -232,6 +232,28 @@ (is (nil? (dissoc (mc/find-one-and-replace db "test" {:id 1} {:id 1 :v 2} {:return-new? true}) :_id))) (is (= {:id 1 :v 2} (dissoc (mc/find-one-and-replace db "test" {:id 1} {:id 1 :v 2} {:return-new? true :upsert? true}) :_id)))))) +(deftest ^:integration test-bulk-write + (testing "existing docs" + (let [db (new-db @client) + _ (mc/insert-many db "test" [{:id 1} {:id 2} {:id 3} {:id 4}]) + _ (mc/bulk-write db "test" [[:replace-one {:filter {:id 2} :replacement {:id 2.1}}] + [:update-many {:filter {:id 3} :update {:$set {:a "b"}}}] + [:update-one {:filter {:id 4} :update {:$set {:a "b"}}}]])] + + (is (= [{:id 1} {:id 2.1} {:id 3 :a "b"} {:id 4 :a "b"}] + (mc/find db "test" {} {:projection {:_id 0}}))))) + + (testing "upsert" + (let [db (new-db @client) + res (mc/bulk-write db "test" [[:insert-one {:document {:id 1}}] + [:replace-one {:filter {:id 2} :replacement {:id 2.1} :upsert? true}] + [:update-many {:filter {:id 3} :update {:$set {:a "b"}} :upsert? true}] + [:update-one {:filter {:id 4} :update {:$set {:a "b"}} :upsert? true}]])] + + (is (= 4 (mc/count-documents db "test"))) + (is (= 1 (.getInsertedCount res))) + (is (= 3 (count (.getUpserts res))))))) + (deftest ^:integration test-replace-one (testing "existing doc" (let [db (new-db @client) diff --git a/test/mongo_driver_3/model_test.clj b/test/mongo_driver_3/model_test.clj index 4c6653c..3222b99 100644 --- a/test/mongo_driver_3/model_test.clj +++ b/test/mongo_driver_3/model_test.clj @@ -3,7 +3,7 @@ [mongo-driver-3.model :as m]) (:import (com.mongodb ReadConcern ReadPreference WriteConcern) (java.util.concurrent TimeUnit) - (com.mongodb.client.model InsertOneOptions InsertManyOptions DeleteOptions FindOneAndUpdateOptions ReturnDocument FindOneAndReplaceOptions CountOptions UpdateOptions ReplaceOptions IndexOptions CreateCollectionOptions RenameCollectionOptions))) + (com.mongodb.client.model InsertOneOptions InsertManyOptions DeleteOptions FindOneAndUpdateOptions ReturnDocument FindOneAndReplaceOptions CountOptions UpdateOptions ReplaceOptions IndexOptions CreateCollectionOptions RenameCollectionOptions BulkWriteOptions DeleteManyModel DeleteOneModel InsertOneModel ReplaceOneModel UpdateManyModel UpdateOneModel))) ;;; Unit @@ -44,7 +44,8 @@ (= expected (.isUpsert (m/->ReplaceOptions {:upsert? arg}))) true true false false - false nil) (is (true? (.getBypassDocumentValidation (m/->ReplaceOptions {:bypass-document-validation? true})))) + false nil) + (is (true? (.getBypassDocumentValidation (m/->ReplaceOptions {:bypass-document-validation? true})))) (is (true? (.getBypassDocumentValidation (m/->ReplaceOptions {:replace-options (.bypassDocumentValidation (ReplaceOptions.) true)}))) "configure directly") @@ -85,7 +86,7 @@ {:insert-many-options (.bypassDocumentValidation (InsertManyOptions.) true)}))) "configure directly") (is (false? (.getBypassDocumentValidation (m/->InsertManyOptions - {:insert-one-options (.bypassDocumentValidation (InsertManyOptions.) true) + {:insert-many-options (.bypassDocumentValidation (InsertManyOptions.) true) :bypass-document-validation? false}))) "can override")) @@ -176,4 +177,59 @@ (is (= opts (m/->CreateCollectionOptions {:create-collection-options opts})) "configure directly") (is (= 5 (.getMaxDocuments (m/->CreateCollectionOptions {:create-collection-options opts})))) (is (= 7 (.getMaxDocuments (m/->CreateCollectionOptions {:create-collection-options opts :max-documents 7}))) - "can override"))) \ No newline at end of file + "can override"))) + +(deftest test->BulkWriteOptions + (is (instance? BulkWriteOptions (m/->BulkWriteOptions {}))) + (are [expected arg] + (= expected (.getBypassDocumentValidation (m/->BulkWriteOptions {:bypass-document-validation? arg}))) + true true + false false + nil nil) + (are [expected arg] + (= expected (.isOrdered (m/->BulkWriteOptions {:ordered? arg}))) + true true + false false + true nil) + (is (true? (.getBypassDocumentValidation (m/->BulkWriteOptions + {:bulk-write-options (.bypassDocumentValidation (BulkWriteOptions.) true)}))) + "configure directly") + (is (false? (.getBypassDocumentValidation (m/->BulkWriteOptions + {:bulk-write-options (.bypassDocumentValidation (BulkWriteOptions.) true) + :bypass-document-validation? false}))) + "can override")) + +(deftest test-write-model + (testing "delete many" + (is (instance? DeleteManyModel (m/write-model [:delete-many {:filter {:a "b"}}])))) + + (testing "delete one" + (is (instance? DeleteOneModel (m/write-model [:delete-one {:filter {:a "b"}}])))) + + (testing "insert one" + (is (instance? InsertOneModel (m/write-model [:insert-one {:document {:a "b"}}])))) + + (testing "replace one" + (is (instance? ReplaceOneModel (m/write-model [:replace-one {:filter {:a "b"} :replacement {:a "c"}}]))) + (are [expected arg] + (= expected (.isUpsert (.getOptions (m/write-model [:replace-one {:filter {:a "b"} :replacement {:a "c"} :upsert? arg}])))) + true true + false false + false nil)) + + (testing "update many" + (is (instance? UpdateManyModel (m/write-model [:update-many {:filter {:a "b"} :update {"$set" {:a "c"}}}]))) + (are [expected arg] + (= expected (.isUpsert (.getOptions (m/write-model [:update-many {:filter {:a "b"} :update {"$set" {:a "c"}} :upsert? arg}])))) + true true + false false + false nil)) + + (testing "update one" + (is (instance? UpdateOneModel (m/write-model [:update-one {:filter {:a "b"} :update {"$set" {:a "c"}}}]))) + (are [expected arg] + (= expected (.isUpsert (.getOptions (m/write-model [:update-one {:filter {:a "b"} :update {"$set" {:a "c"}} :upsert? arg}])))) + true true + false false + false nil))) +