This commit is contained in:
Timothy Jones 2019-10-23 23:35:44 +02:00 committed by GitHub
commit 0263381de5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 104 additions and 14 deletions

View file

@ -348,7 +348,10 @@
@acc) ; downstream is done, propagate
(do
(vswap! m assoc! k nop-rf)
(krf @acc))) ; TODO think again
(let [acc (krf @acc)]
(when (reduced? acc)
(vreset! m (transient {})))
acc)))
acc))))
(let [kfn (or kfn key')
vfn (or vfn val')]
@ -366,7 +369,10 @@
@acc) ; downstream is done, propagate
(do
(vswap! m assoc! k nop-rf)
(krf @acc)))
(let [acc (krf @acc)]
(when (reduced? acc)
(vreset! m (transient {})))
acc)))
acc)))))))))))
(defn into-by-key
@ -657,7 +663,7 @@
(transduce (comp xform count) rf/last coll)))
(defn multiplex
"Returns a transducer that runs several transducers (sepcified by xforms) in parallel.
"Returns a transducer that runs several transducers (specified by xforms) in parallel.
If xforms is a map, values of the map are transducers and keys are used to tag each
transducer output:
=> (into [] (x/multiplex [(map inc) (map dec)]) (range 3))
@ -674,40 +680,48 @@
xforms)
(into #{} (map #(% mrf)) xforms)))
invoke-rfs (if (map? xforms)
(fn [acc invoke]
(fn [acc step? invoke]
(reduce-kv
(fn [acc tag rf]
(let [acc (invoke rf acc)]
(if (reduced? acc)
(if (and step? (reduced? acc))
(if (reduced? @acc)
(do
(vreset! rfs nil)
acc) ; downstream is done, propagate
(do (vswap! rfs dissoc tag) (rf @acc)))
(do (vswap! rfs dissoc tag)
(let [acc (rf @acc)]
(when (reduced? acc)
(vreset! rfs nil))
acc)))
acc)))
acc @rfs))
(fn [acc invoke]
(fn [acc step? invoke]
(core/reduce
(fn [acc rf]
(let [acc (invoke rf acc)]
(if (reduced? acc)
(if (and step? (reduced? acc))
(if (reduced? @acc)
(do
(vreset! rfs nil)
acc) ; downstream is done, propagate
(do (vswap! rfs disj rf) (rf @acc)))
(do (vswap! rfs disj rf)
(let [acc (rf @acc)]
(when (reduced? acc)
(vreset! rfs nil))
acc)))
acc)))
acc @rfs)))]
(kvrf
([] (rf))
([acc] (rf (invoke-rfs acc #(%1 %2))))
([acc] (rf (invoke-rfs acc false #(%1 %2))))
([acc x]
(let [acc (invoke-rfs acc #(%1 %2 x))]
(let [acc (invoke-rfs acc true #(%1 %2 x))]
(if (zero? (core/count @rfs))
(ensure-reduced acc)
acc)))
([acc k v]
(let [acc (invoke-rfs acc #(%1 %2 k v))]
(let [acc (invoke-rfs acc true #(%1 %2 k v))]
(if (zero? (core/count @rfs))
(ensure-reduced acc)
acc)))))))

View file

@ -1,6 +1,7 @@
(ns net.cgrand.xforms-test
(:require [clojure.test :refer [is deftest testing]]
[net.cgrand.xforms :as x]))
[net.cgrand.xforms :as x]
[net.cgrand.xforms.rfs :as rf]))
(defn trial
"A transducing context for testing that transducers are well-behaved towards
@ -139,4 +140,79 @@
(is (= (range 100) (x/into [] (x/sort) (shuffle (range 100)))))
(is (= (reverse (range 100)) (x/into [] (x/sort >) (shuffle (range 100)))))
(is (= (sort-by str (range 100)) (x/into [] (x/sort-by str) (shuffle (range 100)))))
(is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100))))))
(is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100))))))
(deftest by-key
(testing "Respects reduced from multiplexed reductions and from the downstream reduction."
(is (= {:x 3 :y 5}
(into {}
(x/by-key (x/reduce rf/some))
[[:x 3] [:y 5] [:x 2]]))
"Respects multiplexed reduced.")
(is (= [:x 4]
(transduce
(x/by-key (map inc))
rf/some
[[:x 3] [:y 5] [:x 2]]))
"Respects downstream reduced.")
(is (= [:y 4]
(transduce
(x/by-key (x/reduce (fn
([] 0)
([sum] sum)
([sum x]
(let [sum (+ sum x)]
(if (> sum 4) (reduced 4) sum))))))
rf/some
[[:x 3] [:y 5] [:x 2]]))
"Respects reduced in downstream complete.")))
(deftest multiplex
(testing "Respects reduced from multiplexed reductions and from the downstream reduction."
(testing "Respects multiplexed reduced."
(is (= [3]
(into []
(x/multiplex [(x/reduce rf/some)])
[3 5 2])))
(is (= {:x 3}
(into {}
(x/multiplex {:x (x/reduce rf/some)})
[3 5 2]))))
(testing "Respects downstream reduced."
(is (= 4
(transduce
(x/multiplex [(map inc)])
rf/some
[3 5 2])))
(is (= [:x 4]
(transduce
(x/multiplex {:x (map inc)})
rf/some
[3 5 2]))))
(testing "Doesn't repeat multiplexed completion."
(is (= 2
(transduce
(x/multiplex [(x/reduce rf/last)])
rf/some
[3 5 2])))
(is (= [:x 2]
(transduce
(x/multiplex {:x (x/reduce rf/last)})
rf/some
[3 5 2]))))
(testing "Respects reduced in downstream complete."
(is (= 3
(transduce
(x/multiplex [(x/reduce rf/some) (x/reduce rf/some)])
rf/some
[3 5 2])))
(is (= 3
(transduce
(x/multiplex {:x (x/reduce rf/some) :y (x/reduce rf/some)})
(completing rf/some second)
[3 5 2]))))))