From 8d240ed6ef76db2fc05707187aeade7b87634ef7 Mon Sep 17 00:00:00 2001 From: Christophe Grand Date: Thu, 3 Nov 2016 09:34:18 -0500 Subject: [PATCH] Several bug fixes, improved state cleanup on completion, move rfs to a separate namespace, new transducers: avg, min, minimum, max, maximum, last, str --- README.md | 16 ++-- project.clj | 2 +- src/net/cgrand/xforms.clj | 134 +++++++++++++++------------------- src/net/cgrand/xforms/rfs.clj | 89 ++++++++++++++++++++++ 4 files changed, 154 insertions(+), 87 deletions(-) create mode 100644 src/net/cgrand/xforms/rfs.clj diff --git a/README.md b/README.md index adf03bd..68d4226 100644 --- a/README.md +++ b/README.md @@ -4,9 +4,9 @@ More transducers and reducing functions for Clojure! [![Build Status](https://travis-ci.org/cgrand/xforms.png?branch=master)](https://travis-ci.org/cgrand/xforms) -Transducers: `reduce`, `into`, `count`, `by-key`, `partition`, `for`, `multiplex`, `window` and `window-by-time`. +Transducers: `reduce`, `into`, `last`, `count`, `avg`, `min`, `minimum`, `max`, `maximum`, `str`, `by-key`, `partition`, `for`, `multiplex`, `transjuxt`, `window` and `window-by-time`. -Reducing functions: `str`, `str!`, `avg`, `juxt`, `juxt-map` and `last`. +Reducing functions (in `net.cgrand.xforms.rfs`): `min`, `minimum`, `max`, `maximum`, `str`, `str!`, `avg`, `juxt` and `last`. Transducing context: `transjuxt` (for performing several transductions in a single pass). @@ -15,7 +15,7 @@ Transducing context: `transjuxt` (for performing several transductions in a sing Add this dependency to your project: ```clj -[net.cgrand/xforms "0.4.0"] +[net.cgrand/xforms "0.5.0"] ``` ```clj @@ -27,7 +27,7 @@ Add this dependency to your project: ```clj => (quick-bench (reduce str (range 256))) Execution time mean : 58,714946 µs -=> (quick-bench (reduce x/str (range 256))) +=> (quick-bench (reduce rf/str (range 256))) Execution time mean : 11,609631 µs ``` @@ -72,12 +72,12 @@ Padding is achieved as usual: ``` -`avg` is a reducing fn to compute the arithmetic mean. `juxt` and `juxt-map` are used to compute several reducing fns at once. +`avg` is a transducer to compute the arithmetic mean. `transjuxt` is used to perform several transductions at once. ```clj -=> (into {} (x/by-key odd? (x/reduce (x/juxt + x/avg))) (range 256)) +=> (into {} (x/by-key odd? (x/transjuxt [(x/reduce +) x/avg])) (range 256)) {false [16256 127], true [16384 128]} -=> (into {} (x/by-key odd? (x/reduce (x/juxt-map :sum + :mean x/avg :count x/count))) (range 256)) +=> (into {} (x/by-key odd? (x/transjuxt {:sum (x/reduce +) :mean x/avg :count x/count})) (range 256)) {false {:sum 16256, :mean 127, :count 128}, true {:sum 16384, :mean 128, :count 128}} ``` @@ -163,8 +163,6 @@ Several xforms transducers and transducing contexts leverage `reduce-kv` and `kv 3-arg `into`
(transducing context)when `from` is a mapwhen `to` is a map `by-key`
(as a transducer)when is `kfn` and `vfn` are unspecified or `nil`when `pair` is `vector` or unspecified `by-key`
(as a transducing context on values)nono - `juxt`when at least one of the children `rfns` is a kvrfno - `juxt-map`when at least one of the children `rfns` is a kvrfno diff --git a/project.clj b/project.clj index 18ed9ac..2e85ce0 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject net.cgrand/xforms "0.4.0" +(defproject net.cgrand/xforms "0.5.0" :description "Extra transducers for Clojure" #_#_:url "http://example.com/FIXME" :license {:name "Eclipse Public License" diff --git a/src/net/cgrand/xforms.clj b/src/net/cgrand/xforms.clj index 9f21bc7..3c6bca7 100644 --- a/src/net/cgrand/xforms.clj +++ b/src/net/cgrand/xforms.clj @@ -1,8 +1,9 @@ (ns net.cgrand.xforms "Extra transducers for Clojure" {:author "Christophe Grand"} - (:refer-clojure :exclude [reduce into count for partition str juxt last]) - (:require [clojure.core :as clj])) + (:refer-clojure :exclude [reduce into count for partition str last keys vals min max]) + (:require [clojure.core :as clj] + [net.cgrand.xforms.rfs :as rf])) (defmacro for "Like clojure.core/for with the first expression being replaced by % (or _). Returns a transducer." @@ -44,7 +45,7 @@ ([~acc] (~rf ~acc)) ([~acc ~binding] ~body) ~(if (destructuring-pair? binding) - `([~acc ~@binding] ~body) + `([~acc ~@(map #(vary-meta % dissoc :tag) binding)] ~body) `([~acc k# v#] (let [~binding (clojure.lang.MapEntry. k# v#)] ~body)))))))) @@ -75,6 +76,13 @@ ([acc x] (rf acc x)) ([acc k v] (rf acc (clojure.lang.MapEntry. k v)))))) +(defmacro ^:private let-complete [[binding volatile] & body] + `(let [v# @~volatile] + (when-not (identical? v# ~volatile) ; self reference as sentinel + (vreset! ~volatile ~volatile) + (let [~binding v#] + ~@body)))) + (defn reduce "A transducer that reduces a collection to a 1-item collection consisting of only the reduced result. Unlike reduce but like transduce it does call the completing arity (1) of the reducing fn." @@ -84,7 +92,8 @@ (let [f (ensure-kvrf f)] (kvrf ([] (rf)) - ([acc] (rf (unreduced (rf acc (f (unreduced @vacc)))))) + ([acc] (let-complete [f-acc vacc] + (rf (unreduced (rf acc (f (unreduced f-acc))))))) ([acc x] (if (reduced? (vswap! vacc f x)) (reduced acc) @@ -133,20 +142,35 @@ (rf (clj/reduce-kv rf (rf) from)) (rf (clj/reduce rf (rf) from)))))) -(defmacro ^:private or-instance? [class x y] - (let [xsym (gensym 'x_)] - `(let [~xsym ~x] - (if (instance? ~class ~xsym) ~(with-meta xsym {:tag class}) ~y)))) +(defn minimum + ([comparator] + (minimum comparator nil)) + ([comparator absolute-maximum] + (reduce (rf/minimum comparator absolute-maximum)))) -(defn str! - "Like xforms/str but returns a StringBuilder." - ([] (StringBuilder.)) - ([sb] (or-instance? StringBuilder sb (StringBuilder. (clj/str sb)))) ; the instance? checks are for compatibility with str in case of seeded reduce/transduce. - ([sb x] (.append (or-instance? StringBuilder sb (StringBuilder. (clj/str sb))) x))) +(defn maximum + ([comparator] + (maximum comparator nil)) + ([^java.util.Comparator comparator absolute-minimum] + (reduce (rf/maximum comparator absolute-minimum)))) -(def str - "Reducing function to build strings in linear time. Acts as replacement for clojure.core/str in a reduce/transduce call." - (completing str! clj/str)) +(def min (reduce rf/min)) + +(def max (reduce rf/max)) + +(defn vals [rf] + (kvrf + ([] (rf)) + ([acc] (rf acc)) + ([acc kv] (rf acc (val kv))) + ([acc k v] (rf acc v)))) + +(defn keys [rf] + (kvrf + ([] (rf)) + ([acc] (rf acc)) + ([acc kv] (rf acc (key kv))) + ([acc k v] (rf acc k)))) ;; for both map entries and vectors (defn- key' [kv] (nth kv 0)) @@ -194,7 +218,7 @@ (if (and (nil? kfn) (nil? vfn)) (kvrf self ([] (rf)) - ([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m))))) + ([acc] (let-complete [m m] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (clj/vals (persistent! m)))))) ([acc x] (self acc (key' x) (val' x))) ([acc k v] @@ -211,9 +235,9 @@ acc)))) (let [kfn (or kfn key') vfn (or vfn val')] - (fn + (kvrf self ([] (rf)) - ([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m))))) + ([acc] (let-complete [m m] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (clj/vals (persistent! m)))))) ([acc x] (let [k (kfn x) krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k)))) @@ -226,7 +250,8 @@ (do (vswap! m assoc! k nop-rf) (krf @acc))) - acc))))))))))) + acc))) + ([acc k v] (self acc (clojure.lang.MapEntry. k v))))))))))) (defn partition "Returns a partitioning transducer. Each partition is independently transformed using the xform transducer." @@ -246,7 +271,7 @@ xform (comp (map #(if (identical? dq %) nil %)) xform)] (fn ([] (rf)) - ([acc] (rf acc)) + ([acc] (.clear dq) (rf acc)) ([acc x] (let [b (vswap! barrier dec)] (when (< b n) (.add dq (if (nil? x) dq x))) @@ -267,9 +292,12 @@ (fn ([] (rf)) ([acc] (if (< @barrier n) - (let [xform (comp cat (take n) xform)] - ; don't use mxrf for completion: we want completion and don't want reduced-wrapping - (transduce xform rf acc [dq pad])) + (let [xform (comp cat (take n) xform) + ; don't use mxrf for completion: we want completion and don't want reduced-wrapping + acc (transduce xform rf acc [dq pad])] + (vreset! @barrier n) + (.clear dq) + acc) acc)) ([acc x] (let [b (vswap! barrier dec)] @@ -282,13 +310,7 @@ acc) acc)))))))) -(defn avg - "Reducing fn to compute the arithmetic mean." - ([] (transient [0 0])) - ([[n sum]] (/ sum n)) - ([acc x] (avg acc x 1)) - ([[n sum :as acc] x w] - (-> acc (assoc! 0 (+ n w)) (assoc! 1 (+ sum (* w x)))))) +(def avg (reduce rf/avg)) (defn window "Returns a transducer which computes an accumulator over the last n items @@ -393,25 +415,6 @@ ([acc] (rf (unreduced (rf acc (.get n))))) ([acc _] (.incrementAndGet n) acc)))) -(defn juxt - "Returns a reducing fn which compute all rfns at once and whose final return - value is a vector of the final return values of each rfns." - [& rfns] - (let [rfns (mapv ensure-kvrf rfns)] - (kvrf - ([] (mapv #(vector % (volatile! (%))) rfns)) - ([acc] (mapv (fn [[rf vacc]] (rf (unreduced @vacc))) acc)) - ([acc x] - (let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]] - (when-not (reduced? @vacc) (vswap! vacc rf x) true)) - false acc)] - (if some-unreduced acc (reduced acc)))) - ([acc k v] - (let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]] - (when-not (reduced? @vacc) (vswap! vacc rf k v) true)) - false acc)] - (if some-unreduced acc (reduced acc))))))) - (defn multiplex "Returns a transducer that runs several transducers (sepcified by xforms) in parallel. If xforms is a map, values of the map are transducers and keys are used to tag each @@ -468,22 +471,7 @@ (ensure-reduced acc) acc))))))) -(defn juxt-map - [& key-rfns] - (let [f (apply juxt (take-nth 2 (next key-rfns))) - keys (vec (take-nth 2 key-rfns))] - (let [f (ensure-kvrf f)] - (kvrf - ([] (f)) - ([acc] (zipmap keys (f acc))) - ([acc x] (f acc x)) - ([acc k v] (f acc k v)))))) - -(defn last - "Reducing function that returns the last value." - ([] nil) - ([x] x) - ([_ x] x)) +(def last (reduce rf/last)) (defn transjuxt "Performs several transductions over coll at once. xforms-map can be a map or a sequential collection. @@ -495,20 +483,12 @@ (into {}) (reduce (kvrf ([] (clj/reduce (fn [v _] (conj! v nil)) - (transient []) (range (count xforms-map)))) + (transient []) (range (clj/count xforms-map)))) ([v] (persistent! v)) ([v i x] (assoc! v i x))))) - xforms-map (if (map? xforms-map) xforms-map (zipmap (range xforms-map)))] + xforms-map (if (map? xforms-map) xforms-map (zipmap (range) xforms-map))] (comp (multiplex (into {} (by-key (map #(comp % (take 1)))) xforms-map)) collect-xform))) ([xforms-map coll] - (transduce (transjuxt xforms-map) last coll))) - -;; map stuff -(defn update - ([m k xform] - (update m k xform nil)) - ([m k xform not-found] - (let [rf (xform (fn ([m] m) ([m v] (assoc m k v))))] - (rf (unreduced (rf (dissoc m k) (get m k not-found))))))) \ No newline at end of file + (transduce (transjuxt xforms-map) rf/last coll))) diff --git a/src/net/cgrand/xforms/rfs.clj b/src/net/cgrand/xforms/rfs.clj new file mode 100644 index 0000000..52b459e --- /dev/null +++ b/src/net/cgrand/xforms/rfs.clj @@ -0,0 +1,89 @@ +(ns net.cgrand.xforms.rfs + {:author "Christophe Grand"} + (:refer-clojure :exclude [str last min max]) + (:require [clojure.core :as clj])) + +(defn minimum + ([comparator] + (minimum comparator nil)) + ([^java.util.Comparator comparator absolute-maximum] + (fn + ([] ::+∞) + ([x] (if (identical? ::+∞ x) + absolute-maximum + x)) + ([a b] (if (or (identical? ::+∞ a) (pos? (.compare comparator a b))) b a))))) + +(defn maximum + ([comparator] + (maximum comparator nil)) + ([^java.util.Comparator comparator absolute-minimum] + (fn + ([] ::-∞) + ([x] (if (identical? ::-∞ x) + absolute-minimum + x)) + ([a b] (if (or (identical? ::-∞ a) (neg? (.compare comparator a b))) b a))))) + +(def min (minimum compare)) + +(def max (maximum compare)) + +(defn avg + "Reducing fn to compute the arithmetic mean." + ([] (transient [0 0])) + ([[n sum]] (/ sum n)) + ([acc x] (avg acc x 1)) + ([[n sum :as acc] x w] + (-> acc (assoc! 0 (+ n w)) (assoc! 1 (+ sum (* w x)))))) + +(defn last + "Reducing function that returns the last value." + ([] nil) + ([x] x) + ([_ x] x)) + +(defmacro ^:private or-instance? [class x y] + (let [xsym (gensym 'x_)] + `(let [~xsym ~x] + (if (instance? ~class ~xsym) ~(with-meta xsym {:tag class}) ~y)))) + +(defn str! + "Like xforms/str but returns a StringBuilder." + ([] (StringBuilder.)) + ([sb] (or-instance? StringBuilder sb (StringBuilder. (clj/str sb)))) ; the instance? checks are for compatibility with str in case of seeded reduce/transduce. + ([sb x] (.append (or-instance? StringBuilder sb (StringBuilder. (clj/str sb))) x))) + +(def str + "Reducing function to build strings in linear time. Acts as replacement for clojure.core/str in a reduce/transduce call." + (completing str! clj/str)) + +#_(defn juxt + "Returns a reducing fn which compute all rfns at once and whose final return + value is a vector of the final return values of each rfns." + [& rfns] + (let [rfns (mapv ensure-kvrf rfns)] + (kvrf + ([] (mapv #(vector % (volatile! (%))) rfns)) + ([acc] (mapv (fn [[rf vacc]] (rf (unreduced @vacc))) acc)) + ([acc x] + (let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]] + (when-not (reduced? @vacc) (vswap! vacc rf x) true)) + false acc)] + (if some-unreduced acc (reduced acc)))) + ([acc k v] + (let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]] + (when-not (reduced? @vacc) (vswap! vacc rf k v) true)) + false acc)] + (if some-unreduced acc (reduced acc))))))) + +#_(defn juxt-map + [& key-rfns] + (let [f (apply juxt (take-nth 2 (next key-rfns))) + keys (vec (take-nth 2 key-rfns))] + (let [f (ensure-kvrf f)] + (kvrf + ([] (f)) + ([acc] (zipmap keys (f acc))) + ([acc x] (f acc x)) + ([acc k v] (f acc k v)))))) \ No newline at end of file