Merge fa2411c07d into 62375212a8
This commit is contained in:
commit
74dd5fcfc9
2 changed files with 85 additions and 1 deletions
|
|
@ -806,3 +806,46 @@
|
||||||
([dimensions valfn summary-fn coll]
|
([dimensions valfn summary-fn coll]
|
||||||
(into {} (rollup dimensions valfn summary-fn) coll)))
|
(into {} (rollup dimensions valfn summary-fn) coll)))
|
||||||
)
|
)
|
||||||
|
|
||||||
|
(defn parallel [rf]
|
||||||
|
"A transducer that runs the given reducing function in the background.
|
||||||
|
That is, when (parallel rf) is given an element x to reduce, it
|
||||||
|
starts a future running rf on x. When given the next element, it
|
||||||
|
will wait for the previous future to finish first before launching
|
||||||
|
the (rf acc y) future.
|
||||||
|
|
||||||
|
When (parallel rf) is completed (called with arity 1), it waits
|
||||||
|
until the previous element has been reduced, and then finalizes rf
|
||||||
|
synchronously.
|
||||||
|
|
||||||
|
This can be used to parallelize a chain of transducers. However,
|
||||||
|
since (parallel rf) only does a single call to rf on the
|
||||||
|
background, it's only useful when parallelizing transducers with
|
||||||
|
expensive operations on single elements. It probably won't help
|
||||||
|
with speeding up a situation where a large number of elements are
|
||||||
|
reduced with a relatively cheap function.
|
||||||
|
|
||||||
|
In general, (comp parallelize xform) and (comp xform parallelize)
|
||||||
|
has the same result as xform.
|
||||||
|
|
||||||
|
Usage patterns:
|
||||||
|
(comp (map expensive-operation)
|
||||||
|
parallelize
|
||||||
|
(map expensive-operation2))
|
||||||
|
|
||||||
|
(reducing-context (parallelize expensive-reducing-function))"
|
||||||
|
(let [memory (atom nil)]
|
||||||
|
(fn
|
||||||
|
([] (rf))
|
||||||
|
([acc]
|
||||||
|
(let [prev (if-let [pending @memory]
|
||||||
|
@pending
|
||||||
|
acc)]
|
||||||
|
(rf (unreduced prev)))) ;; ???
|
||||||
|
([acc input]
|
||||||
|
(let [prev (if-let [pending @memory]
|
||||||
|
@pending
|
||||||
|
acc)]
|
||||||
|
(when-not (reduced? prev)
|
||||||
|
(reset! memory (future (rf prev input))))
|
||||||
|
prev)))))
|
||||||
|
|
|
||||||
|
|
@ -70,6 +70,10 @@
|
||||||
(is (trial (x/for [x % y (range x)] [x y])
|
(is (trial (x/for [x % y (range x)] [x y])
|
||||||
4 (range 16)))
|
4 (range 16)))
|
||||||
(is (trial (x/reduce +)
|
(is (trial (x/reduce +)
|
||||||
|
4 (range 16)))
|
||||||
|
(is (trial x/parallel
|
||||||
|
4 (range 16)))
|
||||||
|
(is (trial (comp x/parallel (map inc) x/parallel)
|
||||||
4 (range 16)))))
|
4 (range 16)))))
|
||||||
|
|
||||||
(deftest reductions
|
(deftest reductions
|
||||||
|
|
@ -139,4 +143,41 @@
|
||||||
(is (= (range 100) (x/into [] (x/sort) (shuffle (range 100)))))
|
(is (= (range 100) (x/into [] (x/sort) (shuffle (range 100)))))
|
||||||
(is (= (reverse (range 100)) (x/into [] (x/sort >) (shuffle (range 100)))))
|
(is (= (reverse (range 100)) (x/into [] (x/sort >) (shuffle (range 100)))))
|
||||||
(is (= (sort-by str (range 100)) (x/into [] (x/sort-by str) (shuffle (range 100)))))
|
(is (= (sort-by str (range 100)) (x/into [] (x/sort-by str) (shuffle (range 100)))))
|
||||||
(is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100))))))
|
(is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100))))))
|
||||||
|
|
||||||
|
(deftest parallel
|
||||||
|
(is (= [1 2 3 4 5]
|
||||||
|
(into [] (comp x/parallel (map inc)) (range 5))
|
||||||
|
(into [] (comp (map inc) x/parallel) (range 5))
|
||||||
|
(into [] (comp x/parallel (map inc) x/parallel) (range 5))))
|
||||||
|
(let [barrier-1 (java.util.concurrent.CyclicBarrier. 2)
|
||||||
|
tick! #(.await barrier-1 500 java.util.concurrent.TimeUnit/MILLISECONDS)
|
||||||
|
barrier-2 (java.util.concurrent.CyclicBarrier. 2)
|
||||||
|
tock! #(.await barrier-2 500 java.util.concurrent.TimeUnit/MILLISECONDS)
|
||||||
|
trace (atom [])
|
||||||
|
log! #(swap! trace conj %)
|
||||||
|
rf (fn
|
||||||
|
([acc]
|
||||||
|
(log! [:finish acc])
|
||||||
|
acc)
|
||||||
|
([acc x]
|
||||||
|
(log! [:start x])
|
||||||
|
(tick!)
|
||||||
|
(tock!)
|
||||||
|
(log! [:end x])
|
||||||
|
(+ acc x)))]
|
||||||
|
(testing "test concurrency"
|
||||||
|
(dotimes [_ 100]
|
||||||
|
(reset! trace [])
|
||||||
|
(let [par (x/parallel rf)]
|
||||||
|
(is (= 10 (par 10 1)) "first call just returns initial state")
|
||||||
|
(tick!)
|
||||||
|
(is (= [[:start 1]] @trace) "first elements starts reducing on the background")
|
||||||
|
(tock!)
|
||||||
|
(is (= 11 (par 10 2)) "second call returns first state")
|
||||||
|
(tick!)
|
||||||
|
(is (= [[:start 1] [:end 1] [:start 2]] @trace) "first element finished, second starts reducing")
|
||||||
|
(tock!)
|
||||||
|
(is (= 13 (par 11)) "completion call returns final state")
|
||||||
|
(is (= [[:start 1] [:end 1] [:start 2] [:end 2] [:finish 13]] @trace)
|
||||||
|
"second element finishes, completion"))))))
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue