Handle downstream reduced in multiplexed complete

This commit is contained in:
Timothy Jones 2019-10-23 16:46:36 -04:00
parent 04c577a063
commit 1a3cbdac72
No known key found for this signature in database
GPG key ID: FE38C3CC28B5A364
2 changed files with 57 additions and 4 deletions

View file

@ -348,7 +348,10 @@
@acc) ; downstream is done, propagate @acc) ; downstream is done, propagate
(do (do
(vswap! m assoc! k nop-rf) (vswap! m assoc! k nop-rf)
(krf @acc))) ; TODO think again (let [acc (krf @acc)]
(when (reduced? acc)
(vreset! m (transient {})))
acc)))
acc)))) acc))))
(let [kfn (or kfn key') (let [kfn (or kfn key')
vfn (or vfn val')] vfn (or vfn val')]
@ -366,7 +369,10 @@
@acc) ; downstream is done, propagate @acc) ; downstream is done, propagate
(do (do
(vswap! m assoc! k nop-rf) (vswap! m assoc! k nop-rf)
(krf @acc))) (let [acc (krf @acc)]
(when (reduced? acc)
(vreset! m (transient {})))
acc)))
acc))))))))))) acc)))))))))))
(defn into-by-key (defn into-by-key
@ -683,7 +689,11 @@
(do (do
(vreset! rfs nil) (vreset! rfs nil)
acc) ; downstream is done, propagate acc) ; downstream is done, propagate
(do (vswap! rfs dissoc tag) (rf @acc))) (do (vswap! rfs dissoc tag)
(let [acc (rf @acc)]
(when (reduced? acc)
(vreset! rfs nil))
acc)))
acc))) acc)))
acc @rfs)) acc @rfs))
(fn [acc step? invoke] (fn [acc step? invoke]
@ -695,7 +705,11 @@
(do (do
(vreset! rfs nil) (vreset! rfs nil)
acc) ; downstream is done, propagate acc) ; downstream is done, propagate
(do (vswap! rfs disj rf) (rf @acc))) (do (vswap! rfs disj rf)
(let [acc (rf @acc)]
(when (reduced? acc)
(vreset! rfs nil))
acc)))
acc))) acc)))
acc @rfs)))] acc @rfs)))]
(kvrf (kvrf

View file

@ -142,6 +142,33 @@
(is (= (sort-by str (range 100)) (x/into [] (x/sort-by str) (shuffle (range 100))))) (is (= (sort-by str (range 100)) (x/into [] (x/sort-by str) (shuffle (range 100)))))
(is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100)))))) (is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100))))))
(deftest by-key
(testing "Respects reduced from multiplexed reductions and from the downstream reduction."
(is (= {:x 3 :y 5}
(into {}
(x/by-key (x/reduce rf/some))
[[:x 3] [:y 5] [:x 2]]))
"Respects multiplexed reduced.")
(is (= [:x 4]
(transduce
(x/by-key (map inc))
rf/some
[[:x 3] [:y 5] [:x 2]]))
"Respects downstream reduced.")
(is (= [:y 4]
(transduce
(x/by-key (x/reduce (fn
([] 0)
([sum] sum)
([sum x]
(let [sum (+ sum x)]
(if (> sum 4) (reduced 4) sum))))))
rf/some
[[:x 3] [:y 5] [:x 2]]))
"Respects reduced in downstream complete.")))
(deftest multiplex (deftest multiplex
(testing "Respects reduced from multiplexed reductions and from the downstream reduction." (testing "Respects reduced from multiplexed reductions and from the downstream reduction."
(testing "Respects multiplexed reduced." (testing "Respects multiplexed reduced."
@ -176,4 +203,16 @@
(transduce (transduce
(x/multiplex {:x (x/reduce rf/last)}) (x/multiplex {:x (x/reduce rf/last)})
rf/some rf/some
[3 5 2]))))
(testing "Respects reduced in downstream complete."
(is (= 3
(transduce
(x/multiplex [(x/reduce rf/some) (x/reduce rf/some)])
rf/some
[3 5 2])))
(is (= 3
(transduce
(x/multiplex {:x (x/reduce rf/some) :y (x/reduce rf/some)})
(completing rf/some second)
[3 5 2])))))) [3 5 2]))))))