From 219e7d2aac4f67006f4589433be12fe60f37cc24 Mon Sep 17 00:00:00 2001 From: Christophe Grand Date: Wed, 1 Jun 2016 10:28:08 +0200 Subject: [PATCH] Makes x/for to support kvs in and out, remove tag and map-kv (replaced by for) --- src/net/cgrand/xforms.clj | 171 +++++++++++++++++++------------------- 1 file changed, 86 insertions(+), 85 deletions(-) diff --git a/src/net/cgrand/xforms.clj b/src/net/cgrand/xforms.clj index aca3aef..482b7b9 100644 --- a/src/net/cgrand/xforms.clj +++ b/src/net/cgrand/xforms.clj @@ -6,25 +6,42 @@ (defmacro for "Like clojure.core/for with the first expression being replaced by % (or _). Returns a transducer." - [[binding %or_ & seq-exprs] body] + [[binding %or_ & seq-exprs] body-expr] (assert (and (symbol? %or_) (#{"%" "_"} (name %or_))) "The second element of the comprehension vector must be % or _.") (let [rf (gensym 'rf) acc (gensym 'acc) - body - (clj/reduce (fn [body [expr binding]] + pair? #(and (vector? %) (= 2 (clj/count %))) + build (fn [init] + (clj/reduce (fn [body [expr binding]] (case binding :let `(let ~expr ~body) :when `(if ~expr ~body ~acc) :while `(if ~expr ~body (reduced ~acc)) `(clj/reduce (fn [~acc ~binding] ~body) ~acc ~expr))) - `(~rf ~acc ~body) - (clj/partition 2 (rseq (vec seq-exprs))))] - `(fn [~rf] - (fn - ([] (~rf)) - ([~acc] (~rf ~acc)) - ([~acc ~binding] ~body))))) + init (clj/partition 2 (rseq (vec seq-exprs))))) + body (build `(~rf ~acc ~body-expr)) + kvbody (when (pair? body-expr) (build `(~rf ~acc ~@body-expr))) + fnsym (if (and (pair? binding) (not (some keyword? binding)) (not (some #{"&"} (filter symbol? binding)))) `kvrf `fn)] + (if kvbody + `(fn [~rf] + (if-some [~rf (some-kvrf ~rf)] + (~fnsym + ([] (~rf)) + ([~acc] (~rf ~acc)) + ([~acc ~binding] ~kvbody) + ~@(when (= fnsym `kvrf) [`([~acc ~@binding] ~kvbody)])) + (~fnsym + ([] (~rf)) + ([~acc] (~rf ~acc)) + ([~acc ~binding] ~body) + ~@(when (= fnsym `kvrf) [`([~acc ~@binding] ~body)])))) + `(fn [~rf] + (~fnsym + ([] (~rf)) + ([~acc] (~rf ~acc)) + ([~acc ~binding] ~body) + ~@(when (= fnsym `kvrf) [`([~acc ~@binding] ~body)])))))) (defprotocol KvRfable "Protocol for reducing fns that takes key and val as separate arguments." (some-kvrf [f] "Returns a kvrf or nil")) @@ -45,6 +62,14 @@ ~@(clj/for [[args & body] fn-bodies] `(invoke [~name ~@args] ~@body))))) +(defn ensure-kvrf [rf] + (or (some-kvrf rf) + (kvrf + ([] (rf)) + ([acc] (rf acc)) + ([acc x] (rf acc x)) + ([acc k v] (rf acc (clojure.lang.MapEntry. k v)))))) + (defn reduce "A transducer that reduces a collection to a 1-item collection consisting of only the reduced result. Unlike reduce but like transduce it does call the completing arity (1) of the reducing fn." @@ -151,9 +176,8 @@ (let [make-rf (cond (nil? pair) (constantly (multiplexable rf)) (= ::default pair) - (if-some [rf (some-kvrf rf)] - (fn [k] (fn ([acc] acc) ([acc v] (rf acc k v)))) - (fn [k] (fn ([acc] acc) ([acc v] (rf acc (vector k v)))))) + (let [rf (ensure-kvrf rf)] + (fn [k] (fn ([acc] acc) ([acc v] (rf acc k v))))) :else (fn [k] (fn ([acc] acc) ([acc v] (rf acc (pair k v)))))) m (volatile! (transient {}))] (if (and (nil? kfn) (nil? vfn)) @@ -290,48 +314,55 @@ n is the integral number of steps by which the window slides. With a 1-hour window, 4 means that the window slides every 15 minutes. f and invf work like in #'window." - [timef n f invf] - (let [timef (fn [x] (long (Math/floor (* n (timef x)))))] - (fn [rf] - (let [dq (java.util.ArrayDeque.) - vwacc (volatile! (f)) - flush! - (fn [acc ^long from-ts ^long to-ts] - (loop [ts from-ts acc acc wacc @vwacc] - (let [x (.peekFirst dq)] - (cond - (= ts (timef x)) - (do - (.pollFirst dq) - (recur ts acc (invf wacc x))) - (= ts to-ts) - (do - (vreset! vwacc wacc) - acc) - :else - (let [acc (rf acc (f wacc))] - (if (reduced? acc) - (do - (vreset! vwacc wacc) - acc) - (recur (inc ts) acc wacc)))))))] - (fn - ([] (rf)) - ([acc] - (let [acc (if-not (.isEmpty dq) - (unreduced (rf acc (f @vwacc))) - acc)] - (rf acc))) - ([acc x] - (let [limit (- (timef x) n) - prev-limit (if-some [prev-x (.peekLast dq)] - (- (timef prev-x) n) - limit) - _ (.addLast dq x) ; so dq is never empty for flush! - acc (flush! acc prev-limit limit)] - (when-not (reduced? acc) - (vswap! vwacc f x)) - acc))))))) + ([timef n f] + (window-by-time timef n + (fn + ([] clojure.lang.PersistentQueue/EMPTY) + ([q] (f (clj/reduce f (f) q))) + ([q x] (conj q x))) + (fn [q _] (pop q)))) + ([timef n f invf] + (let [timef (fn [x] (long (Math/floor (* n (timef x)))))] + (fn [rf] + (let [dq (java.util.ArrayDeque.) + vwacc (volatile! (f)) + flush! + (fn [acc ^long from-ts ^long to-ts] + (loop [ts from-ts acc acc wacc @vwacc] + (let [x (.peekFirst dq)] + (cond + (= ts (timef x)) + (do + (.pollFirst dq) + (recur ts acc (invf wacc x))) + (= ts to-ts) + (do + (vreset! vwacc wacc) + acc) + :else + (let [acc (rf acc (f wacc))] + (if (reduced? acc) + (do + (vreset! vwacc wacc) + acc) + (recur (inc ts) acc wacc)))))))] + (fn + ([] (rf)) + ([acc] + (let [acc (if-not (.isEmpty dq) + (unreduced (rf acc (f @vwacc))) + acc)] + (rf acc))) + ([acc x] + (let [limit (- (timef x) n) + prev-limit (if-some [prev-x (.peekLast dq)] + (- (timef prev-x) n) + limit) + _ (.addLast dq x) ; so dq is never empty for flush! + acc (flush! acc prev-limit limit)] + (when-not (reduced? acc) + (vswap! vwacc f x)) + acc)))))))) (defn count ([] 0) ([n] n) ([n _] (inc n))) @@ -386,33 +417,3 @@ (fn ))) ) - -(defn tag - "Like (map #(vector tag %)) but potentially more efficient." - [tag] - (fn [rf] - (if-some [rf (some-kvrf rf)] - (fn - ([] (rf)) - ([acc] (rf acc)) - ([acc v] (rf acc tag v))) - (fn - ([] (rf)) - ([acc] (rf acc)) - ([acc v] (rf acc [tag v])))))) - -(defn map-kv - "Like (map (fn [[k v]] [(kf k v) (vf k v)])) but potentially more efficient." - [kf vf] - (fn [rf] - (if-some [rf (some-kvrf rf)] - (kvrf - ([] (rf)) - ([acc] (rf acc)) - ([acc [k v]] (rf acc (kf k v) (vf k v))) - ([acc k v] (rf acc (kf k v) (vf k v)))) - (kvrf - ([] (rf)) - ([acc] (rf acc)) - ([acc [k v]] (rf acc [(kf k v) (vf k v)])) - ([acc k v] (rf acc [(kf k v) (vf k v)])))))) \ No newline at end of file