Avoid completing multiplex xforms multiple times
This commit is contained in:
parent
62375212a8
commit
04c577a063
2 changed files with 47 additions and 10 deletions
|
|
@ -657,7 +657,7 @@
|
||||||
(transduce (comp xform count) rf/last coll)))
|
(transduce (comp xform count) rf/last coll)))
|
||||||
|
|
||||||
(defn multiplex
|
(defn multiplex
|
||||||
"Returns a transducer that runs several transducers (sepcified by xforms) in parallel.
|
"Returns a transducer that runs several transducers (specified by xforms) in parallel.
|
||||||
If xforms is a map, values of the map are transducers and keys are used to tag each
|
If xforms is a map, values of the map are transducers and keys are used to tag each
|
||||||
transducer output:
|
transducer output:
|
||||||
=> (into [] (x/multiplex [(map inc) (map dec)]) (range 3))
|
=> (into [] (x/multiplex [(map inc) (map dec)]) (range 3))
|
||||||
|
|
@ -674,11 +674,11 @@
|
||||||
xforms)
|
xforms)
|
||||||
(into #{} (map #(% mrf)) xforms)))
|
(into #{} (map #(% mrf)) xforms)))
|
||||||
invoke-rfs (if (map? xforms)
|
invoke-rfs (if (map? xforms)
|
||||||
(fn [acc invoke]
|
(fn [acc step? invoke]
|
||||||
(reduce-kv
|
(reduce-kv
|
||||||
(fn [acc tag rf]
|
(fn [acc tag rf]
|
||||||
(let [acc (invoke rf acc)]
|
(let [acc (invoke rf acc)]
|
||||||
(if (reduced? acc)
|
(if (and step? (reduced? acc))
|
||||||
(if (reduced? @acc)
|
(if (reduced? @acc)
|
||||||
(do
|
(do
|
||||||
(vreset! rfs nil)
|
(vreset! rfs nil)
|
||||||
|
|
@ -686,11 +686,11 @@
|
||||||
(do (vswap! rfs dissoc tag) (rf @acc)))
|
(do (vswap! rfs dissoc tag) (rf @acc)))
|
||||||
acc)))
|
acc)))
|
||||||
acc @rfs))
|
acc @rfs))
|
||||||
(fn [acc invoke]
|
(fn [acc step? invoke]
|
||||||
(core/reduce
|
(core/reduce
|
||||||
(fn [acc rf]
|
(fn [acc rf]
|
||||||
(let [acc (invoke rf acc)]
|
(let [acc (invoke rf acc)]
|
||||||
(if (reduced? acc)
|
(if (and step? (reduced? acc))
|
||||||
(if (reduced? @acc)
|
(if (reduced? @acc)
|
||||||
(do
|
(do
|
||||||
(vreset! rfs nil)
|
(vreset! rfs nil)
|
||||||
|
|
@ -700,14 +700,14 @@
|
||||||
acc @rfs)))]
|
acc @rfs)))]
|
||||||
(kvrf
|
(kvrf
|
||||||
([] (rf))
|
([] (rf))
|
||||||
([acc] (rf (invoke-rfs acc #(%1 %2))))
|
([acc] (rf (invoke-rfs acc false #(%1 %2))))
|
||||||
([acc x]
|
([acc x]
|
||||||
(let [acc (invoke-rfs acc #(%1 %2 x))]
|
(let [acc (invoke-rfs acc true #(%1 %2 x))]
|
||||||
(if (zero? (core/count @rfs))
|
(if (zero? (core/count @rfs))
|
||||||
(ensure-reduced acc)
|
(ensure-reduced acc)
|
||||||
acc)))
|
acc)))
|
||||||
([acc k v]
|
([acc k v]
|
||||||
(let [acc (invoke-rfs acc #(%1 %2 k v))]
|
(let [acc (invoke-rfs acc true #(%1 %2 k v))]
|
||||||
(if (zero? (core/count @rfs))
|
(if (zero? (core/count @rfs))
|
||||||
(ensure-reduced acc)
|
(ensure-reduced acc)
|
||||||
acc)))))))
|
acc)))))))
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
(ns net.cgrand.xforms-test
|
(ns net.cgrand.xforms-test
|
||||||
(:require [clojure.test :refer [is deftest testing]]
|
(:require [clojure.test :refer [is deftest testing]]
|
||||||
[net.cgrand.xforms :as x]))
|
[net.cgrand.xforms :as x]
|
||||||
|
[net.cgrand.xforms.rfs :as rf]))
|
||||||
|
|
||||||
(defn trial
|
(defn trial
|
||||||
"A transducing context for testing that transducers are well-behaved towards
|
"A transducing context for testing that transducers are well-behaved towards
|
||||||
|
|
@ -139,4 +140,40 @@
|
||||||
(is (= (range 100) (x/into [] (x/sort) (shuffle (range 100)))))
|
(is (= (range 100) (x/into [] (x/sort) (shuffle (range 100)))))
|
||||||
(is (= (reverse (range 100)) (x/into [] (x/sort >) (shuffle (range 100)))))
|
(is (= (reverse (range 100)) (x/into [] (x/sort >) (shuffle (range 100)))))
|
||||||
(is (= (sort-by str (range 100)) (x/into [] (x/sort-by str) (shuffle (range 100)))))
|
(is (= (sort-by str (range 100)) (x/into [] (x/sort-by str) (shuffle (range 100)))))
|
||||||
(is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100))))))
|
(is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100))))))
|
||||||
|
|
||||||
|
(deftest multiplex
|
||||||
|
(testing "Respects reduced from multiplexed reductions and from the downstream reduction."
|
||||||
|
(testing "Respects multiplexed reduced."
|
||||||
|
(is (= [3]
|
||||||
|
(into []
|
||||||
|
(x/multiplex [(x/reduce rf/some)])
|
||||||
|
[3 5 2])))
|
||||||
|
(is (= {:x 3}
|
||||||
|
(into {}
|
||||||
|
(x/multiplex {:x (x/reduce rf/some)})
|
||||||
|
[3 5 2]))))
|
||||||
|
|
||||||
|
(testing "Respects downstream reduced."
|
||||||
|
(is (= 4
|
||||||
|
(transduce
|
||||||
|
(x/multiplex [(map inc)])
|
||||||
|
rf/some
|
||||||
|
[3 5 2])))
|
||||||
|
(is (= [:x 4]
|
||||||
|
(transduce
|
||||||
|
(x/multiplex {:x (map inc)})
|
||||||
|
rf/some
|
||||||
|
[3 5 2]))))
|
||||||
|
|
||||||
|
(testing "Doesn't repeat multiplexed completion."
|
||||||
|
(is (= 2
|
||||||
|
(transduce
|
||||||
|
(x/multiplex [(x/reduce rf/last)])
|
||||||
|
rf/some
|
||||||
|
[3 5 2])))
|
||||||
|
(is (= [:x 2]
|
||||||
|
(transduce
|
||||||
|
(x/multiplex {:x (x/reduce rf/last)})
|
||||||
|
rf/some
|
||||||
|
[3 5 2]))))))
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue