tightening code around kvrfs, x/for now leverages kvreducibles on inner colls

This commit is contained in:
Christophe Grand 2016-09-19 14:37:35 +02:00
parent 05a82e2b74
commit 6dd8e937d3
3 changed files with 44 additions and 64 deletions

View file

@ -15,7 +15,7 @@ Transducing context: `transjuxt` (for performing several transductions in a sing
Add this dependency to your project:
```clj
[net.cgrand/xforms "0.2.0"]
[net.cgrand/xforms "0.3.0"]
```
```clj

View file

@ -12,38 +12,36 @@
(let [rf (gensym 'rf)
acc (gensym 'acc)
pair? #(and (vector? %) (= 2 (clj/count %)))
destructuring-pair? (every-pred pair?
#(not-any? (some-fn keyword? #{'&}) %))
build (fn [init]
(clj/reduce (fn [body [expr binding]]
(case binding
:let `(let ~expr ~body)
:when `(if ~expr ~body ~acc)
:while `(if ~expr ~body (reduced ~acc))
`(clj/reduce (fn [~acc ~binding] ~body) ~acc ~expr)))
(if (destructuring-pair? binding)
`(let [expr# ~expr]
(if (and (map? expr#) (satisfies? clojure.core.protocols/IKVReduce expr#))
(clj/reduce-kv (fn [~acc ~@binding] ~body) ~acc expr#)
(clj/reduce (fn [~acc ~binding] ~body) ~acc expr#)))
`(clj/reduce (fn [~acc ~binding] ~body) ~acc ~expr))))
init (clj/partition 2 (rseq (vec seq-exprs)))))
body (build `(~rf ~acc ~body-expr))
kvbody (when (pair? body-expr) (build `(~rf ~acc ~@body-expr)))
fnsym (if (and (pair? binding) (not (some keyword? binding)) (not (some #{'&} (filter symbol? binding)))) `kvrf `fn)]
(if kvbody
`(fn [~rf]
(if-some [~rf (some-kvrf ~rf)]
(~fnsym
([] (~rf))
([~acc] (~rf ~acc))
([~acc ~binding] ~kvbody)
~@(when (= fnsym `kvrf) [`([~acc ~@binding] ~kvbody)]))
(~fnsym
([] (~rf))
([~acc] (~rf ~acc))
([~acc ~binding] ~body)
~@(when (= fnsym `kvrf) [`([~acc ~@binding] ~body)]))))
`(fn [~rf]
(~fnsym
body (if (and (pair? body-expr) (nil? (meta body-expr)))
(build `(~rf ~acc ~@body-expr))
(build `(~rf ~acc ~body-expr)))]
`(fn [~rf]
(let [~rf (ensure-kvrf ~rf)]
(kvrf
([] (~rf))
([~acc] (~rf ~acc))
([~acc ~binding] ~body)
~@(when (= fnsym `kvrf) [`([~acc ~@binding] ~body)]))))))
~(if (destructuring-pair? binding)
`([~acc ~@binding] ~body)
`([~acc k# v#]
(let [~binding (clojure.lang.MapEntry. k# v#)] ~body))))))))
(defprotocol KvRfable "Protocol for reducing fns that takes key and val as separate arguments."
(defprotocol KvRfable "Protocol for reducing fns that accept key and val as separate arguments."
(some-kvrf [f] "Returns a kvrf or nil"))
(extend-protocol KvRfable
@ -76,7 +74,7 @@
([f]
(fn [rf]
(let [vacc (volatile! (f))]
(if-some [f (some-kvrf f)]
(let [f (ensure-kvrf f)]
(kvrf
([] (rf))
([acc] (rf (unreduced (rf acc (f (unreduced @vacc))))))
@ -86,13 +84,6 @@
acc))
([acc k v]
(if (reduced? (vswap! vacc f k v))
(reduced acc)
acc)))
(fn
([] (rf))
([acc] (rf (unreduced (rf acc (f (unreduced @vacc))))))
([acc x]
(if (reduced? (vswap! vacc f x))
(reduced acc)
acc)))))))
([f init]
@ -159,9 +150,8 @@
(defn- multiplexable
"Creates a multiplexable reducing function (doesn't init or complete the uderlying rf)."
[rf]
(if-some [rf (some-kvrf rf)]
(kvrf ([]) ([acc] acc) ([acc x] (rf acc x)) ([acc k v] (rf acc k v)))
(fn ([]) ([acc] acc) ([acc x] (rf acc x))))) ; no init no complete rf
(let [rf (ensure-kvrf rf)]
(kvrf ([]) ([acc] acc) ([acc x] (rf acc x)) ([acc k v] (rf acc k v))))) ; no init no complete rf
(defn by-key
"Returns a transducer which partitions items according to kfn.
@ -376,30 +366,20 @@
"Returns a reducing fn which compute all rfns at once and whose final return
value is a vector of the final return values of each rfns."
[& rfns]
(if (some some-kvrf rfns)
(let [rfns (mapv ensure-kvrf rfns)]
(kvrf
([] (mapv #(vector % (volatile! (%))) rfns))
([acc] (mapv (fn [[rf vacc]] (rf (unreduced @vacc))) acc))
([acc x]
(let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]]
(when-not (reduced? @vacc) (vswap! vacc rf x) true))
false acc)]
(if some-unreduced acc (reduced acc))))
([acc k v]
(let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]]
(when-not (reduced? @vacc) (vswap! vacc rf k v) true))
false acc)]
(if some-unreduced acc (reduced acc))))))
(let [rfns (vec rfns)]
(fn
([] (mapv #(vector % (volatile! (%))) rfns))
([acc] (mapv (fn [[rf vacc]] (rf (unreduced @vacc))) acc))
([acc x]
(let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]]
(when-not (reduced? @vacc) (vswap! vacc rf x) true))
false acc)]
(if some-unreduced acc (reduced acc))))))))
(let [rfns (mapv ensure-kvrf rfns)]
(kvrf
([] (mapv #(vector % (volatile! (%))) rfns))
([acc] (mapv (fn [[rf vacc]] (rf (unreduced @vacc))) acc))
([acc x]
(let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]]
(when-not (reduced? @vacc) (vswap! vacc rf x) true))
false acc)]
(if some-unreduced acc (reduced acc))))
([acc k v]
(let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]]
(when-not (reduced? @vacc) (vswap! vacc rf k v) true))
false acc)]
(if some-unreduced acc (reduced acc)))))))
(defn multiplex
[xforms-map]
@ -439,16 +419,12 @@
[& key-rfns]
(let [f (apply juxt (take-nth 2 (next key-rfns)))
keys (vec (take-nth 2 key-rfns))]
(if-some [f (some-kvrf f)]
(let [f (ensure-kvrf f)]
(kvrf
([] (f))
([acc] (zipmap keys (f acc)))
([acc x] (f acc x))
([acc k v] (f acc k v)))
(fn
([] (f))
([acc] (zipmap keys (f acc)))
([acc x] (f acc x))))))
([acc k v] (f acc k v))))))
(defn first
"Reducing function that returns the first value or nil if none."

View file

@ -101,4 +101,8 @@
[{:ts 3.25} {:ts 3.5} {:ts 3.75} {:ts 4.0}] ; t = 4.0
[{:ts 3.5} {:ts 3.75} {:ts 4.0} {:ts 4.25}] ; t = 4.25
[{:ts 3.75} {:ts 4.0} {:ts 4.25} {:ts 4.5}] ; t = 4.5
[{:ts 4.0} {:ts 4.25} {:ts 4.5} {:ts 4.75}]]))) ; t = 4.75
[{:ts 4.0} {:ts 4.25} {:ts 4.5} {:ts 4.75}]]))) ; t = 4.75
(deftest do-not-kvreduce-vectors
(is (= {0 nil 1 nil} (x/into {} (x/for [[k v] %] [k v]) [[0] [1]])))
(is (= {0 nil 1 nil} (x/into {} (x/for [_ % [k v] [[0] [1]]] [k v]) ["a"]))))