From a759ca7e7ad985544ec5471467c7406f19501945 Mon Sep 17 00:00:00 2001 From: Christophe Grand Date: Mon, 7 Sep 2015 16:18:17 +0200 Subject: [PATCH] Add the window transducer. --- README.md | 32 +++++++++++++++++++++++++++++++- src/net/cgrand/xforms.clj | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index d2c2877..876abeb 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ More transducers and reducing functions for Clojure! [![Build Status](https://travis-ci.org/cgrand/xforms.png?branch=master)](https://travis-ci.org/cgrand/xforms) -Transducers: `reduce`, `into`, `by-key`, `partition`, `pad` and `for`. +Transducers: `reduce`, `into`, `by-key`, `partition`, `pad`, `for` and `window`. Reducing functions: `str`, `str!`, `avg`, `count`, `juxt`, `juxt-map`. @@ -79,6 +79,36 @@ Padding can be achieved using the `pad` function: {false {:sum 16256, :mean 127, :count 128}, true {:sum 16384, :mean 128, :count 128}} ``` +`window` is a new transducer to efficiently compute a windowed accumulator: + +```clj +;; sum of last 3 items +=> (sequence (x/window 3 + -) (range 16)) +(0 1 3 6 9 12 15 18 21 24 27 30 33 36 39 42) + +=> (def nums (repeatedly 8 #(rand-int 42))) +#'user/nums +=> nums +(11 8 32 26 6 10 37 24) + +;; avg of last 4 items +=> (sequence + (x/window 4 x/avg #(x/avg % (- %2))) + nums) +(11 19/2 17 77/4 12 37/4 79/10 77/12) + +;; min of last 3 items +=> (sequence + (x/window 3 + (fn + ([] (sorted-set)) + ([s] (first s)) + ([s x] (conj s x))) + disj) + nums) +(11 8 8 8 6 6 6 10) +``` + ## On Partitioning Both `by-key` and `partition` takes a transducer as parameter. This transducer is used to further process each partition. diff --git a/src/net/cgrand/xforms.clj b/src/net/cgrand/xforms.clj index 1fd432d..30e5b32 100644 --- a/src/net/cgrand/xforms.clj +++ b/src/net/cgrand/xforms.clj @@ -173,6 +173,39 @@ ([acc] (acc)) ([acc x] (acc x))) +(defn window + "Returns a transducer which computes an accumulator over the last n items + using two functions: f and its inverse invf. + + The accumulator is initialized with (f). + It is updated to (f (invf acc out) in) where \"acc\" is the current value, + \"in\" the new item entering the window, \"out\" the item exiting the window. + The value passed to the dowstream reducing function is (f acc) enabling acc to be + mutable and 1-arity f to project its state to a value. + + If you don't want to see the accumulator until the window is full then you need to + use (drop (dec n)) to remove them." + [n f invf] + (fn [rf] + (let [ring (object-array n) + vi (volatile! (- n)) + vwacc (volatile! (f))] + (fn + ([] (rf)) + ([acc] (rf acc)) + ([acc x] + (let [i @vi + wacc @vwacc] ; window accumulator + (if (neg? i) ; not full yet + (do + (aset ring (+ n i) x) + (vreset! vi (inc i)) + (rf acc (f (vreset! vwacc (f wacc x))))) + (let [x' (aget ring i)] + (aset ring i x) + (vreset! vi (let [i (inc i)] (if (= n i) 0 i))) + (rf acc (f (vreset! vwacc (f (invf wacc x') x)))))))))))) + (defn count ([] 0) ([n] n) ([n _] (inc n))) (defn juxt