pass key and value as distinct arguments when possible

This commit is contained in:
Christophe Grand 2015-09-09 11:49:07 +02:00
parent a759ca7e7a
commit bb9e808200

View file

@ -26,30 +26,69 @@
([~acc] (~rf ~acc)) ([~acc] (~rf ~acc))
([~acc ~binding] ~body))))) ([~acc ~binding] ~body)))))
(defprotocol KvRf "Marker protocol for reducing fns that takes key and val as separate arguments.")
(defmacro kvrf [name? & fn-bodies]
(let [name (if (symbol? name?) name? (gensym '_))
fn-bodies (if (symbol? name?) fn-bodies (cons name? fn-bodies))
fn-bodies (if (vector? (first fn-bodies)) (list fn-bodies) fn-bodies)]
`(reify
clojure.lang.Fn
KvRf
clojure.lang.IFn
~@(clj/for [[args & body] fn-bodies]
`(invoke [~name ~@args] ~@body)))))
(defn reduce (defn reduce
"A transducer that reduces a collection to a 1-item collection consisting of only the reduced result. "A transducer that reduces a collection to a 1-item collection consisting of only the reduced result.
Unlike reduce but like transduce it does call the completing arity (1) of the reducing fn." Unlike reduce but like transduce it does call the completing arity (1) of the reducing fn."
([f] ([f]
(fn [rf] (fn [rf]
(let [vacc (volatile! (f))] (let [vacc (volatile! (f))]
(if (satisfies? KvRf f)
(kvrf
([] (rf))
([acc] (rf (rf acc (f (unreduced @vacc)))))
([acc x]
(if (reduced? (vswap! vacc f x))
(reduced acc)
acc))
([acc k v]
(if (reduced? (vswap! vacc f k v))
(reduced acc)
acc)))
(fn (fn
([] (rf)) ([] (rf))
([acc] (rf (rf acc (f (unreduced @vacc))))) ([acc] (rf (rf acc (f (unreduced @vacc)))))
([acc x] ([acc x]
(if (reduced? (vswap! vacc f x)) (if (reduced? (vswap! vacc f x))
(reduced acc) (reduced acc)
acc)))))) acc)))))))
([f init] ([f init]
(reduce (fn ([] init) ([acc] (f acc)) ([acc x] (f acc x)))))) (reduce (fn ([] init) ([acc] (f acc)) ([acc x] (f acc x))))))
(defn into (defn into
"Returns a transducer which accumulate every input in a collection and outputs only the accumulated collection." "Returns a transducer which accumulate every input in a collection and outputs only the accumulated collection."
[coll] [coll]
(reduce (if (instance? clojure.lang.IEditableCollection coll) (reduce (cond
(instance? clojure.lang.IEditableCollection coll)
(if (map? coll)
(kvrf
([] (transient coll))
([acc] (persistent! acc))
([acc x] (conj! acc x))
([acc k v] (assoc! acc k v)))
(fn (fn
([] (transient coll)) ([] (transient coll))
([acc] (persistent! acc)) ([acc] (persistent! acc))
([acc x] (conj! acc x))) ([acc x] (conj! acc x))))
(map? coll)
(kvrf
([] coll)
([acc] acc)
([acc x] (conj acc x))
([acc k v] (assoc acc k v)))
:else
(fn (fn
([] coll) ([] coll)
([acc] acc) ([acc] acc)
@ -92,11 +131,27 @@
([kfn vfn xform] (by-key kfn vfn vector xform)) ([kfn vfn xform] (by-key kfn vfn vector xform))
([kfn vfn pair xform] ([kfn vfn pair xform]
(fn [rf] (fn [rf]
(let [make-rf (if pair (let [make-rf (cond
(and (= vector pair) (satisfies? KvRf rf))
(fn [k] (fn ([acc] acc) ([acc v] (rf acc k v))))
pair
(fn [k] (fn ([acc] acc) ([acc v] (rf acc (pair k v))))) (fn [k] (fn ([acc] acc) ([acc v] (rf acc (pair k v)))))
:else
(constantly (multiplexable rf))) (constantly (multiplexable rf)))
m (volatile! (transient {}))] m (volatile! (transient {}))]
(fn self (if (and (= key' kfn) (= val' vfn))
(kvrf self
([] (rf))
([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m)))))
([acc x]
(self acc (key' x) (val' x)))
([acc k v]
(let [krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k))))
acc (krf acc v)]
(when (reduced? acc) ; complete?
(vswap! m assoc! k noprf))
(unreduced acc))))
(fn
([] (rf)) ([] (rf))
([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m))))) ([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m)))))
([acc x] ([acc x]
@ -105,7 +160,7 @@
acc (krf acc (vfn x))] acc (krf acc (vfn x))]
(when (reduced? acc) ; complete? (when (reduced? acc) ; complete?
(vswap! m assoc! k noprf)) (vswap! m assoc! k noprf))
(unreduced acc)))))))) (unreduced acc)))))))))
(defn- spawn (defn- spawn
"Every n items, spawns a new pipeline." "Every n items, spawns a new pipeline."