diff --git a/src/net/cgrand/xforms.cljc b/src/net/cgrand/xforms.cljc index 3740693..8285c22 100644 --- a/src/net/cgrand/xforms.cljc +++ b/src/net/cgrand/xforms.cljc @@ -657,7 +657,7 @@ (transduce (comp xform count) rf/last coll))) (defn multiplex - "Returns a transducer that runs several transducers (sepcified by xforms) in parallel. + "Returns a transducer that runs several transducers (specified by xforms) in parallel. If xforms is a map, values of the map are transducers and keys are used to tag each transducer output: => (into [] (x/multiplex [(map inc) (map dec)]) (range 3)) @@ -674,11 +674,11 @@ xforms) (into #{} (map #(% mrf)) xforms))) invoke-rfs (if (map? xforms) - (fn [acc invoke] + (fn [acc step? invoke] (reduce-kv (fn [acc tag rf] (let [acc (invoke rf acc)] - (if (reduced? acc) + (if (and step? (reduced? acc)) (if (reduced? @acc) (do (vreset! rfs nil) @@ -686,11 +686,11 @@ (do (vswap! rfs dissoc tag) (rf @acc))) acc))) acc @rfs)) - (fn [acc invoke] + (fn [acc step? invoke] (core/reduce (fn [acc rf] (let [acc (invoke rf acc)] - (if (reduced? acc) + (if (and step? (reduced? acc)) (if (reduced? @acc) (do (vreset! rfs nil) @@ -700,14 +700,14 @@ acc @rfs)))] (kvrf ([] (rf)) - ([acc] (rf (invoke-rfs acc #(%1 %2)))) + ([acc] (rf (invoke-rfs acc false #(%1 %2)))) ([acc x] - (let [acc (invoke-rfs acc #(%1 %2 x))] + (let [acc (invoke-rfs acc true #(%1 %2 x))] (if (zero? (core/count @rfs)) (ensure-reduced acc) acc))) ([acc k v] - (let [acc (invoke-rfs acc #(%1 %2 k v))] + (let [acc (invoke-rfs acc true #(%1 %2 k v))] (if (zero? (core/count @rfs)) (ensure-reduced acc) acc))))))) diff --git a/test/net/cgrand/xforms_test.cljc b/test/net/cgrand/xforms_test.cljc index d3b0466..1824721 100644 --- a/test/net/cgrand/xforms_test.cljc +++ b/test/net/cgrand/xforms_test.cljc @@ -1,6 +1,7 @@ (ns net.cgrand.xforms-test (:require [clojure.test :refer [is deftest testing]] - [net.cgrand.xforms :as x])) + [net.cgrand.xforms :as x] + [net.cgrand.xforms.rfs :as rf])) (defn trial "A transducing context for testing that transducers are well-behaved towards @@ -139,4 +140,40 @@ (is (= (range 100) (x/into [] (x/sort) (shuffle (range 100))))) (is (= (reverse (range 100)) (x/into [] (x/sort >) (shuffle (range 100))))) (is (= (sort-by str (range 100)) (x/into [] (x/sort-by str) (shuffle (range 100))))) - (is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100)))))) \ No newline at end of file + (is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100)))))) + +(deftest multiplex + (testing "Respects reduced from multiplexed reductions and from the downstream reduction." + (testing "Respects multiplexed reduced." + (is (= [3] + (into [] + (x/multiplex [(x/reduce rf/some)]) + [3 5 2]))) + (is (= {:x 3} + (into {} + (x/multiplex {:x (x/reduce rf/some)}) + [3 5 2])))) + + (testing "Respects downstream reduced." + (is (= 4 + (transduce + (x/multiplex [(map inc)]) + rf/some + [3 5 2]))) + (is (= [:x 4] + (transduce + (x/multiplex {:x (map inc)}) + rf/some + [3 5 2])))) + + (testing "Doesn't repeat multiplexed completion." + (is (= 2 + (transduce + (x/multiplex [(x/reduce rf/last)]) + rf/some + [3 5 2]))) + (is (= [:x 2] + (transduce + (x/multiplex {:x (x/reduce rf/last)}) + rf/some + [3 5 2]))))))