diff --git a/src/net/cgrand/xforms.clj b/src/net/cgrand/xforms.clj index 95222bc..1fd432d 100644 --- a/src/net/cgrand/xforms.clj +++ b/src/net/cgrand/xforms.clj @@ -76,6 +76,11 @@ (defn- noprf "The noop reducing function" ([acc] acc) ([acc _] acc)) +(defn- multiplexable + "Creates a multiplexable reducing function (doesn't init or complete the uderlying rf)." + [rf] + (fn ([]) ([acc] acc) ([acc x] (rf acc x)))) ; no init no complete rf + (defn by-key "Returns a transducer which partitions items according to kfn. It applies the transform specified by xform to each partition. @@ -88,17 +93,17 @@ ([kfn vfn pair xform] (fn [rf] (let [make-rf (if pair - (fn [k] (fn ([acc] (rf acc)) ([acc v] (rf acc (pair k v))))) - (constantly rf)) + (fn [k] (fn ([acc] acc) ([acc v] (rf acc (pair k v))))) + (constantly (multiplexable rf))) m (volatile! (transient {}))] (fn self ([] (rf)) - ([acc] (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m)))) + ([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m))))) ([acc x] (let [k (kfn x) krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k)))) acc (krf acc (vfn x))] - (when (reduced? acc) + (when (reduced? acc) ; complete? (vswap! m assoc! k noprf)) (unreduced acc)))))))) @@ -106,7 +111,7 @@ "Every n items, spawns a new pipeline." [n xform] (fn [rf] - (let [ncrf (fn ([]) ([acc] acc) ([acc x] (rf acc x))) ; no init no complete rf + (let [mxrf (multiplexable rf) vrfs (volatile! []) m (volatile! 0)] (fn @@ -126,8 +131,8 @@ (let [acc (clj/reduce step! acc rfs) acc (if (neg? (vswap! m dec)) (do - (step! acc (xform ncrf)) - (vswap! m + n)) + (vswap! m + n) + (step! acc (xform mxrf))) acc)] (vswap! vrfs persistent!) acc))))))) diff --git a/test/net/cgrand/xforms_test.clj b/test/net/cgrand/xforms_test.clj new file mode 100644 index 0000000..9e0b364 --- /dev/null +++ b/test/net/cgrand/xforms_test.clj @@ -0,0 +1,73 @@ +(ns net.cgrand.xforms-test + (:require [clojure.test :refer :all] + [net.cgrand.xforms :as x])) + +(defn trial + "A transducing context for testing that transducers are well-behaved towards + linear use of the accumulator, init, completion and handling of reduced values. + A \"poisonous\" reducing function rf is passed to the transducer. + n is the number of calls to rf before it returns a reduced. + accs is a collection of successive return values for rf." + ([xform n coll] + (trial xform n (repeatedly #(Object.)) coll)) + ([xform n accs coll] + (let [vaccs (volatile! accs) + vstate (volatile! {:n n :acc (first @vaccs) :state :init}) + check-acc (fn [acc] + (when (reduced? acc) + (throw (ex-info "Called with reduced accumulator" (assoc @vstate :actual-acc acc)))) + (when-not (identical? acc (:acc @vstate)) + (throw (ex-info "Called with an unexpected accumulator (either non-linear or out of thin air)" (assoc @vstate :actual-acc acc))))) + rf (fn + ([] + (when-not (= :init (:state @vstate)) + (throw (ex-info "Init arity called on a started or completed rf." @vstate))) + (:acc (vswap! vstate assoc :state :started))) + ([acc] + (when (= :completed (:state @vstate)) + (throw (ex-info "Completion arity called on an already completed rf." @vstate))) + (check-acc acc) + (:acc (vswap! vstate assoc :state :completed :acc (first (vswap! vaccs next))))) + ([acc x] + (when (= :completed (:state @vstate)) + (throw (ex-info "Step arity called on an already completed rf." @vstate))) + (when (= :reduced (:state @vstate)) + (throw (ex-info "Step arity called on a reduced rf." @vstate))) + (check-acc acc) + (let [n (:n @vstate)] + (let [acc (first (vswap! vaccs next))] + (if (pos? n) + (:acc (vswap! vstate assoc :acc acc :n (dec n))) + (reduced (:acc (vswap! vstate assoc :acc acc :status :reduced)))))))) + res (transduce xform rf coll)] + (check-acc res) + (when-not (= :completed (:state @vstate)) + (throw (ex-info "Completion arity never called" @vstate))) + true))) + +(deftest proper-rf-usage + (testing "Ensuring that reducing functions returned by transducers are well-behaved." + (is (trial (x/by-key odd? identity) + 4 (range 16))) + (is (trial (x/by-key odd? identity nil identity) + 4 (range 16))) + (is (trial (x/by-key odd? (take 2)) + 8 (range 16))) + (is (trial (x/by-key odd? identity) + 8 (range 2))) + (is (trial (x/partition 3 identity) + 4 (range 16))) + (is (trial (x/partition 3 (take 2)) + 8 (range 16))) + (is (trial (x/partition 3 (take 2)) + 8 (range 2))) + (is (trial (x/into []) + 4 (range 16))) + (is (trial (x/for [x % y (range x)] [x y]) + 4 (range 16))) + (is (trial (x/reduce +) + 4 (range 16))) + (is (trial (x/pad 2 (repeat :pad)) + 4 (range 16))) + (is (trial (x/pad 8 (repeat :pad)) + 4 (range 16)))))