WIP cljc conversion, basics (like (x/into {} (x/by-key odd? (x/reduce +)) (range 10))) work

This commit is contained in:
Christophe Grand 2016-12-15 17:39:28 +01:00
parent 3190a96041
commit bae1a9d1ad
3 changed files with 179 additions and 149 deletions

View file

@ -3,4 +3,6 @@
#_#_:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.7.0"]])
:dependencies [[org.clojure/clojure "1.8.0"]
[org.clojure/clojurescript "1.9.369"]
[net.cgrand/macrovich "0.1.0"]])

View file

@ -1,84 +1,75 @@
(ns net.cgrand.xforms
"Extra transducers for Clojure"
{:author "Christophe Grand"}
#?(:cljs (:require-macros
[net.cgrand.macrovich :as macros]
[net.cgrand.xforms :refer [for kvrf let-complete]])
:clj (:require [net.cgrand.macrovich :as macros]))
(:refer-clojure :exclude [reduce reductions into count for partition str last keys vals min max])
(:require [clojure.core :as clj]
(:require [#?(:clj clojure.core :cljs cljs.core) :as core]
[net.cgrand.xforms.rfs :as rf]))
(macros/deftime
(defmacro for
"Like clojure.core/for with the first expression being replaced by % (or _). Returns a transducer.
"Like clojure.core/for with the first expression being replaced by % (or _). Returns a transducer.
When the first expression is not % (or _) returns an eduction."
[[binding %or_ & seq-exprs] body-expr]
(if-not (and (symbol? %or_) (#{"%" "_"} (name %or_)))
`(eduction (for [~binding ~'% ~@seq-exprs] ~body-expr) ~%or_)
(let [rf (gensym 'rf)
acc (gensym 'acc)
pair? #(and (vector? %) (= 2 (clj/count %)))
destructuring-pair? (every-pred pair?
#(not-any? (some-fn keyword? #{'&}) %))
rpairs (clj/partition 2 (rseq (vec seq-exprs)))
build (fn [init]
(clj/reduce (fn [body [expr binding]]
(case binding
:let `(let ~expr ~body)
:when `(if ~expr ~body ~acc)
:while `(if ~expr ~body (reduced ~acc))
(if (destructuring-pair? binding)
`(let [expr# ~expr]
(if (and (map? expr#) (satisfies? clojure.core.protocols/IKVReduce expr#))
(clj/reduce-kv (fn [~acc ~@binding] ~body) ~acc expr#)
(clj/reduce (fn [~acc ~binding] ~body) ~acc expr#)))
`(clj/reduce (fn [~acc ~binding] ~body) ~acc ~expr))))
init rpairs))
nested-reduceds (clj/for [[expr binding] rpairs
:when (not (keyword? binding))]
`reduced)
body (build `(let [acc# (~rf ~acc ~@(if (and (pair? body-expr) (nil? (meta body-expr)))
body-expr
[body-expr]))]
(if (reduced? acc#)
(-> acc# ~@nested-reduceds)
acc#)))]
`(fn [~rf]
(let [~rf (ensure-kvrf ~rf)]
(kvrf
([] (~rf))
([~acc] (~rf ~acc))
([~acc ~binding] ~body)
~(if (destructuring-pair? binding)
`([~acc ~@binding] ~body)
`([~acc k# v#]
(let [~binding (clojure.lang.MapEntry. k# v#)] ~body)))))))))
(defprotocol KvRfable "Protocol for reducing fns that accept key and val as separate arguments."
(some-kvrf [f] "Returns a kvrf or nil"))
(extend-protocol KvRfable
Object (some-kvrf [_] nil)
nil (some-kvrf [_] nil))
[[binding %or_ & seq-exprs] body-expr]
(if-not (and (symbol? %or_) (#{"%" "_"} (name %or_)))
`(eduction (for [~binding ~'% ~@seq-exprs] ~body-expr) ~%or_)
(let [rf (gensym 'rf)
acc (gensym 'acc)
pair? #(and (vector? %) (= 2 (core/count %)))
destructuring-pair? (every-pred pair?
#(not-any? (some-fn keyword? #{'&}) %))
rpairs (core/partition 2 (rseq (vec seq-exprs)))
build (fn [init]
(core/reduce (fn [body [expr binding]]
(case binding
:let `(let ~expr ~body)
:when `(if ~expr ~body ~acc)
:while `(if ~expr ~body (reduced ~acc))
(if (destructuring-pair? binding)
`(let [expr# ~expr]
(if (and (map? expr#) (kvreducible? expr#))
(core/reduce-kv (fn [~acc ~@binding] ~body) ~acc expr#)
(core/reduce (fn [~acc ~binding] ~body) ~acc expr#)))
`(core/reduce (fn [~acc ~binding] ~body) ~acc ~expr))))
init rpairs))
nested-reduceds (core/for [[expr binding] rpairs
:when (not (keyword? binding))]
`reduced)
body (build `(let [acc# (~rf ~acc ~@(if (and (pair? body-expr) (nil? (meta body-expr)))
body-expr
[body-expr]))]
(if (reduced? acc#)
(-> acc# ~@nested-reduceds)
acc#)))]
`(fn [~rf]
(let [~rf (ensure-kvrf ~rf)]
(kvrf
([] (~rf))
([~acc] (~rf ~acc))
([~acc ~binding] ~body)
~(if (destructuring-pair? binding)
`([~acc ~@binding] ~body)
`([~acc k# v#]
(let [~binding (macros/case :clj (clojure.lang.MapEntry. k# v#) :cljs [k# v#])] ~body)))))))))
(defmacro kvrf [name? & fn-bodies]
(let [name (if (symbol? name?) name? (gensym '_))
fn-bodies (if (symbol? name?) fn-bodies (cons name? fn-bodies))
fn-bodies (if (vector? (first fn-bodies)) (list fn-bodies) fn-bodies)]
`(reify
clojure.lang.Fn
~@(macros/case :clj '[clojure.lang.Fn])
KvRfable
(some-kvrf [this#] this#)
clojure.lang.IFn
~@(clj/for [[args & body] fn-bodies]
~(macros/case :cljs `core/IFn :clj 'clojure.lang.IFn)
~@(core/for [[args & body] fn-bodies]
(let [nohint-args (map (fn [arg] (if (:tag (meta arg)) (gensym 'arg) arg)) args)
rebind (mapcat (fn [arg nohint]
(when-not (= arg nohint) [arg nohint])) args nohint-args)]
`(invoke [~name ~@nohint-args] (let [~@rebind] ~@body)))))))
(defn ensure-kvrf [rf]
(or (some-kvrf rf)
(kvrf
([] (rf))
([acc] (rf acc))
([acc x] (rf acc x))
([acc k v] (rf acc (clojure.lang.MapEntry. k v))))))
`(~(macros/case :cljs `core/-invoke :clj 'invoke) [~name ~@nohint-args] (let [~@rebind] ~@body)))))))
(defmacro ^:private let-complete [[binding volatile] & body]
`(let [v# @~volatile]
@ -86,6 +77,29 @@
(vreset! ~volatile ~volatile)
(let [~binding v#]
~@body))))
)
(declare into reduce multiplex by-key)
(defprotocol KvRfable "Protocol for reducing fns that accept key and val as separate arguments."
(some-kvrf [f] "Returns a kvrf or nil"))
(macros/usetime
(defn kvreducible? [coll]
(satisfies? #?(:clj clojure.core.protocols/IKVReduce :cljs IKVReduce) coll))
(extend-protocol KvRfable
#?(:clj Object :cljs default) (some-kvrf [_] nil)
nil (some-kvrf [_] nil))
(defn ensure-kvrf [rf]
(or (some-kvrf rf)
(kvrf
([] (rf))
([acc] (rf acc))
([acc x] (rf acc x))
([acc k v] (rf acc #?(:clj (clojure.lang.MapEntry. k v) :cljs [k v]))))))
(defn reduce
"A transducer that reduces a collection to a 1-item collection consisting of only the reduced result.
@ -111,7 +125,7 @@
(defn- into-rf [to]
(cond
(instance? clojure.lang.IEditableCollection to)
(instance? #?(:clj clojure.lang.IEditableCollection :cljs IEditableCollection) to)
(if (map? to)
(kvrf
([] (transient to))
@ -142,9 +156,9 @@
(into to identity from))
([to xform from]
(let [rf (xform (into-rf to))]
(if-let [rf (and (map? from) (satisfies? clojure.core.protocols/IKVReduce from) (some-kvrf rf))]
(rf (clj/reduce-kv rf (rf) from))
(rf (clj/reduce rf (rf) from))))))
(if-let [rf (and (map? from) (kvreducible? from) (some-kvrf rf))]
(rf (core/reduce-kv rf (rf) from))
(rf (core/reduce rf (rf) from))))))
(defn minimum
([comparator]
@ -222,7 +236,7 @@
(if (and (nil? kfn) (nil? vfn))
(kvrf self
([] (rf))
([acc] (let-complete [m m] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (clj/vals (persistent! m))))))
([acc] (let-complete [m m] (rf (core/reduce (fn [acc krf] (krf acc)) acc (core/vals (persistent! m))))))
([acc x]
(self acc (key' x) (val' x)))
([acc k v]
@ -241,7 +255,7 @@
vfn (or vfn val')]
(kvrf self
([] (rf))
([acc] (let-complete [m m] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (clj/vals (persistent! m))))))
([acc] (let-complete [m m] (rf (core/reduce (fn [acc krf] (krf acc)) acc (core/vals (persistent! m))))))
([acc x]
(let [k (kfn x)
krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k))))
@ -255,7 +269,7 @@
(vswap! m assoc! k nop-rf)
(krf @acc)))
acc)))
([acc k v] (self acc (clojure.lang.MapEntry. k v)))))))))))
([acc k v] (self acc #?(:clj (clojure.lang.MapEntry. k v) :cljs [k v])))))))))))
(defn partition
"Returns a partitioning transducer. Each partition is independently transformed using the xform transducer."
@ -282,7 +296,7 @@
(if (zero? b)
; this transduce may return a reduced because of mxrf wrapping reduceds coming from rf
(let [acc (transduce xform mxrf acc dq)]
(dotimes [_ (clj/min n step)] (.poll dq))
(dotimes [_ (core/min n step)] (.poll dq))
(vswap! barrier + step)
acc)
acc)))))))
@ -376,8 +390,9 @@
(vreset! vi (let [i (inc i)] (if (= n i) 0 i)))
(rf acc (f (vreset! vwacc (f (invf wacc x') x))))))))))))
(defn window-by-time
"Returns a transducer which computes a windowed accumulator over chronologically sorted items.
#?(:clj
(defn window-by-time
"Returns a transducer which computes a windowed accumulator over chronologically sorted items.
timef is a function from one item to its scaled timestamp (as a double). The window length is always 1.0
so timef must normalize timestamps. For example if timestamps are in seconds (and under the :ts key),
@ -386,65 +401,65 @@
n is the integral number of steps by which the window slides. With a 1-hour window, 4 means that the window slides every 15 minutes.
f and invf work like in #'window."
([timef n f]
(window-by-time timef n
(fn
([] clojure.lang.PersistentQueue/EMPTY)
([q] (f (clj/reduce f (f) q)))
([q x] (conj q x)))
(fn [q _] (pop q))))
([timef n f invf]
(let [timef (fn [x] (long (Math/floor (* n (timef x)))))]
(fn [rf]
(let [dq (java.util.ArrayDeque.)
vwacc (volatile! (f))
flush!
(fn [acc ^long from-ts ^long to-ts]
(loop [ts from-ts acc acc wacc @vwacc]
(let [x (.peekFirst dq)]
(cond
(= ts (timef x))
(do
(.pollFirst dq)
(recur ts acc (invf wacc x)))
(= ts to-ts)
(do
(vreset! vwacc wacc)
acc)
:else
(let [acc (rf acc (f wacc))]
(if (reduced? acc)
([timef n f]
(window-by-time timef n
(fn
([] clojure.lang.PersistentQueue/EMPTY)
([q] (f (core/reduce f (f) q)))
([q x] (conj q x)))
(fn [q _] (pop q))))
([timef n f invf]
(let [timef (fn [x] (long (Math/floor (* n (timef x)))))]
(fn [rf]
(let [dq (java.util.ArrayDeque.)
vwacc (volatile! (f))
flush!
(fn [acc ^long from-ts ^long to-ts]
(loop [ts from-ts acc acc wacc @vwacc]
(let [x (.peekFirst dq)]
(cond
(= ts (timef x))
(do
(.pollFirst dq)
(recur ts acc (invf wacc x)))
(= ts to-ts)
(do
(vreset! vwacc wacc)
acc)
(recur (inc ts) acc wacc)))))))]
(fn
([] (rf))
([acc]
(let [acc (if-not (.isEmpty dq)
(unreduced (rf acc (f @vwacc)))
acc)]
(rf acc)))
([acc x]
(let [limit (- (timef x) n)
prev-limit (if-some [prev-x (.peekLast dq)]
(- (timef prev-x) n)
limit)
_ (.addLast dq x) ; so dq is never empty for flush!
acc (flush! acc prev-limit limit)]
(when-not (reduced? acc)
(vswap! vwacc f x))
acc))))))))
:else
(let [acc (rf acc (f wacc))]
(if (reduced? acc)
(do
(vreset! vwacc wacc)
acc)
(recur (inc ts) acc wacc)))))))]
(fn
([] (rf))
([acc]
(let [acc (if-not (.isEmpty dq)
(unreduced (rf acc (f @vwacc)))
acc)]
(rf acc)))
([acc x]
(let [limit (- (timef x) n)
prev-limit (if-some [prev-x (.peekLast dq)]
(- (timef prev-x) n)
limit)
_ (.addLast dq x) ; so dq is never empty for flush!
acc (flush! acc prev-limit limit)]
(when-not (reduced? acc)
(vswap! vwacc f x))
acc)))))))))
(defn count
"Count the number of items. Either used directly as a transducer or invoked with two args
as a transducing context."
([rf]
(let [n (java.util.concurrent.atomic.AtomicLong.)]
(let [n #?(:clj (java.util.concurrent.atomic.AtomicLong.) :cljs (atom 0))]
(fn
([] (rf))
([acc] (rf (unreduced (rf acc (.get n)))))
([acc _] (.incrementAndGet n) acc))))
([acc] (rf (unreduced (rf acc #?(:clj (.get n) :cljs @n)))))
([acc _] #?(:clj (.incrementAndGet n) :cljs (swap! n inc)) acc))))
([xform coll]
(transduce (comp xform count) rf/last coll)))
@ -479,7 +494,7 @@
acc)))
acc @rfs))
(fn [acc invoke]
(clj/reduce
(core/reduce
(fn [acc rf]
(let [acc (invoke rf acc)]
(if (reduced? acc)
@ -495,12 +510,12 @@
([acc] (rf (invoke-rfs acc #(%1 %2))))
([acc x]
(let [acc (invoke-rfs acc #(%1 %2 x))]
(if (zero? (clj/count @rfs))
(if (zero? (core/count @rfs))
(ensure-reduced acc)
acc)))
([acc k v]
(let [acc (invoke-rfs acc #(%1 %2 k v))]
(if (zero? (clj/count @rfs))
(if (zero? (core/count @rfs))
(ensure-reduced acc)
acc)))))))
@ -515,8 +530,8 @@
(let [collect-xform (if (map? xforms-map)
(into {})
(reduce (kvrf
([] (clj/reduce (fn [v _] (conj! v nil))
(transient []) (range (clj/count xforms-map))))
([] (core/reduce (fn [v _] (conj! v nil))
(transient []) (range (core/count xforms-map))))
([v] (persistent! v))
([v i x] (assoc! v i x)))))
xforms-map (if (map? xforms-map) xforms-map (zipmap (range) xforms-map))]
@ -525,3 +540,5 @@
collect-xform)))
([xforms-map coll]
(transduce (transjuxt xforms-map) rf/last coll)))
)

View file

@ -1,18 +1,33 @@
(ns net.cgrand.xforms.rfs
{:author "Christophe Grand"}
(:refer-clojure :exclude [str last min max])
(:require [clojure.core :as clj]))
#?(:cljs (:require-macros
[net.cgrand.macrovich :as macros]
[net.cgrand.xforms.rfs :refer [or-instance?]])
:clj (:require [net.cgrand.macrovich :as macros]))
(:require [#?(:clj clojure.core :cljs cljs.core) :as core])
#?(:cljs (:import [goog.string StringBuffer])))
(macros/deftime
(defmacro ^:private or-instance? [class x y]
(let [xsym (gensym 'x_)]
`(let [~xsym ~x]
(if (instance? ~class ~xsym) ~(with-meta xsym {:tag class}) ~y)))))
(declare str!)
(macros/usetime
(defn minimum
([comparator]
(minimum comparator nil))
([^java.util.Comparator comparator absolute-maximum]
(fn
([] ::+∞)
([x] (if (identical? ::+∞ x)
absolute-maximum
x))
([a b] (if (or (identical? ::+∞ a) (pos? (.compare comparator a b))) b a)))))
([comparator]
(minimum comparator nil))
([^java.util.Comparator comparator absolute-maximum]
(fn
([] ::+∞)
([x] (if (identical? ::+∞ x)
absolute-maximum
x))
([a b] (if (or (identical? ::+∞ a) (pos? (.compare comparator a b))) b a)))))
(defn maximum
([comparator]
@ -43,20 +58,15 @@
([x] x)
([_ x] x))
(defmacro ^:private or-instance? [class x y]
(let [xsym (gensym 'x_)]
`(let [~xsym ~x]
(if (instance? ~class ~xsym) ~(with-meta xsym {:tag class}) ~y))))
(defn str!
"Like xforms/str but returns a StringBuilder."
([] (StringBuilder.))
([sb] (or-instance? StringBuilder sb (StringBuilder. (clj/str sb)))) ; the instance? checks are for compatibility with str in case of seeded reduce/transduce.
([sb x] (.append (or-instance? StringBuilder sb (StringBuilder. (clj/str sb))) x)))
([] (#?(:clj StringBuilder. :cljs StringBuffer.)))
([sb] (or-instance? #?(:clj StringBuilder :cljs StringBuffer) sb (#?(:clj StringBuilder. :cljs StringBuffer.) (core/str sb)))) ; the instance? checks are for compatibility with str in case of seeded reduce/transduce.
([sb x] (.append (or-instance? #?(:clj StringBuilder :cljs StringBuffer) sb (#?(:clj StringBuilder. :cljs StringBuffer.) (core/str sb))) x)))
(def str
"Reducing function to build strings in linear time. Acts as replacement for clojure.core/str in a reduce/transduce call."
(completing str! clj/str))
(completing str! core/str))
#_(defn juxt
"Returns a reducing fn which compute all rfns at once and whose final return
@ -67,12 +77,12 @@
([] (mapv #(vector % (volatile! (%))) rfns))
([acc] (mapv (fn [[rf vacc]] (rf (unreduced @vacc))) acc))
([acc x]
(let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]]
(let [some-unreduced (core/reduce (fn [some-unreduced [rf vacc]]
(when-not (reduced? @vacc) (vswap! vacc rf x) true))
false acc)]
(if some-unreduced acc (reduced acc))))
([acc k v]
(let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]]
(let [some-unreduced (core/reduce (fn [some-unreduced [rf vacc]]
(when-not (reduced? @vacc) (vswap! vacc rf k v) true))
false acc)]
(if some-unreduced acc (reduced acc)))))))
@ -86,4 +96,5 @@
([] (f))
([acc] (zipmap keys (f acc)))
([acc x] (f acc x))
([acc k v] (f acc k v))))))
([acc k v] (f acc k v))))))
)