Add the window transducer.

This commit is contained in:
Christophe Grand 2015-09-07 16:18:17 +02:00
parent 92e1eb4e4d
commit a759ca7e7a
2 changed files with 64 additions and 1 deletions

View file

@ -4,7 +4,7 @@ More transducers and reducing functions for Clojure!
[![Build Status](https://travis-ci.org/cgrand/xforms.png?branch=master)](https://travis-ci.org/cgrand/xforms)
Transducers: `reduce`, `into`, `by-key`, `partition`, `pad` and `for`.
Transducers: `reduce`, `into`, `by-key`, `partition`, `pad`, `for` and `window`.
Reducing functions: `str`, `str!`, `avg`, `count`, `juxt`, `juxt-map`.
@ -79,6 +79,36 @@ Padding can be achieved using the `pad` function:
{false {:sum 16256, :mean 127, :count 128}, true {:sum 16384, :mean 128, :count 128}}
```
`window` is a new transducer to efficiently compute a windowed accumulator:
```clj
;; sum of last 3 items
=> (sequence (x/window 3 + -) (range 16))
(0 1 3 6 9 12 15 18 21 24 27 30 33 36 39 42)
=> (def nums (repeatedly 8 #(rand-int 42)))
#'user/nums
=> nums
(11 8 32 26 6 10 37 24)
;; avg of last 4 items
=> (sequence
(x/window 4 x/avg #(x/avg % (- %2)))
nums)
(11 19/2 17 77/4 12 37/4 79/10 77/12)
;; min of last 3 items
=> (sequence
(x/window 3
(fn
([] (sorted-set))
([s] (first s))
([s x] (conj s x)))
disj)
nums)
(11 8 8 8 6 6 6 10)
```
## On Partitioning
Both `by-key` and `partition` takes a transducer as parameter. This transducer is used to further process each partition.

View file

@ -173,6 +173,39 @@
([acc] (acc))
([acc x] (acc x)))
(defn window
"Returns a transducer which computes an accumulator over the last n items
using two functions: f and its inverse invf.
The accumulator is initialized with (f).
It is updated to (f (invf acc out) in) where \"acc\" is the current value,
\"in\" the new item entering the window, \"out\" the item exiting the window.
The value passed to the dowstream reducing function is (f acc) enabling acc to be
mutable and 1-arity f to project its state to a value.
If you don't want to see the accumulator until the window is full then you need to
use (drop (dec n)) to remove them."
[n f invf]
(fn [rf]
(let [ring (object-array n)
vi (volatile! (- n))
vwacc (volatile! (f))]
(fn
([] (rf))
([acc] (rf acc))
([acc x]
(let [i @vi
wacc @vwacc] ; window accumulator
(if (neg? i) ; not full yet
(do
(aset ring (+ n i) x)
(vreset! vi (inc i))
(rf acc (f (vreset! vwacc (f wacc x)))))
(let [x' (aget ring i)]
(aset ring i x)
(vreset! vi (let [i (inc i)] (if (= n i) 0 i)))
(rf acc (f (vreset! vwacc (f (invf wacc x') x))))))))))))
(defn count ([] 0) ([n] n) ([n _] (inc n)))
(defn juxt