Makes x/for to support kvs in and out, remove tag and map-kv (replaced by for)

This commit is contained in:
Christophe Grand 2016-06-01 10:28:08 +02:00
parent 07225a92a4
commit 219e7d2aac

View file

@ -6,25 +6,42 @@
(defmacro for
"Like clojure.core/for with the first expression being replaced by % (or _). Returns a transducer."
[[binding %or_ & seq-exprs] body]
[[binding %or_ & seq-exprs] body-expr]
(assert (and (symbol? %or_) (#{"%" "_"} (name %or_)))
"The second element of the comprehension vector must be % or _.")
(let [rf (gensym 'rf)
acc (gensym 'acc)
body
(clj/reduce (fn [body [expr binding]]
pair? #(and (vector? %) (= 2 (clj/count %)))
build (fn [init]
(clj/reduce (fn [body [expr binding]]
(case binding
:let `(let ~expr ~body)
:when `(if ~expr ~body ~acc)
:while `(if ~expr ~body (reduced ~acc))
`(clj/reduce (fn [~acc ~binding] ~body) ~acc ~expr)))
`(~rf ~acc ~body)
(clj/partition 2 (rseq (vec seq-exprs))))]
`(fn [~rf]
(fn
([] (~rf))
([~acc] (~rf ~acc))
([~acc ~binding] ~body)))))
init (clj/partition 2 (rseq (vec seq-exprs)))))
body (build `(~rf ~acc ~body-expr))
kvbody (when (pair? body-expr) (build `(~rf ~acc ~@body-expr)))
fnsym (if (and (pair? binding) (not (some keyword? binding)) (not (some #{"&"} (filter symbol? binding)))) `kvrf `fn)]
(if kvbody
`(fn [~rf]
(if-some [~rf (some-kvrf ~rf)]
(~fnsym
([] (~rf))
([~acc] (~rf ~acc))
([~acc ~binding] ~kvbody)
~@(when (= fnsym `kvrf) [`([~acc ~@binding] ~kvbody)]))
(~fnsym
([] (~rf))
([~acc] (~rf ~acc))
([~acc ~binding] ~body)
~@(when (= fnsym `kvrf) [`([~acc ~@binding] ~body)]))))
`(fn [~rf]
(~fnsym
([] (~rf))
([~acc] (~rf ~acc))
([~acc ~binding] ~body)
~@(when (= fnsym `kvrf) [`([~acc ~@binding] ~body)]))))))
(defprotocol KvRfable "Protocol for reducing fns that takes key and val as separate arguments."
(some-kvrf [f] "Returns a kvrf or nil"))
@ -45,6 +62,14 @@
~@(clj/for [[args & body] fn-bodies]
`(invoke [~name ~@args] ~@body)))))
(defn ensure-kvrf [rf]
(or (some-kvrf rf)
(kvrf
([] (rf))
([acc] (rf acc))
([acc x] (rf acc x))
([acc k v] (rf acc (clojure.lang.MapEntry. k v))))))
(defn reduce
"A transducer that reduces a collection to a 1-item collection consisting of only the reduced result.
Unlike reduce but like transduce it does call the completing arity (1) of the reducing fn."
@ -151,9 +176,8 @@
(let [make-rf (cond
(nil? pair) (constantly (multiplexable rf))
(= ::default pair)
(if-some [rf (some-kvrf rf)]
(fn [k] (fn ([acc] acc) ([acc v] (rf acc k v))))
(fn [k] (fn ([acc] acc) ([acc v] (rf acc (vector k v))))))
(let [rf (ensure-kvrf rf)]
(fn [k] (fn ([acc] acc) ([acc v] (rf acc k v)))))
:else (fn [k] (fn ([acc] acc) ([acc v] (rf acc (pair k v))))))
m (volatile! (transient {}))]
(if (and (nil? kfn) (nil? vfn))
@ -290,48 +314,55 @@
n is the integral number of steps by which the window slides. With a 1-hour window, 4 means that the window slides every 15 minutes.
f and invf work like in #'window."
[timef n f invf]
(let [timef (fn [x] (long (Math/floor (* n (timef x)))))]
(fn [rf]
(let [dq (java.util.ArrayDeque.)
vwacc (volatile! (f))
flush!
(fn [acc ^long from-ts ^long to-ts]
(loop [ts from-ts acc acc wacc @vwacc]
(let [x (.peekFirst dq)]
(cond
(= ts (timef x))
(do
(.pollFirst dq)
(recur ts acc (invf wacc x)))
(= ts to-ts)
(do
(vreset! vwacc wacc)
acc)
:else
(let [acc (rf acc (f wacc))]
(if (reduced? acc)
(do
(vreset! vwacc wacc)
acc)
(recur (inc ts) acc wacc)))))))]
(fn
([] (rf))
([acc]
(let [acc (if-not (.isEmpty dq)
(unreduced (rf acc (f @vwacc)))
acc)]
(rf acc)))
([acc x]
(let [limit (- (timef x) n)
prev-limit (if-some [prev-x (.peekLast dq)]
(- (timef prev-x) n)
limit)
_ (.addLast dq x) ; so dq is never empty for flush!
acc (flush! acc prev-limit limit)]
(when-not (reduced? acc)
(vswap! vwacc f x))
acc)))))))
([timef n f]
(window-by-time timef n
(fn
([] clojure.lang.PersistentQueue/EMPTY)
([q] (f (clj/reduce f (f) q)))
([q x] (conj q x)))
(fn [q _] (pop q))))
([timef n f invf]
(let [timef (fn [x] (long (Math/floor (* n (timef x)))))]
(fn [rf]
(let [dq (java.util.ArrayDeque.)
vwacc (volatile! (f))
flush!
(fn [acc ^long from-ts ^long to-ts]
(loop [ts from-ts acc acc wacc @vwacc]
(let [x (.peekFirst dq)]
(cond
(= ts (timef x))
(do
(.pollFirst dq)
(recur ts acc (invf wacc x)))
(= ts to-ts)
(do
(vreset! vwacc wacc)
acc)
:else
(let [acc (rf acc (f wacc))]
(if (reduced? acc)
(do
(vreset! vwacc wacc)
acc)
(recur (inc ts) acc wacc)))))))]
(fn
([] (rf))
([acc]
(let [acc (if-not (.isEmpty dq)
(unreduced (rf acc (f @vwacc)))
acc)]
(rf acc)))
([acc x]
(let [limit (- (timef x) n)
prev-limit (if-some [prev-x (.peekLast dq)]
(- (timef prev-x) n)
limit)
_ (.addLast dq x) ; so dq is never empty for flush!
acc (flush! acc prev-limit limit)]
(when-not (reduced? acc)
(vswap! vwacc f x))
acc))))))))
(defn count ([] 0) ([n] n) ([n _] (inc n)))
@ -386,33 +417,3 @@
(fn
)))
)
(defn tag
"Like (map #(vector tag %)) but potentially more efficient."
[tag]
(fn [rf]
(if-some [rf (some-kvrf rf)]
(fn
([] (rf))
([acc] (rf acc))
([acc v] (rf acc tag v)))
(fn
([] (rf))
([acc] (rf acc))
([acc v] (rf acc [tag v]))))))
(defn map-kv
"Like (map (fn [[k v]] [(kf k v) (vf k v)])) but potentially more efficient."
[kf vf]
(fn [rf]
(if-some [rf (some-kvrf rf)]
(kvrf
([] (rf))
([acc] (rf acc))
([acc [k v]] (rf acc (kf k v) (vf k v)))
([acc k v] (rf acc (kf k v) (vf k v))))
(kvrf
([] (rf))
([acc] (rf acc))
([acc [k v]] (rf acc [(kf k v) (vf k v)]))
([acc k v] (rf acc [(kf k v) (vf k v)]))))))