Introduce net.cgrand.xforms/parallel

This commit is contained in:
Joel Kaasinen 2021-08-10 16:05:40 +03:00
parent 62375212a8
commit fa2411c07d
2 changed files with 85 additions and 1 deletions

View file

@ -806,3 +806,46 @@
([dimensions valfn summary-fn coll]
(into {} (rollup dimensions valfn summary-fn) coll)))
)
(defn parallel [rf]
"A transducer that runs the given reducing function in the background.
That is, when (parallel rf) is given an element x to reduce, it
starts a future running rf on x. When given the next element, it
will wait for the previous future to finish first before launching
the (rf acc y) future.
When (parallel rf) is completed (called with arity 1), it waits
until the previous element has been reduced, and then finalizes rf
synchronously.
This can be used to parallelize a chain of transducers. However,
since (parallel rf) only does a single call to rf on the
background, it's only useful when parallelizing transducers with
expensive operations on single elements. It probably won't help
with speeding up a situation where a large number of elements are
reduced with a relatively cheap function.
In general, (comp parallelize xform) and (comp xform parallelize)
has the same result as xform.
Usage patterns:
(comp (map expensive-operation)
parallelize
(map expensive-operation2))
(reducing-context (parallelize expensive-reducing-function))"
(let [memory (atom nil)]
(fn
([] (rf))
([acc]
(let [prev (if-let [pending @memory]
@pending
acc)]
(rf (unreduced prev)))) ;; ???
([acc input]
(let [prev (if-let [pending @memory]
@pending
acc)]
(when-not (reduced? prev)
(reset! memory (future (rf prev input))))
prev)))))

View file

@ -70,6 +70,10 @@
(is (trial (x/for [x % y (range x)] [x y])
4 (range 16)))
(is (trial (x/reduce +)
4 (range 16)))
(is (trial x/parallel
4 (range 16)))
(is (trial (comp x/parallel (map inc) x/parallel)
4 (range 16)))))
(deftest reductions
@ -139,4 +143,41 @@
(is (= (range 100) (x/into [] (x/sort) (shuffle (range 100)))))
(is (= (reverse (range 100)) (x/into [] (x/sort >) (shuffle (range 100)))))
(is (= (sort-by str (range 100)) (x/into [] (x/sort-by str) (shuffle (range 100)))))
(is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100))))))
(is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100))))))
(deftest parallel
(is (= [1 2 3 4 5]
(into [] (comp x/parallel (map inc)) (range 5))
(into [] (comp (map inc) x/parallel) (range 5))
(into [] (comp x/parallel (map inc) x/parallel) (range 5))))
(let [barrier-1 (java.util.concurrent.CyclicBarrier. 2)
tick! #(.await barrier-1 500 java.util.concurrent.TimeUnit/MILLISECONDS)
barrier-2 (java.util.concurrent.CyclicBarrier. 2)
tock! #(.await barrier-2 500 java.util.concurrent.TimeUnit/MILLISECONDS)
trace (atom [])
log! #(swap! trace conj %)
rf (fn
([acc]
(log! [:finish acc])
acc)
([acc x]
(log! [:start x])
(tick!)
(tock!)
(log! [:end x])
(+ acc x)))]
(testing "test concurrency"
(dotimes [_ 100]
(reset! trace [])
(let [par (x/parallel rf)]
(is (= 10 (par 10 1)) "first call just returns initial state")
(tick!)
(is (= [[:start 1]] @trace) "first elements starts reducing on the background")
(tock!)
(is (= 11 (par 10 2)) "second call returns first state")
(tick!)
(is (= [[:start 1] [:end 1] [:start 2]] @trace) "first element finished, second starts reducing")
(tock!)
(is (= 13 (par 11)) "completion call returns final state")
(is (= [[:start 1] [:end 1] [:start 2] [:end 2] [:finish 13]] @trace)
"second element finishes, completion"))))))