Add allow-disk-use and cursor options to aggregate
Reimplement aggregate on top of the Java driver's DBCollection#aggregate in order to return a cursor for the results. This allows users to overcome the 16MB result size limit and to specify the cursor batch size (through the :batch-size key of the :cursor option). The allowDiskUse option can also be passed to MongoDB, through the :allow-disk-use key. The maxTime option sets a limit (in milliseconds) on the execution time of the query, through the :max-time key.
This commit is contained in:
parent
9cb54167c4
commit
136dea00b2
2 changed files with 41 additions and 7 deletions
|
|
@ -24,9 +24,11 @@
|
||||||
* http://clojuremongodb.info/articles/deleting.html
|
* http://clojuremongodb.info/articles/deleting.html
|
||||||
* http://clojuremongodb.info/articles/aggregation.html"
|
* http://clojuremongodb.info/articles/aggregation.html"
|
||||||
(:refer-clojure :exclude [find remove count drop distinct empty? update])
|
(:refer-clojure :exclude [find remove count drop distinct empty? update])
|
||||||
(:import [com.mongodb Mongo DB DBCollection WriteResult DBObject WriteConcern
|
(:import [com.mongodb Mongo DB DBCollection WriteResult DBObject WriteConcern
|
||||||
DBCursor MapReduceCommand MapReduceCommand$OutputType]
|
DBCursor MapReduceCommand MapReduceCommand$OutputType AggregationOutput
|
||||||
|
AggregationOptions AggregationOptions$OutputMode]
|
||||||
[java.util List Map]
|
[java.util List Map]
|
||||||
|
[java.util.concurrent TimeUnit]
|
||||||
[clojure.lang IPersistentMap ISeq]
|
[clojure.lang IPersistentMap ISeq]
|
||||||
org.bson.types.ObjectId)
|
org.bson.types.ObjectId)
|
||||||
(:require [monger.core :as mc]
|
(:require [monger.core :as mc]
|
||||||
|
|
@ -490,15 +492,28 @@
|
||||||
;; Aggregation
|
;; Aggregation
|
||||||
;;
|
;;
|
||||||
|
|
||||||
|
(defn- build-aggregation-options [{:keys [^Boolean allow-disk-use cursor ^Long max-time]}]
|
||||||
|
(cond-> (AggregationOptions/builder)
|
||||||
|
allow-disk-use (.allowDiskUse allow-disk-use)
|
||||||
|
cursor (.outputMode AggregationOptions$OutputMode/CURSOR)
|
||||||
|
max-time (.maxTime max-time TimeUnit/MILLISECONDS)
|
||||||
|
(:batch-size cursor) (.batchSize (int (:batch-size cursor)))
|
||||||
|
true .build))
|
||||||
|
|
||||||
(defn aggregate
|
(defn aggregate
|
||||||
"Executes an aggregation query. MongoDB 2.2+ only.
|
"Executes an aggregation query. MongoDB 2.2+ only.
|
||||||
|
Accepts the options :allow-disk-use and :cursor (a map with the :batch-size
|
||||||
|
key), as described in the MongoDB manual. Additionally, the :max-time option
|
||||||
|
is supported, for specifying a limit on the execution time of the query in
|
||||||
|
milliseconds.
|
||||||
|
|
||||||
See http://docs.mongodb.org/manual/applications/aggregation/ to learn more."
|
See http://docs.mongodb.org/manual/applications/aggregation/ to learn more."
|
||||||
[^DB db ^String coll stages]
|
[^DB db ^String coll stages & opts]
|
||||||
(let [res (mc/command db {:aggregate coll :pipeline stages})]
|
(let [coll (.getCollection db coll)
|
||||||
;; this is what DBCollection#distinct does. Turning a blind eye!
|
agg-opts (build-aggregation-options opts)
|
||||||
(.throwOnError res)
|
pipe (java.util.ArrayList. (to-db-object stages))
|
||||||
(map #(from-db-object % true) (.get res "result"))))
|
res (.aggregate coll pipe agg-opts)]
|
||||||
|
(map #(from-db-object % true) (iterator-seq res))))
|
||||||
|
|
||||||
|
|
||||||
;;
|
;;
|
||||||
|
|
|
||||||
|
|
@ -85,4 +85,23 @@
|
||||||
expected "IL"]
|
expected "IL"]
|
||||||
(mc/insert-batch db coll batch)
|
(mc/insert-batch db coll batch)
|
||||||
(let [result (:state (first (mc/aggregate db coll [{$group {:_id 1 :state {$last "$state"}}}])))]
|
(let [result (:state (first (mc/aggregate db coll [{$group {:_id 1 :state {$last "$state"}}}])))]
|
||||||
|
(is (= expected result)))))
|
||||||
|
|
||||||
|
(deftest test-cursor-aggregation
|
||||||
|
(let [batch [{:state "CA" :quantity 1 :price 199.00}
|
||||||
|
{:state "NY" :quantity 2 :price 199.00}
|
||||||
|
{:state "NY" :quantity 1 :price 299.00}
|
||||||
|
{:state "IL" :quantity 2 :price 11.50 }
|
||||||
|
{:state "CA" :quantity 2 :price 2.95 }
|
||||||
|
{:state "IL" :quantity 3 :price 5.50 }]
|
||||||
|
expected #{{:quantity 1 :state "CA"}
|
||||||
|
{:quantity 2 :state "NY"}
|
||||||
|
{:quantity 1 :state "NY"}
|
||||||
|
{:quantity 2 :state "IL"}
|
||||||
|
{:quantity 2 :state "CA"}
|
||||||
|
{:quantity 3 :state "IL"}}]
|
||||||
|
(mc/insert-batch db coll batch)
|
||||||
|
(is (= 6 (mc/count db coll)))
|
||||||
|
(let [result (set (map #(select-keys % [:state :quantity])
|
||||||
|
(mc/aggregate db coll [{$project {:state 1 :quantity 1}}] :cursor {:batch-size 10})))]
|
||||||
(is (= expected result))))))
|
(is (= expected result))))))
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue