Several bug fixes, improved state cleanup on completion, move rfs to a separate namespace, new transducers: avg, min, minimum, max, maximum, last, str

This commit is contained in:
Christophe Grand 2016-11-03 09:34:18 -05:00
parent 596ee03918
commit 8d240ed6ef
4 changed files with 154 additions and 87 deletions

View file

@ -4,9 +4,9 @@ More transducers and reducing functions for Clojure!
[![Build Status](https://travis-ci.org/cgrand/xforms.png?branch=master)](https://travis-ci.org/cgrand/xforms) [![Build Status](https://travis-ci.org/cgrand/xforms.png?branch=master)](https://travis-ci.org/cgrand/xforms)
Transducers: `reduce`, `into`, `count`, `by-key`, `partition`, `for`, `multiplex`, `window` and `window-by-time`. Transducers: `reduce`, `into`, `last`, `count`, `avg`, `min`, `minimum`, `max`, `maximum`, `str`, `by-key`, `partition`, `for`, `multiplex`, `transjuxt`, `window` and `window-by-time`.
Reducing functions: `str`, `str!`, `avg`, `juxt`, `juxt-map` and `last`. Reducing functions (in `net.cgrand.xforms.rfs`): `min`, `minimum`, `max`, `maximum`, `str`, `str!`, `avg`, `juxt` and `last`.
Transducing context: `transjuxt` (for performing several transductions in a single pass). Transducing context: `transjuxt` (for performing several transductions in a single pass).
@ -15,7 +15,7 @@ Transducing context: `transjuxt` (for performing several transductions in a sing
Add this dependency to your project: Add this dependency to your project:
```clj ```clj
[net.cgrand/xforms "0.4.0"] [net.cgrand/xforms "0.5.0"]
``` ```
```clj ```clj
@ -27,7 +27,7 @@ Add this dependency to your project:
```clj ```clj
=> (quick-bench (reduce str (range 256))) => (quick-bench (reduce str (range 256)))
Execution time mean : 58,714946 µs Execution time mean : 58,714946 µs
=> (quick-bench (reduce x/str (range 256))) => (quick-bench (reduce rf/str (range 256)))
Execution time mean : 11,609631 µs Execution time mean : 11,609631 µs
``` ```
@ -72,12 +72,12 @@ Padding is achieved as usual:
``` ```
`avg` is a reducing fn to compute the arithmetic mean. `juxt` and `juxt-map` are used to compute several reducing fns at once. `avg` is a transducer to compute the arithmetic mean. `transjuxt` is used to perform several transductions at once.
```clj ```clj
=> (into {} (x/by-key odd? (x/reduce (x/juxt + x/avg))) (range 256)) => (into {} (x/by-key odd? (x/transjuxt [(x/reduce +) x/avg])) (range 256))
{false [16256 127], true [16384 128]} {false [16256 127], true [16384 128]}
=> (into {} (x/by-key odd? (x/reduce (x/juxt-map :sum + :mean x/avg :count x/count))) (range 256)) => (into {} (x/by-key odd? (x/transjuxt {:sum (x/reduce +) :mean x/avg :count x/count})) (range 256))
{false {:sum 16256, :mean 127, :count 128}, true {:sum 16384, :mean 128, :count 128}} {false {:sum 16256, :mean 127, :count 128}, true {:sum 16384, :mean 128, :count 128}}
``` ```
@ -163,8 +163,6 @@ Several xforms transducers and transducing contexts leverage `reduce-kv` and `kv
<tr><td>3-arg `into`<br>(transducing context)<td>when `from` is a map<td>when `to` is a map <tr><td>3-arg `into`<br>(transducing context)<td>when `from` is a map<td>when `to` is a map
<tr><td>`by-key`<br>(as a transducer)<td>when `kfn` and `vfn` are unspecified or `nil`<td>when `pair` is `vector` or unspecified <tr><td>`by-key`<br>(as a transducer)<td>when `kfn` and `vfn` are unspecified or `nil`<td>when `pair` is `vector` or unspecified
<tr><td>`by-key`<br>(as a transducing context on values)<td>no<td>no <tr><td>`by-key`<br>(as a transducing context on values)<td>no<td>no
<tr><td>`juxt`<td>when at least one of the children `rfns` is a kvrf<td>no
<tr><td>`juxt-map`<td>when at least one of the children `rfns` is a kvrf<td>no
</tbody> </tbody>
<table> <table>

View file

@ -1,4 +1,4 @@
(defproject net.cgrand/xforms "0.4.0" (defproject net.cgrand/xforms "0.5.0"
:description "Extra transducers for Clojure" :description "Extra transducers for Clojure"
#_#_:url "http://example.com/FIXME" #_#_:url "http://example.com/FIXME"
:license {:name "Eclipse Public License" :license {:name "Eclipse Public License"

View file

@ -1,8 +1,9 @@
(ns net.cgrand.xforms (ns net.cgrand.xforms
"Extra transducers for Clojure" "Extra transducers for Clojure"
{:author "Christophe Grand"} {:author "Christophe Grand"}
(:refer-clojure :exclude [reduce into count for partition str juxt last]) (:refer-clojure :exclude [reduce into count for partition str last keys vals min max])
(:require [clojure.core :as clj])) (:require [clojure.core :as clj]
[net.cgrand.xforms.rfs :as rf]))
(defmacro for (defmacro for
"Like clojure.core/for with the first expression being replaced by % (or _). Returns a transducer." "Like clojure.core/for with the first expression being replaced by % (or _). Returns a transducer."
@ -44,7 +45,7 @@
([~acc] (~rf ~acc)) ([~acc] (~rf ~acc))
([~acc ~binding] ~body) ([~acc ~binding] ~body)
~(if (destructuring-pair? binding) ~(if (destructuring-pair? binding)
`([~acc ~@binding] ~body) `([~acc ~@(map #(vary-meta % dissoc :tag) binding)] ~body)
`([~acc k# v#] `([~acc k# v#]
(let [~binding (clojure.lang.MapEntry. k# v#)] ~body)))))))) (let [~binding (clojure.lang.MapEntry. k# v#)] ~body))))))))
@ -75,6 +76,13 @@
([acc x] (rf acc x)) ([acc x] (rf acc x))
([acc k v] (rf acc (clojure.lang.MapEntry. k v)))))) ([acc k v] (rf acc (clojure.lang.MapEntry. k v))))))
(defmacro ^:private let-complete [[binding volatile] & body]
`(let [v# @~volatile]
(when-not (identical? v# ~volatile) ; self reference as sentinel
(vreset! ~volatile ~volatile)
(let [~binding v#]
~@body))))
(defn reduce (defn reduce
"A transducer that reduces a collection to a 1-item collection consisting of only the reduced result. "A transducer that reduces a collection to a 1-item collection consisting of only the reduced result.
Unlike reduce but like transduce it does call the completing arity (1) of the reducing fn." Unlike reduce but like transduce it does call the completing arity (1) of the reducing fn."
@ -84,7 +92,8 @@
(let [f (ensure-kvrf f)] (let [f (ensure-kvrf f)]
(kvrf (kvrf
([] (rf)) ([] (rf))
([acc] (rf (unreduced (rf acc (f (unreduced @vacc)))))) ([acc] (let-complete [f-acc vacc]
(rf (unreduced (rf acc (f (unreduced f-acc)))))))
([acc x] ([acc x]
(if (reduced? (vswap! vacc f x)) (if (reduced? (vswap! vacc f x))
(reduced acc) (reduced acc)
@ -133,20 +142,35 @@
(rf (clj/reduce-kv rf (rf) from)) (rf (clj/reduce-kv rf (rf) from))
(rf (clj/reduce rf (rf) from)))))) (rf (clj/reduce rf (rf) from))))))
(defmacro ^:private or-instance? [class x y] (defn minimum
(let [xsym (gensym 'x_)] ([comparator]
`(let [~xsym ~x] (minimum comparator nil))
(if (instance? ~class ~xsym) ~(with-meta xsym {:tag class}) ~y)))) ([comparator absolute-maximum]
(reduce (rf/minimum comparator absolute-maximum))))
(defn str! (defn maximum
"Like xforms/str but returns a StringBuilder." ([comparator]
([] (StringBuilder.)) (maximum comparator nil))
([sb] (or-instance? StringBuilder sb (StringBuilder. (clj/str sb)))) ; the instance? checks are for compatibility with str in case of seeded reduce/transduce. ([^java.util.Comparator comparator absolute-minimum]
([sb x] (.append (or-instance? StringBuilder sb (StringBuilder. (clj/str sb))) x))) (reduce (rf/maximum comparator absolute-minimum))))
(def str (def min (reduce rf/min))
"Reducing function to build strings in linear time. Acts as replacement for clojure.core/str in a reduce/transduce call."
(completing str! clj/str)) (def max (reduce rf/max))
(defn vals [rf]
(kvrf
([] (rf))
([acc] (rf acc))
([acc kv] (rf acc (val kv)))
([acc k v] (rf acc v))))
(defn keys [rf]
(kvrf
([] (rf))
([acc] (rf acc))
([acc kv] (rf acc (key kv)))
([acc k v] (rf acc k))))
;; for both map entries and vectors ;; for both map entries and vectors
(defn- key' [kv] (nth kv 0)) (defn- key' [kv] (nth kv 0))
@ -194,7 +218,7 @@
(if (and (nil? kfn) (nil? vfn)) (if (and (nil? kfn) (nil? vfn))
(kvrf self (kvrf self
([] (rf)) ([] (rf))
([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m))))) ([acc] (let-complete [m m] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (clj/vals (persistent! m))))))
([acc x] ([acc x]
(self acc (key' x) (val' x))) (self acc (key' x) (val' x)))
([acc k v] ([acc k v]
@ -211,9 +235,9 @@
acc)))) acc))))
(let [kfn (or kfn key') (let [kfn (or kfn key')
vfn (or vfn val')] vfn (or vfn val')]
(fn (kvrf self
([] (rf)) ([] (rf))
([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m))))) ([acc] (let-complete [m m] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (clj/vals (persistent! m))))))
([acc x] ([acc x]
(let [k (kfn x) (let [k (kfn x)
krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k)))) krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k))))
@ -226,7 +250,8 @@
(do (do
(vswap! m assoc! k nop-rf) (vswap! m assoc! k nop-rf)
(krf @acc))) (krf @acc)))
acc))))))))))) acc)))
([acc k v] (self acc (clojure.lang.MapEntry. k v)))))))))))
(defn partition (defn partition
"Returns a partitioning transducer. Each partition is independently transformed using the xform transducer." "Returns a partitioning transducer. Each partition is independently transformed using the xform transducer."
@ -246,7 +271,7 @@
xform (comp (map #(if (identical? dq %) nil %)) xform)] xform (comp (map #(if (identical? dq %) nil %)) xform)]
(fn (fn
([] (rf)) ([] (rf))
([acc] (rf acc)) ([acc] (.clear dq) (rf acc))
([acc x] ([acc x]
(let [b (vswap! barrier dec)] (let [b (vswap! barrier dec)]
(when (< b n) (.add dq (if (nil? x) dq x))) (when (< b n) (.add dq (if (nil? x) dq x)))
@ -267,9 +292,12 @@
(fn (fn
([] (rf)) ([] (rf))
([acc] (if (< @barrier n) ([acc] (if (< @barrier n)
(let [xform (comp cat (take n) xform)] (let [xform (comp cat (take n) xform)
; don't use mxrf for completion: we want completion and don't want reduced-wrapping ; don't use mxrf for completion: we want completion and don't want reduced-wrapping
(transduce xform rf acc [dq pad])) acc (transduce xform rf acc [dq pad])]
(vreset! @barrier n)
(.clear dq)
acc)
acc)) acc))
([acc x] ([acc x]
(let [b (vswap! barrier dec)] (let [b (vswap! barrier dec)]
@ -282,13 +310,7 @@
acc) acc)
acc)))))))) acc))))))))
(defn avg (def avg (reduce rf/avg))
"Reducing fn to compute the arithmetic mean."
([] (transient [0 0]))
([[n sum]] (/ sum n))
([acc x] (avg acc x 1))
([[n sum :as acc] x w]
(-> acc (assoc! 0 (+ n w)) (assoc! 1 (+ sum (* w x))))))
(defn window (defn window
"Returns a transducer which computes an accumulator over the last n items "Returns a transducer which computes an accumulator over the last n items
@ -393,25 +415,6 @@
([acc] (rf (unreduced (rf acc (.get n))))) ([acc] (rf (unreduced (rf acc (.get n)))))
([acc _] (.incrementAndGet n) acc)))) ([acc _] (.incrementAndGet n) acc))))
(defn juxt
"Returns a reducing fn which compute all rfns at once and whose final return
value is a vector of the final return values of each rfns."
[& rfns]
(let [rfns (mapv ensure-kvrf rfns)]
(kvrf
([] (mapv #(vector % (volatile! (%))) rfns))
([acc] (mapv (fn [[rf vacc]] (rf (unreduced @vacc))) acc))
([acc x]
(let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]]
(when-not (reduced? @vacc) (vswap! vacc rf x) true))
false acc)]
(if some-unreduced acc (reduced acc))))
([acc k v]
(let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]]
(when-not (reduced? @vacc) (vswap! vacc rf k v) true))
false acc)]
(if some-unreduced acc (reduced acc)))))))
(defn multiplex (defn multiplex
"Returns a transducer that runs several transducers (sepcified by xforms) in parallel. "Returns a transducer that runs several transducers (sepcified by xforms) in parallel.
If xforms is a map, values of the map are transducers and keys are used to tag each If xforms is a map, values of the map are transducers and keys are used to tag each
@ -468,22 +471,7 @@
(ensure-reduced acc) (ensure-reduced acc)
acc))))))) acc)))))))
(defn juxt-map (def last (reduce rf/last))
[& key-rfns]
(let [f (apply juxt (take-nth 2 (next key-rfns)))
keys (vec (take-nth 2 key-rfns))]
(let [f (ensure-kvrf f)]
(kvrf
([] (f))
([acc] (zipmap keys (f acc)))
([acc x] (f acc x))
([acc k v] (f acc k v))))))
(defn last
"Reducing function that returns the last value."
([] nil)
([x] x)
([_ x] x))
(defn transjuxt (defn transjuxt
"Performs several transductions over coll at once. xforms-map can be a map or a sequential collection. "Performs several transductions over coll at once. xforms-map can be a map or a sequential collection.
@ -495,20 +483,12 @@
(into {}) (into {})
(reduce (kvrf (reduce (kvrf
([] (clj/reduce (fn [v _] (conj! v nil)) ([] (clj/reduce (fn [v _] (conj! v nil))
(transient []) (range (count xforms-map)))) (transient []) (range (clj/count xforms-map))))
([v] (persistent! v)) ([v] (persistent! v))
([v i x] (assoc! v i x))))) ([v i x] (assoc! v i x)))))
xforms-map (if (map? xforms-map) xforms-map (zipmap (range xforms-map)))] xforms-map (if (map? xforms-map) xforms-map (zipmap (range) xforms-map))]
(comp (comp
(multiplex (into {} (by-key (map #(comp % (take 1)))) xforms-map)) (multiplex (into {} (by-key (map #(comp % (take 1)))) xforms-map))
collect-xform))) collect-xform)))
([xforms-map coll] ([xforms-map coll]
(transduce (transjuxt xforms-map) last coll))) (transduce (transjuxt xforms-map) rf/last coll)))
;; map stuff
(defn update
([m k xform]
(update m k xform nil))
([m k xform not-found]
(let [rf (xform (fn ([m] m) ([m v] (assoc m k v))))]
(rf (unreduced (rf (dissoc m k) (get m k not-found)))))))

View file

@ -0,0 +1,89 @@
(ns net.cgrand.xforms.rfs
{:author "Christophe Grand"}
(:refer-clojure :exclude [str last min max])
(:require [clojure.core :as clj]))
(defn minimum
  "Returns a reducing fn that keeps the smallest item seen, as ordered by
  comparator (a java.util.Comparator; Clojure fns qualify).
  When the reduction saw no item at all, the completing arity returns
  absolute-maximum, which defaults to nil."
  ([comparator]
   (minimum comparator nil))
  ([^java.util.Comparator comparator absolute-maximum]
   (let [nothing-yet ::+] ; sentinel used as the initial accumulator
     (fn
       ([] nothing-yet)
       ([result]
        (if (identical? nothing-yet result)
          absolute-maximum
          result))
       ([smallest candidate]
        (cond
          ;; first item: nothing to compare against yet
          (identical? nothing-yet smallest) candidate
          ;; current best is strictly greater than the candidate
          (pos? (.compare comparator smallest candidate)) candidate
          :else smallest))))))
(defn maximum
  "Returns a reducing fn that keeps the largest item seen, as ordered by
  comparator (a java.util.Comparator; Clojure fns qualify).
  When the reduction saw no item at all, the completing arity returns
  absolute-minimum, which defaults to nil."
  ([comparator]
   (maximum comparator nil))
  ([^java.util.Comparator comparator absolute-minimum]
   (let [nothing-yet ::-] ; sentinel used as the initial accumulator
     (fn
       ([] nothing-yet)
       ([result]
        (if (identical? nothing-yet result)
          absolute-minimum
          result))
       ([largest candidate]
        (cond
          ;; first item: nothing to compare against yet
          (identical? nothing-yet largest) candidate
          ;; current best is strictly smaller than the candidate
          (neg? (.compare comparator largest candidate)) candidate
          :else largest))))))
;; Reducing fn that keeps the smallest item under clojure.core/compare ordering.
;; Built with the 1-arity of `minimum`, so it completes to nil when no item was seen.
(def min (minimum compare))
;; Reducing fn that keeps the largest item under clojure.core/compare ordering.
;; Built with the 1-arity of `maximum`, so it completes to nil when no item was seen.
(def max (maximum compare))
(defn avg
  "Reducing fn to compute the arithmetic mean.
  Arities:
    []          initial accumulator: a transient vector [total-weight weighted-sum]
    [acc]       completion: the mean, or nil when the total weight is zero
                (in particular when no item was reduced)
    [acc x]     step: accounts for x with a weight of 1
    [acc x w]   step: accounts for x with weight w"
  ([] (transient [0 0]))
  ([[n sum]]
   ;; An empty reduction leaves the accumulator at [0 0]; dividing would throw
   ;; ArithmeticException (divide by zero), so report \"no mean\" as nil instead.
   (when-not (zero? n)
     (/ sum n)))
  ([acc x] (avg acc x 1))
  ([[n sum :as acc] x w]
   (-> acc
       (assoc! 0 (+ n w))
       (assoc! 1 (+ sum (* w x))))))
(defn last
  "Reducing fn whose result is the most recent item it was given.
  The initial accumulator is nil, so reducing nothing yields nil."
  ([]
   nil)
  ([x]
   x)
  ([_ x]
   ;; the previous accumulator is dropped: only the newest item is kept
   x))
(defmacro ^:private or-instance?
  "Expands to code that evaluates x once and yields it, type-hinted as class,
  when it is an instance of class; otherwise yields the value of y."
  [class x y]
  (let [value-sym  (gensym 'x_)
        hinted-sym (with-meta value-sym {:tag class})]
    `(let [~value-sym ~x]
       (if (instance? ~class ~value-sym)
         ~hinted-sym
         ~y))))
(defn str!
  "Reducing fn that accumulates into a StringBuilder.
  The 0-arity creates an empty builder, the 1-arity returns the accumulator as
  a StringBuilder and the 2-arity appends x to it. An accumulator that is not
  already a StringBuilder is first converted through clojure.core/str."
  ([]
   (StringBuilder.))
  ([sb]
   ;; the instance? check is for compatibility with str in case of seeded reduce/transduce.
   (or-instance? StringBuilder sb
                 (StringBuilder. (clj/str sb))))
  ([sb x]
   ;; same compatibility check as the completing arity, then append in place
   (.append (or-instance? StringBuilder sb
                          (StringBuilder. (clj/str sb)))
            x)))
;; `completing` pairs the str! step fn with clj/str as the completion arity, so the
;; accumulated value is passed through clj/str once the reduction finishes.
(def str
"Reducing function to build strings in linear time. Acts as replacement for clojure.core/str in a reduce/transduce call."
(completing str! clj/str))
;; Disabled: the #_ reader macro makes the reader discard the whole form below.
;; NOTE(review): the body uses kvrf and ensure-kvrf, which this namespace's ns form
;; neither defines nor requires; presumably that is why it is disabled. Confirm
;; before re-enabling.
#_(defn juxt
"Returns a reducing fn which compute all rfns at once and whose final return
value is a vector of the final return values of each rfns."
[& rfns]
(let [rfns (mapv ensure-kvrf rfns)]
(kvrf
;; accumulator: one [rf volatile-holding-its-own-accumulator] pair per rfn
([] (mapv #(vector % (volatile! (%))) rfns))
;; completion: complete each rfn on its unwrapped accumulator
([acc] (mapv (fn [[rf vacc]] (rf (unreduced @vacc))) acc))
([acc x]
(let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]]
(when-not (reduced? @vacc) (vswap! vacc rf x) true))
false acc)]
(if some-unreduced acc (reduced acc))))
([acc k v]
(let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]]
(when-not (reduced? @vacc) (vswap! vacc rf k v) true))
false acc)]
(if some-unreduced acc (reduced acc)))))))
;; Disabled: the #_ reader macro makes the reader discard the whole form below.
;; Takes alternating key / reducing-fn arguments; on completion the keys are
;; zipped with the results returned by the combined reducing fn.
;; NOTE(review): relies on juxt, kvrf and ensure-kvrf, none of which is available
;; as live code in this namespace. Confirm before re-enabling.
#_(defn juxt-map
[& key-rfns]
;; rfns sit at the odd positions of key-rfns, keys at the even positions
(let [f (apply juxt (take-nth 2 (next key-rfns)))
keys (vec (take-nth 2 key-rfns))]
(let [f (ensure-kvrf f)]
(kvrf
([] (f))
([acc] (zipmap keys (f acc)))
([acc x] (f acc x))
([acc k v] (f acc k v))))))