diff --git a/src/clojure/monger/collection.clj b/src/clojure/monger/collection.clj index 59ff4b0..6e76dee 100644 --- a/src/clojure/monger/collection.clj +++ b/src/clojure/monger/collection.clj @@ -51,9 +51,11 @@ * http://clojuremongodb.info/articles/deleting.html * http://clojuremongodb.info/articles/aggregation.html" (:refer-clojure :exclude [find remove count drop distinct empty? update]) - (:import [com.mongodb Mongo DB DBCollection WriteResult DBObject WriteConcern - DBCursor MapReduceCommand MapReduceCommand$OutputType] + (:import [com.mongodb Mongo DB DBCollection WriteResult DBObject WriteConcern + DBCursor MapReduceCommand MapReduceCommand$OutputType AggregationOutput + AggregationOptions AggregationOptions$OutputMode] [java.util List Map] + [java.util.concurrent TimeUnit] [clojure.lang IPersistentMap ISeq] org.bson.types.ObjectId) (:require [monger.core :as mc] @@ -519,19 +521,39 @@ ;; Aggregation ;; +(defn- build-aggregation-options [{:keys [^Boolean allow-disk-use cursor ^Long max-time]}] + (cond-> (AggregationOptions/builder) + allow-disk-use (.allowDiskUse allow-disk-use) + cursor (.outputMode AggregationOptions$OutputMode/CURSOR) + max-time (.maxTime max-time TimeUnit/MILLISECONDS) + (:batch-size cursor) (.batchSize (int (:batch-size cursor))) + true .build)) + (defn aggregate "Executes an aggregation query. MongoDB 2.2+ only. + Accepts the options :allow-disk-use and :cursor (a map with the :batch-size + key), as described in the MongoDB manual. Additionally, the :max-time option + is supported, for specifying a limit on the execution time of the query in + milliseconds. See http://docs.mongodb.org/manual/applications/aggregation/ to learn more." - ([^DB db ^String coll stages] - (aggregate db coll stages {})) - ([^DB db ^String coll stages opts] - (let [res (mc/command db (merge {:aggregate coll :pipeline stages} opts))] - ;; this is what DBCollection#distinct does. Turning a blind's eye! - (.throwOnError res) - (map #(from-db-object % true) (.get res "result"))))) + [^DB db ^String coll stages & opts] + (let [coll (.getCollection db coll) + agg-opts (build-aggregation-options opts) + pipe (java.util.ArrayList. (to-db-object stages)) + res (.aggregate coll pipe agg-opts)] + (map #(from-db-object % true) (iterator-seq res)))) +(defn explain-aggregate + "Returns the explain plan for an aggregation query. MongoDB 2.2+ only. + See http://docs.mongodb.org/manual/applications/aggregation/ to learn more." + [^DB db ^String coll stages & opts] + (let [coll (.getCollection db coll) + agg-opts (build-aggregation-options opts) + pipe (java.util.ArrayList. (to-db-object stages)) + res (.explainAggregate coll pipe agg-opts)] + (from-db-object res true))) ;; ;; Misc ;; diff --git a/test/monger/test/aggregation_framework_test.clj b/test/monger/test/aggregation_framework_test.clj index fe86948..b243168 100644 --- a/test/monger/test/aggregation_framework_test.clj +++ b/test/monger/test/aggregation_framework_test.clj @@ -85,4 +85,33 @@ expected "IL"] (mc/insert-batch db coll batch) (let [result (:state (first (mc/aggregate db coll [{$group {:_id 1 :state {$last "$state"}}}])))] - (is (= expected result)))))) + (is (= expected result))))) + + (deftest test-cursor-aggregation + (let [batch [{:state "CA" :quantity 1 :price 199.00} + {:state "NY" :quantity 2 :price 199.00} + {:state "NY" :quantity 1 :price 299.00} + {:state "IL" :quantity 2 :price 11.50 } + {:state "CA" :quantity 2 :price 2.95 } + {:state "IL" :quantity 3 :price 5.50 }] + expected #{{:quantity 1 :state "CA"} + {:quantity 2 :state "NY"} + {:quantity 1 :state "NY"} + {:quantity 2 :state "IL"} + {:quantity 2 :state "CA"} + {:quantity 3 :state "IL"}}] + (mc/insert-batch db coll batch) + (is (= 6 (mc/count db coll))) + (let [result (set (map #(select-keys % [:state :quantity]) + (mc/aggregate db coll [{$project {:state 1 :quantity 1}}] :cursor {:batch-size 10})))] + (is (= expected result))))) + + (deftest test-explain-aggregate + (let [batch [{:state "CA" :price 100} + {:state "CA" :price 10} + {:state "IL" :price 50}] + expected-keys #{:ok :stages}] + (mc/insert-batch db coll batch) + (let [result (mc/explain-aggregate db coll [{$match {:state "CA"}}]) + key-in-result? (partial contains? result)] + (is (every? key-in-result? expected-keys))))))