diff --git a/project.clj b/project.clj index c0a90b5..6b96f7f 100644 --- a/project.clj +++ b/project.clj @@ -3,4 +3,6 @@ #_#_:url "http://example.com/FIXME" :license {:name "Eclipse Public License" :url "http://www.eclipse.org/legal/epl-v10.html"} - :dependencies [[org.clojure/clojure "1.7.0"]]) + :dependencies [[org.clojure/clojure "1.8.0"] + [org.clojure/clojurescript "1.9.369"] + [net.cgrand/macrovich "0.1.0"]]) diff --git a/src/net/cgrand/xforms.clj b/src/net/cgrand/xforms.cljc similarity index 72% rename from src/net/cgrand/xforms.clj rename to src/net/cgrand/xforms.cljc index 68b2b97..f18780b 100644 --- a/src/net/cgrand/xforms.clj +++ b/src/net/cgrand/xforms.cljc @@ -1,84 +1,75 @@ (ns net.cgrand.xforms "Extra transducers for Clojure" {:author "Christophe Grand"} + #?(:cljs (:require-macros + [net.cgrand.macrovich :as macros] + [net.cgrand.xforms :refer [for kvrf let-complete]]) + :clj (:require [net.cgrand.macrovich :as macros])) (:refer-clojure :exclude [reduce reductions into count for partition str last keys vals min max]) - (:require [clojure.core :as clj] + (:require [#?(:clj clojure.core :cljs cljs.core) :as core] [net.cgrand.xforms.rfs :as rf])) +(macros/deftime + (defmacro for - "Like clojure.core/for with the first expression being replaced by % (or _). Returns a transducer. + "Like clojure.core/for with the first expression being replaced by % (or _). Returns a transducer. When the first expression is not % (or _) returns an eduction." - [[binding %or_ & seq-exprs] body-expr] - (if-not (and (symbol? %or_) (#{"%" "_"} (name %or_))) - `(eduction (for [~binding ~'% ~@seq-exprs] ~body-expr) ~%or_) - (let [rf (gensym 'rf) - acc (gensym 'acc) - pair? #(and (vector? %) (= 2 (clj/count %))) - destructuring-pair? (every-pred pair? - #(not-any? (some-fn keyword? #{'&}) %)) - rpairs (clj/partition 2 (rseq (vec seq-exprs))) - build (fn [init] - (clj/reduce (fn [body [expr binding]] - (case binding - :let `(let ~expr ~body) - :when `(if ~expr ~body ~acc) - :while `(if ~expr ~body (reduced ~acc)) - (if (destructuring-pair? binding) - `(let [expr# ~expr] - (if (and (map? expr#) (satisfies? clojure.core.protocols/IKVReduce expr#)) - (clj/reduce-kv (fn [~acc ~@binding] ~body) ~acc expr#) - (clj/reduce (fn [~acc ~binding] ~body) ~acc expr#))) - `(clj/reduce (fn [~acc ~binding] ~body) ~acc ~expr)))) - init rpairs)) - nested-reduceds (clj/for [[expr binding] rpairs - :when (not (keyword? binding))] - `reduced) - body (build `(let [acc# (~rf ~acc ~@(if (and (pair? body-expr) (nil? (meta body-expr))) - body-expr - [body-expr]))] - (if (reduced? acc#) - (-> acc# ~@nested-reduceds) - acc#)))] - `(fn [~rf] - (let [~rf (ensure-kvrf ~rf)] - (kvrf - ([] (~rf)) - ([~acc] (~rf ~acc)) - ([~acc ~binding] ~body) - ~(if (destructuring-pair? binding) - `([~acc ~@binding] ~body) - `([~acc k# v#] - (let [~binding (clojure.lang.MapEntry. k# v#)] ~body))))))))) - -(defprotocol KvRfable "Protocol for reducing fns that accept key and val as separate arguments." - (some-kvrf [f] "Returns a kvrf or nil")) - -(extend-protocol KvRfable - Object (some-kvrf [_] nil) - nil (some-kvrf [_] nil)) + [[binding %or_ & seq-exprs] body-expr] + (if-not (and (symbol? %or_) (#{"%" "_"} (name %or_))) + `(eduction (for [~binding ~'% ~@seq-exprs] ~body-expr) ~%or_) + (let [rf (gensym 'rf) + acc (gensym 'acc) + pair? #(and (vector? %) (= 2 (core/count %))) + destructuring-pair? (every-pred pair? + #(not-any? (some-fn keyword? #{'&}) %)) + rpairs (core/partition 2 (rseq (vec seq-exprs))) + build (fn [init] + (core/reduce (fn [body [expr binding]] + (case binding + :let `(let ~expr ~body) + :when `(if ~expr ~body ~acc) + :while `(if ~expr ~body (reduced ~acc)) + (if (destructuring-pair? binding) + `(let [expr# ~expr] + (if (and (map? expr#) (kvreducible? expr#)) + (core/reduce-kv (fn [~acc ~@binding] ~body) ~acc expr#) + (core/reduce (fn [~acc ~binding] ~body) ~acc expr#))) + `(core/reduce (fn [~acc ~binding] ~body) ~acc ~expr)))) + init rpairs)) + nested-reduceds (core/for [[expr binding] rpairs + :when (not (keyword? binding))] + `reduced) + body (build `(let [acc# (~rf ~acc ~@(if (and (pair? body-expr) (nil? (meta body-expr))) + body-expr + [body-expr]))] + (if (reduced? acc#) + (-> acc# ~@nested-reduceds) + acc#)))] + `(fn [~rf] + (let [~rf (ensure-kvrf ~rf)] + (kvrf + ([] (~rf)) + ([~acc] (~rf ~acc)) + ([~acc ~binding] ~body) + ~(if (destructuring-pair? binding) + `([~acc ~@binding] ~body) + `([~acc k# v#] + (let [~binding (macros/case :clj (clojure.lang.MapEntry. k# v#) :cljs [k# v#])] ~body))))))))) (defmacro kvrf [name? & fn-bodies] (let [name (if (symbol? name?) name? (gensym '_)) fn-bodies (if (symbol? name?) fn-bodies (cons name? fn-bodies)) fn-bodies (if (vector? (first fn-bodies)) (list fn-bodies) fn-bodies)] `(reify - clojure.lang.Fn + ~@(macros/case :clj '[clojure.lang.Fn]) KvRfable (some-kvrf [this#] this#) - clojure.lang.IFn - ~@(clj/for [[args & body] fn-bodies] + ~(macros/case :cljs `core/IFn :clj 'clojure.lang.IFn) + ~@(core/for [[args & body] fn-bodies] (let [nohint-args (map (fn [arg] (if (:tag (meta arg)) (gensym 'arg) arg)) args) rebind (mapcat (fn [arg nohint] (when-not (= arg nohint) [arg nohint])) args nohint-args)] - `(invoke [~name ~@nohint-args] (let [~@rebind] ~@body))))))) - -(defn ensure-kvrf [rf] - (or (some-kvrf rf) - (kvrf - ([] (rf)) - ([acc] (rf acc)) - ([acc x] (rf acc x)) - ([acc k v] (rf acc (clojure.lang.MapEntry. k v)))))) + `(~(macros/case :cljs `core/-invoke :clj 'invoke) [~name ~@nohint-args] (let [~@rebind] ~@body))))))) (defmacro ^:private let-complete [[binding volatile] & body] `(let [v# @~volatile] @@ -86,6 +77,29 @@ (vreset! ~volatile ~volatile) (let [~binding v#] ~@body)))) +) + +(declare into reduce multiplex by-key) + +(defprotocol KvRfable "Protocol for reducing fns that accept key and val as separate arguments." + (some-kvrf [f] "Returns a kvrf or nil")) + +(macros/usetime + +(defn kvreducible? [coll] + (satisfies? #?(:clj clojure.core.protocols/IKVReduce :cljs IKVReduce) coll)) + +(extend-protocol KvRfable + #?(:clj Object :cljs default) (some-kvrf [_] nil) + nil (some-kvrf [_] nil)) + +(defn ensure-kvrf [rf] + (or (some-kvrf rf) + (kvrf + ([] (rf)) + ([acc] (rf acc)) + ([acc x] (rf acc x)) + ([acc k v] (rf acc #?(:clj (clojure.lang.MapEntry. k v) :cljs [k v])))))) (defn reduce "A transducer that reduces a collection to a 1-item collection consisting of only the reduced result. @@ -111,7 +125,7 @@ (defn- into-rf [to] (cond - (instance? clojure.lang.IEditableCollection to) + (instance? #?(:clj clojure.lang.IEditableCollection :cljs IEditableCollection) to) (if (map? to) (kvrf ([] (transient to)) @@ -142,9 +156,9 @@ (into to identity from)) ([to xform from] (let [rf (xform (into-rf to))] - (if-let [rf (and (map? from) (satisfies? clojure.core.protocols/IKVReduce from) (some-kvrf rf))] - (rf (clj/reduce-kv rf (rf) from)) - (rf (clj/reduce rf (rf) from)))))) + (if-let [rf (and (map? from) (kvreducible? from) (some-kvrf rf))] + (rf (core/reduce-kv rf (rf) from)) + (rf (core/reduce rf (rf) from)))))) (defn minimum ([comparator] @@ -222,7 +236,7 @@ (if (and (nil? kfn) (nil? vfn)) (kvrf self ([] (rf)) - ([acc] (let-complete [m m] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (clj/vals (persistent! m)))))) + ([acc] (let-complete [m m] (rf (core/reduce (fn [acc krf] (krf acc)) acc (core/vals (persistent! m)))))) ([acc x] (self acc (key' x) (val' x))) ([acc k v] @@ -241,7 +255,7 @@ vfn (or vfn val')] (kvrf self ([] (rf)) - ([acc] (let-complete [m m] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (clj/vals (persistent! m)))))) + ([acc] (let-complete [m m] (rf (core/reduce (fn [acc krf] (krf acc)) acc (core/vals (persistent! m)))))) ([acc x] (let [k (kfn x) krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k)))) @@ -255,7 +269,7 @@ (vswap! m assoc! k nop-rf) (krf @acc))) acc))) - ([acc k v] (self acc (clojure.lang.MapEntry. k v))))))))))) + ([acc k v] (self acc #?(:clj (clojure.lang.MapEntry. k v) :cljs [k v]))))))))))) (defn partition "Returns a partitioning transducer. Each partition is independently transformed using the xform transducer." @@ -282,7 +296,7 @@ (if (zero? b) ; this transduce may return a reduced because of mxrf wrapping reduceds coming from rf (let [acc (transduce xform mxrf acc dq)] - (dotimes [_ (clj/min n step)] (.poll dq)) + (dotimes [_ (core/min n step)] (.poll dq)) (vswap! barrier + step) acc) acc))))))) @@ -376,8 +390,9 @@ (vreset! vi (let [i (inc i)] (if (= n i) 0 i))) (rf acc (f (vreset! vwacc (f (invf wacc x') x)))))))))))) -(defn window-by-time - "Returns a transducer which computes a windowed accumulator over chronologically sorted items. +#?(:clj + (defn window-by-time + "Returns a transducer which computes a windowed accumulator over chronologically sorted items. timef is a function from one item to its scaled timestamp (as a double). The window length is always 1.0 so timef must normalize timestamps. For example if timestamps are in seconds (and under the :ts key), @@ -386,65 +401,65 @@ n is the integral number of steps by which the window slides. With a 1-hour window, 4 means that the window slides every 15 minutes. f and invf work like in #'window." - ([timef n f] - (window-by-time timef n - (fn - ([] clojure.lang.PersistentQueue/EMPTY) - ([q] (f (clj/reduce f (f) q))) - ([q x] (conj q x))) - (fn [q _] (pop q)))) - ([timef n f invf] - (let [timef (fn [x] (long (Math/floor (* n (timef x)))))] - (fn [rf] - (let [dq (java.util.ArrayDeque.) - vwacc (volatile! (f)) - flush! - (fn [acc ^long from-ts ^long to-ts] - (loop [ts from-ts acc acc wacc @vwacc] - (let [x (.peekFirst dq)] - (cond - (= ts (timef x)) - (do - (.pollFirst dq) - (recur ts acc (invf wacc x))) - (= ts to-ts) - (do - (vreset! vwacc wacc) - acc) - :else - (let [acc (rf acc (f wacc))] - (if (reduced? acc) + ([timef n f] + (window-by-time timef n + (fn + ([] clojure.lang.PersistentQueue/EMPTY) + ([q] (f (core/reduce f (f) q))) + ([q x] (conj q x))) + (fn [q _] (pop q)))) + ([timef n f invf] + (let [timef (fn [x] (long (Math/floor (* n (timef x)))))] + (fn [rf] + (let [dq (java.util.ArrayDeque.) + vwacc (volatile! (f)) + flush! + (fn [acc ^long from-ts ^long to-ts] + (loop [ts from-ts acc acc wacc @vwacc] + (let [x (.peekFirst dq)] + (cond + (= ts (timef x)) + (do + (.pollFirst dq) + (recur ts acc (invf wacc x))) + (= ts to-ts) (do (vreset! vwacc wacc) acc) - (recur (inc ts) acc wacc)))))))] - (fn - ([] (rf)) - ([acc] - (let [acc (if-not (.isEmpty dq) - (unreduced (rf acc (f @vwacc))) - acc)] - (rf acc))) - ([acc x] - (let [limit (- (timef x) n) - prev-limit (if-some [prev-x (.peekLast dq)] - (- (timef prev-x) n) - limit) - _ (.addLast dq x) ; so dq is never empty for flush! - acc (flush! acc prev-limit limit)] - (when-not (reduced? acc) - (vswap! vwacc f x)) - acc)))))))) + :else + (let [acc (rf acc (f wacc))] + (if (reduced? acc) + (do + (vreset! vwacc wacc) + acc) + (recur (inc ts) acc wacc)))))))] + (fn + ([] (rf)) + ([acc] + (let [acc (if-not (.isEmpty dq) + (unreduced (rf acc (f @vwacc))) + acc)] + (rf acc))) + ([acc x] + (let [limit (- (timef x) n) + prev-limit (if-some [prev-x (.peekLast dq)] + (- (timef prev-x) n) + limit) + _ (.addLast dq x) ; so dq is never empty for flush! + acc (flush! acc prev-limit limit)] + (when-not (reduced? acc) + (vswap! vwacc f x)) + acc))))))))) (defn count "Count the number of items. Either used directly as a transducer or invoked with two args as a transducing context." ([rf] - (let [n (java.util.concurrent.atomic.AtomicLong.)] + (let [n #?(:clj (java.util.concurrent.atomic.AtomicLong.) :cljs (atom 0))] (fn ([] (rf)) - ([acc] (rf (unreduced (rf acc (.get n))))) - ([acc _] (.incrementAndGet n) acc)))) + ([acc] (rf (unreduced (rf acc #?(:clj (.get n) :cljs @n))))) + ([acc _] #?(:clj (.incrementAndGet n) :cljs (swap! n inc)) acc)))) ([xform coll] (transduce (comp xform count) rf/last coll))) @@ -479,7 +494,7 @@ acc))) acc @rfs)) (fn [acc invoke] - (clj/reduce + (core/reduce (fn [acc rf] (let [acc (invoke rf acc)] (if (reduced? acc) @@ -495,12 +510,12 @@ ([acc] (rf (invoke-rfs acc #(%1 %2)))) ([acc x] (let [acc (invoke-rfs acc #(%1 %2 x))] - (if (zero? (clj/count @rfs)) + (if (zero? (core/count @rfs)) (ensure-reduced acc) acc))) ([acc k v] (let [acc (invoke-rfs acc #(%1 %2 k v))] - (if (zero? (clj/count @rfs)) + (if (zero? (core/count @rfs)) (ensure-reduced acc) acc))))))) @@ -515,8 +530,8 @@ (let [collect-xform (if (map? xforms-map) (into {}) (reduce (kvrf - ([] (clj/reduce (fn [v _] (conj! v nil)) - (transient []) (range (clj/count xforms-map)))) + ([] (core/reduce (fn [v _] (conj! v nil)) + (transient []) (range (core/count xforms-map)))) ([v] (persistent! v)) ([v i x] (assoc! v i x))))) xforms-map (if (map? xforms-map) xforms-map (zipmap (range) xforms-map))] @@ -525,3 +540,5 @@ collect-xform))) ([xforms-map coll] (transduce (transjuxt xforms-map) rf/last coll))) + +) diff --git a/src/net/cgrand/xforms/rfs.clj b/src/net/cgrand/xforms/rfs.cljc similarity index 58% rename from src/net/cgrand/xforms/rfs.clj rename to src/net/cgrand/xforms/rfs.cljc index 52b459e..4d43b4f 100644 --- a/src/net/cgrand/xforms/rfs.clj +++ b/src/net/cgrand/xforms/rfs.cljc @@ -1,18 +1,33 @@ (ns net.cgrand.xforms.rfs {:author "Christophe Grand"} (:refer-clojure :exclude [str last min max]) - (:require [clojure.core :as clj])) + #?(:cljs (:require-macros + [net.cgrand.macrovich :as macros] + [net.cgrand.xforms.rfs :refer [or-instance?]]) + :clj (:require [net.cgrand.macrovich :as macros])) + (:require [#?(:clj clojure.core :cljs cljs.core) :as core]) + #?(:cljs (:import [goog.string StringBuffer]))) +(macros/deftime + (defmacro ^:private or-instance? [class x y] + (let [xsym (gensym 'x_)] + `(let [~xsym ~x] + (if (instance? ~class ~xsym) ~(with-meta xsym {:tag class}) ~y))))) + +(declare str!) + +(macros/usetime + (defn minimum - ([comparator] - (minimum comparator nil)) - ([^java.util.Comparator comparator absolute-maximum] - (fn - ([] ::+∞) - ([x] (if (identical? ::+∞ x) - absolute-maximum - x)) - ([a b] (if (or (identical? ::+∞ a) (pos? (.compare comparator a b))) b a))))) + ([comparator] + (minimum comparator nil)) + ([^java.util.Comparator comparator absolute-maximum] + (fn + ([] ::+∞) + ([x] (if (identical? ::+∞ x) + absolute-maximum + x)) + ([a b] (if (or (identical? ::+∞ a) (pos? (.compare comparator a b))) b a))))) (defn maximum ([comparator] @@ -43,20 +58,15 @@ ([x] x) ([_ x] x)) -(defmacro ^:private or-instance? [class x y] - (let [xsym (gensym 'x_)] - `(let [~xsym ~x] - (if (instance? ~class ~xsym) ~(with-meta xsym {:tag class}) ~y)))) - (defn str! "Like xforms/str but returns a StringBuilder." - ([] (StringBuilder.)) - ([sb] (or-instance? StringBuilder sb (StringBuilder. (clj/str sb)))) ; the instance? checks are for compatibility with str in case of seeded reduce/transduce. - ([sb x] (.append (or-instance? StringBuilder sb (StringBuilder. (clj/str sb))) x))) + ([] (#?(:clj StringBuilder. :cljs StringBuffer.))) + ([sb] (or-instance? #?(:clj StringBuilder :cljs StringBuffer) sb (#?(:clj StringBuilder. :cljs StringBuffer.) (core/str sb)))) ; the instance? checks are for compatibility with str in case of seeded reduce/transduce. + ([sb x] (.append (or-instance? #?(:clj StringBuilder :cljs StringBuffer) sb (#?(:clj StringBuilder. :cljs StringBuffer.) (core/str sb))) x))) (def str "Reducing function to build strings in linear time. Acts as replacement for clojure.core/str in a reduce/transduce call." - (completing str! clj/str)) + (completing str! core/str)) #_(defn juxt "Returns a reducing fn which compute all rfns at once and whose final return @@ -67,12 +77,12 @@ ([] (mapv #(vector % (volatile! (%))) rfns)) ([acc] (mapv (fn [[rf vacc]] (rf (unreduced @vacc))) acc)) ([acc x] - (let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]] + (let [some-unreduced (core/reduce (fn [some-unreduced [rf vacc]] (when-not (reduced? @vacc) (vswap! vacc rf x) true)) false acc)] (if some-unreduced acc (reduced acc)))) ([acc k v] - (let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]] + (let [some-unreduced (core/reduce (fn [some-unreduced [rf vacc]] (when-not (reduced? @vacc) (vswap! vacc rf k v) true)) false acc)] (if some-unreduced acc (reduced acc))))))) @@ -86,4 +96,5 @@ ([] (f)) ([acc] (zipmap keys (f acc))) ([acc x] (f acc x)) - ([acc k v] (f acc k v)))))) \ No newline at end of file + ([acc k v] (f acc k v)))))) +)