Make the (fn [rf] ...) in by-key to not depend on function equality because it creates headaches when deserializing the function in aother env.

This commit is contained in:
Christophe Grand 2016-04-21 16:14:48 +02:00
parent 45af15c69d
commit c913617aa1

View file

@ -142,39 +142,44 @@
Partitions contain the \"value part\" (as returned by vfn) of each item.
The resulting transformed items are wrapped back into a \"pair\" using the pair function.
Default values for kfn, vfn and pair are first, second (or identity if kfn is specified) and vector."
([xform] (by-key key' val' vector xform))
([xform] (by-key nil nil vector xform))
([kfn xform] (by-key kfn identity vector xform))
([kfn vfn xform] (by-key kfn vfn vector xform))
([kfn vfn pair xform]
(fn [rf]
(let [make-rf (if pair
(if-some [rf (when (identical? vector pair) (some-kvrf rf))]
(fn [k] (fn ([acc] acc) ([acc v] (rf acc k v))))
(fn [k] (fn ([acc] acc) ([acc v] (rf acc (pair k v))))))
(constantly (multiplexable rf)))
m (volatile! (transient {}))]
(if (and (= key' kfn) (= val' vfn))
(kvrf self
([] (rf))
([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m)))))
([acc x]
(self acc (key' x) (val' x)))
([acc k v]
(let [krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k))))
acc (krf acc v)]
(when (reduced? acc) ; complete?
(vswap! m assoc! k noprf))
(unreduced acc))))
(fn
([] (rf))
([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m)))))
([acc x]
(let [k (kfn x)
krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k))))
acc (krf acc (vfn x))]
(when (reduced? acc) ; complete?
(vswap! m assoc! k noprf))
(unreduced acc)))))))))
(let [pair (if (identical? vector pair) ::default pair)]
(fn [rf]
(let [make-rf (cond
(nil? pair) (constantly (multiplexable rf))
(= ::default pair)
(if-some [rf (some-kvrf rf)]
(fn [k] (fn ([acc] acc) ([acc v] (rf acc k v))))
(fn [k] (fn ([acc] acc) ([acc v] (rf acc (vector k v))))))
:else (fn [k] (fn ([acc] acc) ([acc v] (rf acc (pair k v))))))
m (volatile! (transient {}))]
(if (and (nil? kfn) (nil? vfn))
(kvrf self
([] (rf))
([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m)))))
([acc x]
(self acc (key' x) (val' x)))
([acc k v]
(let [krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k))))
acc (krf acc v)]
(when (reduced? acc) ; complete?
(vswap! m assoc! k noprf))
(unreduced acc))))
(let [kfn (or kfn key')
vfn (or vfn val')]
(fn
([] (rf))
([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m)))))
([acc x]
(let [k (kfn x)
krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k))))
acc (krf acc (vfn x))]
(when (reduced? acc) ; complete?
(vswap! m assoc! k noprf))
(unreduced acc)))))))))))
(defn- spawn
"Every n items, spawns a new pipeline."