diff --git a/src/net/cgrand/xforms.cljc b/src/net/cgrand/xforms.cljc index 8285c22..7cb0f8d 100644 --- a/src/net/cgrand/xforms.cljc +++ b/src/net/cgrand/xforms.cljc @@ -348,7 +348,10 @@ @acc) ; downstream is done, propagate (do (vswap! m assoc! k nop-rf) - (krf @acc))) ; TODO think again + (let [acc (krf @acc)] + (when (reduced? acc) + (vreset! m (transient {}))) + acc))) acc)))) (let [kfn (or kfn key') vfn (or vfn val')] @@ -366,7 +369,10 @@ @acc) ; downstream is done, propagate (do (vswap! m assoc! k nop-rf) - (krf @acc))) + (let [acc (krf @acc)] + (when (reduced? acc) + (vreset! m (transient {}))) + acc))) acc))))))))))) (defn into-by-key @@ -683,7 +689,11 @@ (do (vreset! rfs nil) acc) ; downstream is done, propagate - (do (vswap! rfs dissoc tag) (rf @acc))) + (do (vswap! rfs dissoc tag) + (let [acc (rf @acc)] + (when (reduced? acc) + (vreset! rfs nil)) + acc))) acc))) acc @rfs)) (fn [acc step? invoke] @@ -695,7 +705,11 @@ (do (vreset! rfs nil) acc) ; downstream is done, propagate - (do (vswap! rfs disj rf) (rf @acc))) + (do (vswap! rfs disj rf) + (let [acc (rf @acc)] + (when (reduced? acc) + (vreset! rfs nil)) + acc))) acc))) acc @rfs)))] (kvrf diff --git a/test/net/cgrand/xforms_test.cljc b/test/net/cgrand/xforms_test.cljc index 1824721..b999948 100644 --- a/test/net/cgrand/xforms_test.cljc +++ b/test/net/cgrand/xforms_test.cljc @@ -142,6 +142,33 @@ (is (= (sort-by str (range 100)) (x/into [] (x/sort-by str) (shuffle (range 100))))) (is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100)))))) +(deftest by-key + (testing "Respects reduced from multiplexed reductions and from the downstream reduction." + (is (= {:x 3 :y 5} + (into {} + (x/by-key (x/reduce rf/some)) + [[:x 3] [:y 5] [:x 2]])) + "Respects multiplexed reduced.") + + (is (= [:x 4] + (transduce + (x/by-key (map inc)) + rf/some + [[:x 3] [:y 5] [:x 2]])) + "Respects downstream reduced.") + + (is (= [:y 4] + (transduce + (x/by-key (x/reduce (fn + ([] 0) + ([sum] sum) + ([sum x] + (let [sum (+ sum x)] + (if (> sum 4) (reduced 4) sum)))))) + rf/some + [[:x 3] [:y 5] [:x 2]])) + "Respects reduced in downstream complete."))) + (deftest multiplex (testing "Respects reduced from multiplexed reductions and from the downstream reduction." (testing "Respects multiplexed reduced." @@ -176,4 +203,16 @@ (transduce (x/multiplex {:x (x/reduce rf/last)}) rf/some + [3 5 2])))) + + (testing "Respects reduced in downstream complete." + (is (= 3 + (transduce + (x/multiplex [(x/reduce rf/some) (x/reduce rf/some)]) + rf/some + [3 5 2]))) + (is (= 3 + (transduce + (x/multiplex {:x (x/reduce rf/some) :y (x/reduce rf/some)}) + (completing rf/some second) [3 5 2]))))))