diff --git a/src/net/cgrand/xforms.clj b/src/net/cgrand/xforms.clj index 44bb5fc..fa87130 100644 --- a/src/net/cgrand/xforms.clj +++ b/src/net/cgrand/xforms.clj @@ -142,39 +142,44 @@ Partitions contain the \"value part\" (as returned by vfn) of each item. The resulting transformed items are wrapped back into a \"pair\" using the pair function. Default values for kfn, vfn and pair are first, second (or identity if kfn is specified) and vector." - ([xform] (by-key key' val' vector xform)) + ([xform] (by-key nil nil vector xform)) ([kfn xform] (by-key kfn identity vector xform)) ([kfn vfn xform] (by-key kfn vfn vector xform)) ([kfn vfn pair xform] - (fn [rf] - (let [make-rf (if pair - (if-some [rf (when (identical? vector pair) (some-kvrf rf))] - (fn [k] (fn ([acc] acc) ([acc v] (rf acc k v)))) - (fn [k] (fn ([acc] acc) ([acc v] (rf acc (pair k v)))))) - (constantly (multiplexable rf))) - m (volatile! (transient {}))] - (if (and (= key' kfn) (= val' vfn)) - (kvrf self - ([] (rf)) - ([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m))))) - ([acc x] - (self acc (key' x) (val' x))) - ([acc k v] - (let [krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k)))) - acc (krf acc v)] - (when (reduced? acc) ; complete? - (vswap! m assoc! k noprf)) - (unreduced acc)))) - (fn - ([] (rf)) - ([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m))))) - ([acc x] - (let [k (kfn x) - krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k)))) - acc (krf acc (vfn x))] - (when (reduced? acc) ; complete? - (vswap! m assoc! k noprf)) - (unreduced acc))))))))) + (let [pair (if (identical? vector pair) ::default pair)] + (fn [rf] + (let [make-rf (cond + (nil? pair) (constantly (multiplexable rf)) + (= ::default pair) + (if-some [rf (some-kvrf rf)] + (fn [k] (fn ([acc] acc) ([acc v] (rf acc k v)))) + (fn [k] (fn ([acc] acc) ([acc v] (rf acc (vector k v)))))) + :else (fn [k] (fn ([acc] acc) ([acc v] (rf acc (pair k v)))))) + m (volatile! (transient {}))] + (if (and (nil? kfn) (nil? vfn)) + (kvrf self + ([] (rf)) + ([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m))))) + ([acc x] + (self acc (key' x) (val' x))) + ([acc k v] + (let [krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k)))) + acc (krf acc v)] + (when (reduced? acc) ; complete? + (vswap! m assoc! k noprf)) + (unreduced acc)))) + (let [kfn (or kfn key') + vfn (or vfn val')] + (fn + ([] (rf)) + ([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m))))) + ([acc x] + (let [k (kfn x) + krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k)))) + acc (krf acc (vfn x))] + (when (reduced? acc) ; complete? + (vswap! m assoc! k noprf)) + (unreduced acc))))))))))) (defn- spawn "Every n items, spawns a new pipeline."