diff --git a/README.md b/README.md index a4e1b6b..8e60749 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ More transducers and reducing functions for Clojure! [![Build Status](https://travis-ci.org/cgrand/xforms.png?branch=master)](https://travis-ci.org/cgrand/xforms) -Transducers: `reduce`, `into`, `by-key`, `partition`, `pad`, `for` and `window`. +Transducers: `reduce`, `into`, `by-key`, `partition`, `pad`, `for`, `window` and `window-by-time`. Reducing functions: `str`, `str!`, `avg`, `count`, `juxt`, `juxt-map` and `first`. @@ -15,7 +15,7 @@ Transducing context: `transjuxt` (for performing several transductions in a sing Add this dependency to your project: ```clj -[net.cgrand/xforms "0.1.1-SNAPSHOT"] +[net.cgrand/xforms "0.2.0"] ``` ```clj diff --git a/project.clj b/project.clj index 199de0e..74f4dbd 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject net.cgrand/xforms "0.1.2-SNAPSHOT" +(defproject net.cgrand/xforms "0.2.0" :description "Extra transducers for Clojure" #_#_:url "http://example.com/FIXME" :license {:name "Eclipse Public License" diff --git a/src/net/cgrand/xforms.clj b/src/net/cgrand/xforms.clj index fa87130..aca3aef 100644 --- a/src/net/cgrand/xforms.clj +++ b/src/net/cgrand/xforms.clj @@ -280,6 +280,59 @@ (vreset! vi (let [i (inc i)] (if (= n i) 0 i))) (rf acc (f (vreset! vwacc (f (invf wacc x') x)))))))))))) +(defn window-by-time + "Returns a transducer which computes a windowed accumulator over chronologically sorted items. + + timef is a function from one item to its scaled timestamp (as a double). The window length is always 1.0 + so timef must normalize timestamps. For example if timestamps are in seconds (and under the :ts key), + to get a 1-hour window you have to use (fn [x] (/ (:ts x) 3600.0)) as timef. + + n is the integral number of steps by which the window slides. With a 1-hour window, 4 means that the window slides every 15 minutes. + + f and invf work like in #'window." + [timef n f invf] + (let [timef (fn [x] (long (Math/floor (* n (timef x)))))] + (fn [rf] + (let [dq (java.util.ArrayDeque.) + vwacc (volatile! (f)) + flush! + (fn [acc ^long from-ts ^long to-ts] + (loop [ts from-ts acc acc wacc @vwacc] + (let [x (.peekFirst dq)] + (cond + (= ts (timef x)) + (do + (.pollFirst dq) + (recur ts acc (invf wacc x))) + (= ts to-ts) + (do + (vreset! vwacc wacc) + acc) + :else + (let [acc (rf acc (f wacc))] + (if (reduced? acc) + (do + (vreset! vwacc wacc) + acc) + (recur (inc ts) acc wacc)))))))] + (fn + ([] (rf)) + ([acc] + (let [acc (if-not (.isEmpty dq) + (unreduced (rf acc (f @vwacc))) + acc)] + (rf acc))) + ([acc x] + (let [limit (- (timef x) n) + prev-limit (if-some [prev-x (.peekLast dq)] + (- (timef prev-x) n) + limit) + _ (.addLast dq x) ; so dq is never empty for flush! + acc (flush! acc prev-limit limit)] + (when-not (reduced? acc) + (vswap! vwacc f x)) + acc))))))) + (defn count ([] 0) ([n] n) ([n _] (inc n))) (defn juxt diff --git a/test/net/cgrand/xforms_test.clj b/test/net/cgrand/xforms_test.clj index 9e0b364..7089d1d 100644 --- a/test/net/cgrand/xforms_test.clj +++ b/test/net/cgrand/xforms_test.clj @@ -71,3 +71,19 @@ 4 (range 16))) (is (trial (x/pad 8 (repeat :pad)) 4 (range 16))))) + +(deftest window-by-time + (is (= (into + [] + (x/window-by-time :ts 4 + (fn + ([] clojure.lang.PersistentQueue/EMPTY) + ([q] (vec q)) + ([q x] (conj q x))) + (fn [q _] (pop q))) + (map (fn [x] {:ts x}) (concat (range 0 2 0.5) (range 3 5 0.25)))) + [[{:ts 0}] [{:ts 0}] [{:ts 0} {:ts 0.5}] [{:ts 0} {:ts 0.5}] [{:ts 0.5} {:ts 1.0}] [{:ts 0.5} {:ts 1.0}] + [{:ts 1.0} {:ts 1.5}] [{:ts 1.0} {:ts 1.5}] [{:ts 1.5}] [{:ts 1.5}] [] [] [{:ts 3}] [{:ts 3} {:ts 3.25}] + [{:ts 3} {:ts 3.25} {:ts 3.5}] [{:ts 3} {:ts 3.25} {:ts 3.5} {:ts 3.75}] [{:ts 3.25} {:ts 3.5} {:ts 3.75} {:ts 4.0}] + [{:ts 3.5} {:ts 3.75} {:ts 4.0} {:ts 4.25}] [{:ts 3.75} {:ts 4.0} {:ts 4.25} {:ts 4.5}] + [{:ts 4.0} {:ts 4.25} {:ts 4.5} {:ts 4.75}]]))) \ No newline at end of file