Add into, juxt-map, partition, pad and count. And fix a bug in juxt.

This commit is contained in:
Christophe Grand 2015-09-04 11:20:35 +02:00
parent 62e384ec43
commit 24a11ea7e2
2 changed files with 103 additions and 5 deletions

View file

@ -2,6 +2,10 @@
More transducers and reducing functions for Clojure!
Transducers: `reduce`, `into`, `by-key`, `partition`, `pad` and `for`.
Reducing functions: `str`, `str!`, `avg`, `count`, `juxt`, `juxt-map`.
## Usage
Add this dependency to your project:
@ -33,7 +37,7 @@ Add this dependency to your project:
;; let's go transient!
(defn my-group-by [kfn coll]
(into {} (x/by-key kfn (x/reduce (completing conj! persistent!))) coll))
(into {} (x/by-key kfn (x/into [])) coll))
=> (quick-bench (group-by odd? (range 256)))
Execution time mean : 29,356531 µs
@ -41,10 +45,28 @@ Add this dependency to your project:
Execution time mean : 20,604297 µs
```
`avg` is a reducing fn to compute the arithmetic mean. `juxt` is used to compute several reducing fns at once.
Like `by-key`, `partition` also takes a transducer as an argument to allow further computation on the partition without buffering.
```clj
=> (sequence (x/partition 4 (x/reduce +)) (range 16))
(6 22 38 54)
```
Padding can be achieved using the `pad` function:
```clj
=> (sequence (x/partition 4 (comp (x/pad 4 (repeat :pad)) (x/into []))) (range 9))
([0 1 2 3] [4 5 6 7] [8 :pad :pad :pad])
```
`avg` is a reducing fn to compute the arithmetic mean. `juxt` and `juxt-map` are used to compute several reducing fns at once.
```clj
=> (into {} (x/by-key odd? (x/reduce (x/juxt + x/avg))) (range 256))
{false [16256 127], true [16384 128]}
=> (into {} (x/by-key odd? (x/reduce (x/juxt-map :sum + :mean x/avg :count x/count))) (range 256))
{false {:sum 16256, :mean 127, :count 128}, true {:sum 16384, :mean 128, :count 128}}
```
## License

View file

@ -1,7 +1,7 @@
(ns net.cgrand.xforms
"Extra transducers for Clojure"
{:author "Christophe Grand"}
(:refer-clojure :exclude [reduce for partition str juxt])
(:refer-clojure :exclude [reduce into count for partition str juxt])
(:require [clojure.core :as clj]))
(defmacro for
@ -42,6 +42,19 @@
([f init]
(reduce (fn ([] init) ([acc] (f acc)) ([acc x] (f acc x))))))
(defn into
"Returns a transducer which accumulate every input in a collection and outputs only the accumulated collection."
[coll]
(reduce (if (instance? clojure.lang.IEditableCollection coll)
(fn
([] (transient coll))
([acc] (persistent! acc))
([acc x] (conj! acc x)))
(fn
([] coll)
([acc] acc)
([acc x] (conj acc x))))))
(defmacro ^:private or-instance? [class x y]
(let [xsym (gensym 'x_)]
`(let [~xsym ~x]
@ -89,6 +102,58 @@
(vswap! m assoc! k noprf))
(unreduced acc))))))))
(defn- spawn
"Every n items, spawns a new pipeline."
[n xform]
(fn [rf]
(let [ncrf (fn ([]) ([acc] acc) ([acc x] (rf acc x))) ; no init no complete rf
vrfs (volatile! [])
m (volatile! 0)]
(fn
([] (rf))
([acc]
(rf (clj/reduce #(%2 %1) acc @vrfs)))
([acc x]
(let [rfs @vrfs
step! (fn [acc rf]
(let [acc (rf acc x)]
(if (reduced? acc)
(rf (unreduced acc))
(do
(vswap! vrfs conj! rf)
acc))))]
(vreset! vrfs (transient []))
(let [acc (clj/reduce step! acc rfs)
acc (if (neg? (vswap! m dec))
(do
(step! acc (xform ncrf))
(vswap! m + n))
acc)]
(vswap! vrfs persistent!)
acc)))))))
(defn pad [n padding-coll]
(fn [rf]
(let [n (volatile! n)]
(fn
([] (rf))
([acc]
(rf (clj/reduce ((take @n) rf) acc padding-coll)))
([acc x]
(vswap! n dec)
(rf acc x))))))
(defn partition
"Returns a partitioning transducer. Each partition is independently transformed using the xform transducer.
Unlike clojure.core/partition the last partitions may be incomplete.
Partitions can be padded using #'pad."
; being strict towards partition size implies buffering and avoiding unecessary buffering is part of this
; library goal. So partition won't support it. However a buffer transducer may be an option.
([n xform]
(partition n n xform))
([n step xform]
(spawn step (comp (take n) xform))))
(defn avg
"Reducing fn to compute the arithmetic mean."
([]
@ -103,6 +168,8 @@
([acc] (acc))
([acc x] (acc x)))
(defn count ([] 0) ([n] n) ([n _] (inc n)))
(defn juxt
"Returns a reducing fn which compute all rfns at once and whose final return
value is a vector of the final return values of each rfns."
@ -112,8 +179,17 @@
([] (mapv #(vector % (volatile! (%))) rfns))
([acc] (mapv (fn [[rf vacc]] (rf (unreduced @vacc))) acc))
([acc x]
(let [some-unreduced (reduce (fn [some-unreduced [rf vacc]]
(when-not (reduced? @vacc) (vswap! vacc rf x) true))
(let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]]
(when-not (reduced? @vacc) (vswap! vacc rf x) true))
false acc)]
(if some-unreduced acc (reduced acc)))))))
(defn juxt-map
[& key-rfns]
(let [f (apply juxt (take-nth 2 (next key-rfns)))
keys (vec (take-nth 2 key-rfns))]
(fn
([] (f))
([acc] (zipmap keys (f acc)))
([acc x] (f acc x)))))