diff --git a/src/clojure/monger/collection.clj b/src/clojure/monger/collection.clj index 05f7366..511ed59 100644 --- a/src/clojure/monger/collection.clj +++ b/src/clojure/monger/collection.clj @@ -24,9 +24,11 @@ * http://clojuremongodb.info/articles/deleting.html * http://clojuremongodb.info/articles/aggregation.html" (:refer-clojure :exclude [find remove count drop distinct empty? update]) - (:import [com.mongodb Mongo DB DBCollection WriteResult DBObject WriteConcern - DBCursor MapReduceCommand MapReduceCommand$OutputType] + (:import [com.mongodb Mongo DB DBCollection WriteResult DBObject WriteConcern + DBCursor MapReduceCommand MapReduceCommand$OutputType AggregationOutput + AggregationOptions AggregationOptions$OutputMode] [java.util List Map] + [java.util.concurrent TimeUnit] [clojure.lang IPersistentMap ISeq] org.bson.types.ObjectId) (:require [monger.core :as mc] @@ -490,15 +492,28 @@ ;; Aggregation ;; +(defn- build-aggregation-options [{:keys [^Boolean allow-disk-use cursor ^Long max-time]}] + (cond-> (AggregationOptions/builder) + allow-disk-use (.allowDiskUse allow-disk-use) + cursor (.outputMode AggregationOptions$OutputMode/CURSOR) + max-time (.maxTime max-time TimeUnit/MILLISECONDS) + (:batch-size cursor) (.batchSize (int (:batch-size cursor))) + true .build)) + (defn aggregate "Executes an aggregation query. MongoDB 2.2+ only. + Accepts the options :allow-disk-use and :cursor (a map with the :batch-size + key), as described in the MongoDB manual. Additionally, the :max-time option + is supported, for specifying a limit on the execution time of the query in + milliseconds. See http://docs.mongodb.org/manual/applications/aggregation/ to learn more." - [^DB db ^String coll stages] - (let [res (mc/command db {:aggregate coll :pipeline stages})] - ;; this is what DBCollection#distinct does. Turning a blind's eye! - (.throwOnError res) - (map #(from-db-object % true) (.get res "result")))) + [^DB db ^String coll stages & opts] + (let [coll (.getCollection db coll) + agg-opts (build-aggregation-options opts) + pipe (java.util.ArrayList. (to-db-object stages)) + res (.aggregate coll pipe agg-opts)] + (map #(from-db-object % true) (iterator-seq res)))) ;; diff --git a/test/monger/test/aggregation_framework_test.clj b/test/monger/test/aggregation_framework_test.clj index fe86948..481db8d 100644 --- a/test/monger/test/aggregation_framework_test.clj +++ b/test/monger/test/aggregation_framework_test.clj @@ -85,4 +85,23 @@ expected "IL"] (mc/insert-batch db coll batch) (let [result (:state (first (mc/aggregate db coll [{$group {:_id 1 :state {$last "$state"}}}])))] + (is (= expected result))))) + + (deftest test-cursor-aggregation + (let [batch [{:state "CA" :quantity 1 :price 199.00} + {:state "NY" :quantity 2 :price 199.00} + {:state "NY" :quantity 1 :price 299.00} + {:state "IL" :quantity 2 :price 11.50 } + {:state "CA" :quantity 2 :price 2.95 } + {:state "IL" :quantity 3 :price 5.50 }] + expected #{{:quantity 1 :state "CA"} + {:quantity 2 :state "NY"} + {:quantity 1 :state "NY"} + {:quantity 2 :state "IL"} + {:quantity 2 :state "CA"} + {:quantity 3 :state "IL"}}] + (mc/insert-batch db coll batch) + (is (= 6 (mc/count db coll))) + (let [result (set (map #(select-keys % [:state :quantity]) + (mc/aggregate db coll [{$project {:state 1 :quantity 1}}] :cursor {:batch-size 10})))] (is (= expected result))))))