Merge branch 'add_options_to_aggregate'

Merge implementation of aggregation framework options and explanation of
aggregation query plan.

Conflicts:
	src/clojure/monger/collection.clj
This commit is contained in:
Bartek Marcinowski 2015-06-24 16:05:47 +01:00
commit 15f50408a9
2 changed files with 61 additions and 10 deletions

View file

@ -51,9 +51,11 @@
* http://clojuremongodb.info/articles/deleting.html
* http://clojuremongodb.info/articles/aggregation.html"
(:refer-clojure :exclude [find remove count drop distinct empty? update])
(:import [com.mongodb Mongo DB DBCollection WriteResult DBObject WriteConcern
DBCursor MapReduceCommand MapReduceCommand$OutputType]
(:import [com.mongodb Mongo DB DBCollection WriteResult DBObject WriteConcern
DBCursor MapReduceCommand MapReduceCommand$OutputType AggregationOutput
AggregationOptions AggregationOptions$OutputMode]
[java.util List Map]
[java.util.concurrent TimeUnit]
[clojure.lang IPersistentMap ISeq]
org.bson.types.ObjectId)
(:require [monger.core :as mc]
@ -519,19 +521,39 @@
;; Aggregation
;;
(defn- build-aggregation-options [{:keys [^Boolean allow-disk-use cursor ^Long max-time]}]
(cond-> (AggregationOptions/builder)
allow-disk-use (.allowDiskUse allow-disk-use)
cursor (.outputMode AggregationOptions$OutputMode/CURSOR)
max-time (.maxTime max-time TimeUnit/MILLISECONDS)
(:batch-size cursor) (.batchSize (int (:batch-size cursor)))
true .build))
(defn aggregate
"Executes an aggregation query. MongoDB 2.2+ only.
Accepts the options :allow-disk-use and :cursor (a map with the :batch-size
key), as described in the MongoDB manual. Additionally, the :max-time option
is supported, for specifying a limit on the execution time of the query in
milliseconds.
See http://docs.mongodb.org/manual/applications/aggregation/ to learn more."
([^DB db ^String coll stages]
(aggregate db coll stages {}))
([^DB db ^String coll stages opts]
(let [res (mc/command db (merge {:aggregate coll :pipeline stages} opts))]
;; this is what DBCollection#distinct does. Turning a blind's eye!
(.throwOnError res)
(map #(from-db-object % true) (.get res "result")))))
[^DB db ^String coll stages & opts]
(let [coll (.getCollection db coll)
agg-opts (build-aggregation-options opts)
pipe (java.util.ArrayList. (to-db-object stages))
res (.aggregate coll pipe agg-opts)]
(map #(from-db-object % true) (iterator-seq res))))
(defn explain-aggregate
"Returns the explain plan for an aggregation query. MongoDB 2.2+ only.
See http://docs.mongodb.org/manual/applications/aggregation/ to learn more."
[^DB db ^String coll stages & opts]
(let [coll (.getCollection db coll)
agg-opts (build-aggregation-options opts)
pipe (java.util.ArrayList. (to-db-object stages))
res (.explainAggregate coll pipe agg-opts)]
(from-db-object res true)))
;;
;; Misc
;;

View file

@ -85,4 +85,33 @@
expected "IL"]
(mc/insert-batch db coll batch)
(let [result (:state (first (mc/aggregate db coll [{$group {:_id 1 :state {$last "$state"}}}])))]
(is (= expected result))))))
(is (= expected result)))))
(deftest test-cursor-aggregation
(let [batch [{:state "CA" :quantity 1 :price 199.00}
{:state "NY" :quantity 2 :price 199.00}
{:state "NY" :quantity 1 :price 299.00}
{:state "IL" :quantity 2 :price 11.50 }
{:state "CA" :quantity 2 :price 2.95 }
{:state "IL" :quantity 3 :price 5.50 }]
expected #{{:quantity 1 :state "CA"}
{:quantity 2 :state "NY"}
{:quantity 1 :state "NY"}
{:quantity 2 :state "IL"}
{:quantity 2 :state "CA"}
{:quantity 3 :state "IL"}}]
(mc/insert-batch db coll batch)
(is (= 6 (mc/count db coll)))
(let [result (set (map #(select-keys % [:state :quantity])
(mc/aggregate db coll [{$project {:state 1 :quantity 1}}] :cursor {:batch-size 10})))]
(is (= expected result)))))
(deftest test-explain-aggregate
(let [batch [{:state "CA" :price 100}
{:state "CA" :price 10}
{:state "IL" :price 50}]
expected-keys #{:ok :stages}]
(mc/insert-batch db coll batch)
(let [result (mc/explain-aggregate db coll [{$match {:state "CA"}}])
key-in-result? (partial contains? result)]
(is (every? key-in-result? expected-keys))))))