From 136dea00b225bf9f6bd67263fdfd02a5ea414a8f Mon Sep 17 00:00:00 2001
From: Bartek Marcinowski
Date: Wed, 24 Jun 2015 15:47:46 +0100
Subject: [PATCH 1/2] Add allow-disk-use and cursor options to aggregate

Reimplement aggregate on top of the Java driver's DBCollection#aggregate
in order to return a cursor for the results. This allows users to
overcome the 16MB result size limit and specify the cursor batch size.

The allowDiskUse can also be passed to Mongo, through the
allow-disk-use key.

The maxTime option enables setting a limit (in milliseconds) on the
execution time of the query, through the max-time key.

The options builder's return value is type-hinted so that the call to
the overloaded DBCollection#aggregate is resolved at compile time rather
than through reflection. The docstring notes that these options need
MongoDB 2.6+.
---
 src/clojure/monger/collection.clj             | 35 +++++++++++++++----
 .../test/aggregation_framework_test.clj       | 19 ++++++++++
 2 files changed, 47 insertions(+), 7 deletions(-)

diff --git a/src/clojure/monger/collection.clj b/src/clojure/monger/collection.clj
index 05f7366..511ed59 100644
--- a/src/clojure/monger/collection.clj
+++ b/src/clojure/monger/collection.clj
@@ -24,9 +24,11 @@
   * http://clojuremongodb.info/articles/deleting.html
   * http://clojuremongodb.info/articles/aggregation.html"
   (:refer-clojure :exclude [find remove count drop distinct empty? update])
-  (:import [com.mongodb Mongo DB DBCollection WriteResult DBObject WriteConcern
-            DBCursor MapReduceCommand MapReduceCommand$OutputType]
+  (:import [com.mongodb Mongo DB DBCollection WriteResult DBObject WriteConcern
+            DBCursor MapReduceCommand MapReduceCommand$OutputType AggregationOutput
+            AggregationOptions AggregationOptions$OutputMode]
            [java.util List Map]
+           [java.util.concurrent TimeUnit]
            [clojure.lang IPersistentMap ISeq]
            org.bson.types.ObjectId)
   (:require [monger.core :as mc]
@@ -490,15 +492,34 @@
 ;; Aggregation
 ;;
 
+(defn- build-aggregation-options
+  "Builds the driver's AggregationOptions from an options map (or a seq of
+  alternating keys and values). Recognized keys are :allow-disk-use, :cursor
+  (a map, optionally carrying :batch-size) and :max-time (milliseconds).
+  The return value is type-hinted so that callers passing it to overloaded
+  driver methods do not fall back to reflection."
+  ^AggregationOptions [{:keys [^Boolean allow-disk-use cursor ^Long max-time]}]
+  (cond-> (AggregationOptions/builder)
+    allow-disk-use       (.allowDiskUse allow-disk-use)
+    cursor               (.outputMode AggregationOptions$OutputMode/CURSOR)
+    max-time             (.maxTime max-time TimeUnit/MILLISECONDS)
+    (:batch-size cursor) (.batchSize (int (:batch-size cursor)))
+    true                 .build))
+
 (defn aggregate
   "Executes an aggregation query. MongoDB 2.2+ only.
+  Accepts the options :allow-disk-use and :cursor (a map with the :batch-size
+  key), as described in the MongoDB manual, and :max-time, a limit on the
+  execution time of the query in milliseconds. These options require
+  MongoDB 2.6+.
 
   See http://docs.mongodb.org/manual/applications/aggregation/ to learn more."
-  [^DB db ^String coll stages]
-  (let [res (mc/command db {:aggregate coll :pipeline stages})]
-    ;; this is what DBCollection#distinct does. Turning a blind's eye!
-    (.throwOnError res)
-    (map #(from-db-object % true) (.get res "result"))))
+  [^DB db ^String coll stages & opts]
+  (let [coll (.getCollection db coll)
+        agg-opts (build-aggregation-options opts)
+        pipe (java.util.ArrayList. (to-db-object stages))
+        res (.aggregate coll pipe agg-opts)]
+    (map #(from-db-object % true) (iterator-seq res))))
 
 
 ;;
diff --git a/test/monger/test/aggregation_framework_test.clj b/test/monger/test/aggregation_framework_test.clj
index fe86948..481db8d 100644
--- a/test/monger/test/aggregation_framework_test.clj
+++ b/test/monger/test/aggregation_framework_test.clj
@@ -85,4 +85,23 @@
           expected "IL"]
       (mc/insert-batch db coll batch)
       (let [result (:state (first (mc/aggregate db coll [{$group {:_id 1 :state {$last "$state"}}}])))]
+        (is (= expected result)))))
+
+  (deftest test-cursor-aggregation
+    (let [batch [{:state "CA" :quantity 1 :price 199.00}
+                 {:state "NY" :quantity 2 :price 199.00}
+                 {:state "NY" :quantity 1 :price 299.00}
+                 {:state "IL" :quantity 2 :price 11.50 }
+                 {:state "CA" :quantity 2 :price 2.95 }
+                 {:state "IL" :quantity 3 :price 5.50 }]
+          expected #{{:quantity 1 :state "CA"}
+                     {:quantity 2 :state "NY"}
+                     {:quantity 1 :state "NY"}
+                     {:quantity 2 :state "IL"}
+                     {:quantity 2 :state "CA"}
+                     {:quantity 3 :state "IL"}}]
+      (mc/insert-batch db coll batch)
+      (is (= 6 (mc/count db coll)))
+      (let [result (set (map #(select-keys % [:state :quantity])
+                             (mc/aggregate db coll [{$project {:state 1 :quantity 1}}] :cursor {:batch-size 10})))]
         (is (= expected result))))))

From f0946acd75ec048f5ed94ba2b822b4943ef717fc Mon Sep 17 00:00:00 2001
From: Bartek Marcinowski
Date: Wed, 24 Jun 2015 15:51:24 +0100
Subject: [PATCH 2/2] Enable query plan explanation for the aggregation framework

Add the explain-aggregate function, which returns a map containing
information about the given aggregation query. It accepts the same
options as aggregate and requires MongoDB 2.6+.
---
 src/clojure/monger/collection.clj               | 10 ++++++++++
 test/monger/test/aggregation_framework_test.clj | 12 +++++++++++-
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/src/clojure/monger/collection.clj b/src/clojure/monger/collection.clj
index 511ed59..1845c3f 100644
--- a/src/clojure/monger/collection.clj
+++ b/src/clojure/monger/collection.clj
@@ -521,7 +521,17 @@
         res (.aggregate coll pipe agg-opts)]
     (map #(from-db-object % true) (iterator-seq res))))
 
+(defn explain-aggregate
+  "Returns the explain plan for an aggregation query as a map. MongoDB 2.6+ only.
+  Accepts the same options as aggregate.
+  See http://docs.mongodb.org/manual/applications/aggregation/ to learn more."
+  [^DB db ^String coll stages & opts]
+  (let [coll (.getCollection db coll)
+        agg-opts (build-aggregation-options opts)
+        pipe (java.util.ArrayList. (to-db-object stages))
+        res (.explainAggregate coll pipe agg-opts)]
+    (from-db-object res true)))
 
 ;;
 ;; Misc
 ;;
diff --git a/test/monger/test/aggregation_framework_test.clj b/test/monger/test/aggregation_framework_test.clj
index 481db8d..b243168 100644
--- a/test/monger/test/aggregation_framework_test.clj
+++ b/test/monger/test/aggregation_framework_test.clj
@@ -104,4 +104,14 @@
       (is (= 6 (mc/count db coll)))
       (let [result (set (map #(select-keys % [:state :quantity])
                              (mc/aggregate db coll [{$project {:state 1 :quantity 1}}] :cursor {:batch-size 10})))]
-        (is (= expected result))))))
+        (is (= expected result)))))
+
+  (deftest test-explain-aggregate
+    (let [batch [{:state "CA" :price 100}
+                 {:state "CA" :price 10}
+                 {:state "IL" :price 50}]
+          expected-keys #{:ok :stages}]
+      (mc/insert-batch db coll batch)
+      (let [result (mc/explain-aggregate db coll [{$match {:state "CA"}}])
+            key-in-result? (partial contains? result)]
+        (is (every? key-in-result? expected-keys))))))