diff --git a/src/net/cgrand/xforms.clj b/src/net/cgrand/xforms.clj index 30e5b32..3902125 100644 --- a/src/net/cgrand/xforms.clj +++ b/src/net/cgrand/xforms.clj @@ -26,30 +26,69 @@ ([~acc] (~rf ~acc)) ([~acc ~binding] ~body))))) +(defprotocol KvRf "Marker protocol for reducing fns that takes key and val as separate arguments.") + +(defmacro kvrf [name? & fn-bodies] + (let [name (if (symbol? name?) name? (gensym '_)) + fn-bodies (if (symbol? name?) fn-bodies (cons name? fn-bodies)) + fn-bodies (if (vector? (first fn-bodies)) (list fn-bodies) fn-bodies)] + `(reify + clojure.lang.Fn + KvRf + clojure.lang.IFn + ~@(clj/for [[args & body] fn-bodies] + `(invoke [~name ~@args] ~@body))))) + (defn reduce "A transducer that reduces a collection to a 1-item collection consisting of only the reduced result. Unlike reduce but like transduce it does call the completing arity (1) of the reducing fn." ([f] (fn [rf] - (let [vacc (volatile! (f))] - (fn - ([] (rf)) - ([acc] (rf (rf acc (f (unreduced @vacc))))) - ([acc x] - (if (reduced? (vswap! vacc f x)) - (reduced acc) - acc)))))) + (let [vacc (volatile! (f))] + (if (satisfies? KvRf f) + (kvrf + ([] (rf)) + ([acc] (rf (rf acc (f (unreduced @vacc))))) + ([acc x] + (if (reduced? (vswap! vacc f x)) + (reduced acc) + acc)) + ([acc k v] + (if (reduced? (vswap! vacc f k v)) + (reduced acc) + acc))) + (fn + ([] (rf)) + ([acc] (rf (rf acc (f (unreduced @vacc))))) + ([acc x] + (if (reduced? (vswap! vacc f x)) + (reduced acc) + acc))))))) ([f init] (reduce (fn ([] init) ([acc] (f acc)) ([acc x] (f acc x)))))) (defn into "Returns a transducer which accumulate every input in a collection and outputs only the accumulated collection." [coll] - (reduce (if (instance? clojure.lang.IEditableCollection coll) - (fn - ([] (transient coll)) - ([acc] (persistent! acc)) - ([acc x] (conj! acc x))) + (reduce (cond + (instance? clojure.lang.IEditableCollection coll) + (if (map? coll) + (kvrf + ([] (transient coll)) + ([acc] (persistent! acc)) + ([acc x] (conj! acc x)) + ([acc k v] (assoc! acc k v))) + (fn + ([] (transient coll)) + ([acc] (persistent! acc)) + ([acc x] (conj! acc x)))) + (map? coll) + (kvrf + ([] coll) + ([acc] acc) + ([acc x] (conj acc x)) + ([acc k v] (assoc acc k v))) + :else (fn ([] coll) ([acc] acc) @@ -92,20 +131,36 @@ ([kfn vfn xform] (by-key kfn vfn vector xform)) ([kfn vfn pair xform] (fn [rf] - (let [make-rf (if pair + (let [make-rf (cond + (and (= vector pair) (satisfies? KvRf rf)) + (fn [k] (fn ([acc] acc) ([acc v] (rf acc k v)))) + pair (fn [k] (fn ([acc] acc) ([acc v] (rf acc (pair k v))))) + :else (constantly (multiplexable rf))) m (volatile! (transient {}))] - (fn self - ([] (rf)) - ([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m))))) - ([acc x] - (let [k (kfn x) - krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k)))) - acc (krf acc (vfn x))] - (when (reduced? acc) ; complete? - (vswap! m assoc! k noprf)) - (unreduced acc)))))))) + (if (and (= key' kfn) (= val' vfn)) + (kvrf self + ([] (rf)) + ([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m))))) + ([acc x] + (self acc (key' x) (val' x))) + ([acc k v] + (let [krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k)))) + acc (krf acc v)] + (when (reduced? acc) ; complete? + (vswap! m assoc! k noprf)) + (unreduced acc)))) + (fn + ([] (rf)) + ([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m))))) + ([acc x] + (let [k (kfn x) + krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k)))) + acc (krf acc (vfn x))] + (when (reduced? acc) ; complete? + (vswap! m assoc! k noprf)) + (unreduced acc))))))))) (defn- spawn "Every n items, spawns a new pipeline."