From 596ee039180c235c0b812db8b85ad99aeabcf575 Mon Sep 17 00:00:00 2001 From: Christophe Grand Date: Mon, 10 Oct 2016 17:11:45 +0200 Subject: [PATCH] Remove x/pad and x/first; add x/multiplex; fix several issues with reduced handling --- README.md | 12 +- project.clj | 2 +- src/net/cgrand/xforms.clj | 277 +++++++++++++++++++------------- test/net/cgrand/xforms_test.clj | 6 +- 4 files changed, 176 insertions(+), 121 deletions(-) diff --git a/README.md b/README.md index 15a79cf..adf03bd 100644 --- a/README.md +++ b/README.md @@ -4,9 +4,9 @@ More transducers and reducing functions for Clojure! [![Build Status](https://travis-ci.org/cgrand/xforms.png?branch=master)](https://travis-ci.org/cgrand/xforms) -Transducers: `reduce`, `into`, `count`, `by-key`, `partition`, `pad`, `for`, `window` and `window-by-time`. +Transducers: `reduce`, `into`, `count`, `by-key`, `partition`, `for`, `multiplex`, `window` and `window-by-time`. -Reducing functions: `str`, `str!`, `avg`, `juxt`, `juxt-map` and `first`. +Reducing functions: `str`, `str!`, `avg`, `juxt`, `juxt-map` and `last`. Transducing context: `transjuxt` (for performing several transductions in a single pass). @@ -15,7 +15,7 @@ Transducing context: `transjuxt` (for performing several transductions in a sing Add this dependency to your project: ```clj -[net.cgrand/xforms "0.3.1"] +[net.cgrand/xforms "0.4.0"] ``` ```clj @@ -57,17 +57,17 @@ Add this dependency to your project: Execution time mean : 20,604297 µs ``` -Like `by-key`, `partition` also takes a transducer as an argument to allow further computation on the partition without buffering. +Like `by-key`, `partition` also takes a transducer as last argument to allow further computation on the partition. 
```clj => (sequence (x/partition 4 (x/reduce +)) (range 16)) (6 22 38 54) ``` -Padding can be achieved using the `pad` function: +Padding is achieved as usual: ```clj -=> (sequence (x/partition 4 (comp (x/pad 4 (repeat :pad)) (x/into []))) (range 9)) +=> (sequence (x/partition 4 4 (repeat :pad) (x/into [])) (range 9)) ([0 1 2 3] [4 5 6 7] [8 :pad :pad :pad]) ``` diff --git a/project.clj b/project.clj index 07046f5..18ed9ac 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject net.cgrand/xforms "0.3.1" +(defproject net.cgrand/xforms "0.4.0" :description "Extra transducers for Clojure" #_#_:url "http://example.com/FIXME" :license {:name "Eclipse Public License" diff --git a/src/net/cgrand/xforms.clj b/src/net/cgrand/xforms.clj index 7483fa6..9f21bc7 100644 --- a/src/net/cgrand/xforms.clj +++ b/src/net/cgrand/xforms.clj @@ -1,7 +1,7 @@ (ns net.cgrand.xforms "Extra transducers for Clojure" {:author "Christophe Grand"} - (:refer-clojure :exclude [reduce into count for partition str juxt first]) + (:refer-clojure :exclude [reduce into count for partition str juxt last]) (:require [clojure.core :as clj])) (defmacro for @@ -14,6 +14,7 @@ pair? #(and (vector? %) (= 2 (clj/count %))) destructuring-pair? (every-pred pair? #(not-any? (some-fn keyword? #{'&}) %)) + rpairs (clj/partition 2 (rseq (vec seq-exprs))) build (fn [init] (clj/reduce (fn [body [expr binding]] (case binding @@ -26,10 +27,16 @@ (clj/reduce-kv (fn [~acc ~@binding] ~body) ~acc expr#) (clj/reduce (fn [~acc ~binding] ~body) ~acc expr#))) `(clj/reduce (fn [~acc ~binding] ~body) ~acc ~expr)))) - init (clj/partition 2 (rseq (vec seq-exprs))))) - body (if (and (pair? body-expr) (nil? (meta body-expr))) - (build `(~rf ~acc ~@body-expr)) - (build `(~rf ~acc ~body-expr)))] + init rpairs)) + nested-reduceds (clj/for [[expr binding] rpairs + :when (not (keyword? binding))] + `reduced) + body (build `(let [acc# (~rf ~acc ~@(if (and (pair? body-expr) (nil? 
(meta body-expr))) + body-expr + [body-expr]))] + (if (reduced? acc#) + (-> acc# ~@nested-reduceds) + acc#)))] `(fn [~rf] (let [~rf (ensure-kvrf ~rf)] (kvrf @@ -51,7 +58,7 @@ (defmacro kvrf [name? & fn-bodies] (let [name (if (symbol? name?) name? (gensym '_)) fn-bodies (if (symbol? name?) fn-bodies (cons name? fn-bodies)) - fn-bodies (if (vector? (clj/first fn-bodies)) (list fn-bodies) fn-bodies)] + fn-bodies (if (vector? (first fn-bodies)) (list fn-bodies) fn-bodies)] `(reify clojure.lang.Fn KvRfable @@ -145,13 +152,25 @@ (defn- key' [kv] (nth kv 0)) (defn- val' [kv] (nth kv 1)) -(defn- noprf "The noop reducing function" ([acc] acc) ([acc _] acc) ([acc _ _] acc)) +(defn- nop-rf "The noop reducing function" ([acc] acc) ([acc _] acc) ([acc _ _] acc)) (defn- multiplexable - "Creates a multiplexable reducing function (doesn't init or complete the uderlying rf)." + "Returns a multiplexable reducing function (doesn't init or complete the underlying rf, wraps reduced -- like preserving-reduced)" [rf] (let [rf (ensure-kvrf rf)] - (kvrf ([]) ([acc] acc) ([acc x] (rf acc x)) ([acc k v] (rf acc k v))))) ; no init no complete rf + (kvrf + ([]) + ([acc] acc) ; no init no complete rf + ([acc x] + (let [acc (rf acc x)] + (if (reduced? acc) + (reduced acc) + acc))) + ([acc k v] + (let [acc (rf acc k v)] + (if (reduced? acc) + (reduced acc) + acc)))))) (defn by-key "Returns a transducer which partitions items according to kfn. @@ -165,12 +184,12 @@ ([kfn vfn pair xform] (let [pair (if (identical? vector pair) ::default pair)] (fn [rf] - (let [make-rf (cond - (nil? pair) (constantly (multiplexable rf)) + (let [mrf (multiplexable rf) + make-rf (cond + (nil? pair) (constantly mrf) (= ::default pair) - (let [rf (ensure-kvrf rf)] - (fn [k] (fn ([acc] acc) ([acc v] (rf acc k v))))) - :else (fn [k] (fn ([acc] acc) ([acc v] (rf acc (pair k v)))))) + (fn [k] (fn ([acc] acc) ([acc v] (mrf acc k v)))) + :else (fn [k] (fn ([acc] acc) ([acc v] (mrf acc (pair k v)))))) m (volatile! 
(transient {}))] (if (and (nil? kfn) (nil? vfn)) (kvrf self @@ -181,9 +200,15 @@ ([acc k v] (let [krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k)))) acc (krf acc v)] - (when (reduced? acc) ; complete? - (vswap! m assoc! k noprf)) - (unreduced acc)))) + (if (reduced? acc) + (if (reduced? @acc) + (do + (vreset! m (transient {})) ; no need to run completions + @acc) ; downstream is done, propagate + (do + (vswap! m assoc! k nop-rf) + (krf @acc))) ; TODO think again + acc)))) (let [kfn (or kfn key') vfn (or vfn val')] (fn @@ -193,63 +218,69 @@ (let [k (kfn x) krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k)))) acc (krf acc (vfn x))] - (when (reduced? acc) ; complete? - (vswap! m assoc! k noprf)) - (unreduced acc))))))))))) - -(defn- spawn - "Every n items, spawns a new pipeline." - [n xform] - (fn [rf] - (let [mxrf (multiplexable rf) - vrfs (volatile! []) - m (volatile! 0)] - (fn - ([] (rf)) - ([acc] - (rf (clj/reduce #(%2 %1) acc @vrfs))) - ([acc x] - (let [rfs @vrfs - step! (fn [acc rf] - (let [acc (rf acc x)] - (if (reduced? acc) - (rf (unreduced acc)) - (do - (vswap! vrfs conj! rf) - acc))))] - (vreset! vrfs (transient [])) - (let [acc (clj/reduce step! acc rfs) - acc (if (neg? (vswap! m dec)) + (if (reduced? acc) + (if (reduced? @acc) (do - (vswap! m + n) - (step! acc (xform mxrf))) - acc)] - (vswap! vrfs persistent!) - acc))))))) - -(defn pad [n padding-coll] - (fn [rf] - (let [n (volatile! n)] - (fn - ([] (rf)) - ([acc] - (transduce (take @n) rf acc padding-coll)) - ([acc x] - (vswap! n dec) - (rf acc x)))))) + (vreset! m (transient {})) ; no need to run completions + @acc) ; downstream is done, propagate + (do + (vswap! m assoc! k nop-rf) + (krf @acc))) + acc))))))))))) (defn partition "Returns a partitioning transducer. Each partition is independently transformed using the xform transducer. - Unlike clojure.core/partition the last partitions may be incomplete. - Partitions can be padded using #'pad." 
- ; being strict towards partition size implies buffering and avoiding unecessary buffering is part of this - ; library goal. So partition won't support it. However a buffer transducer may be an option. + "Returns a partitioning transducer. Each partition is independently transformed using the xform transducer (defaults to (into [])). Partitions hold n items and start every step items (step defaults to n). When pad is supplied, a trailing incomplete partition is completed with items from pad (at most n items in total) when the input ends; without pad it is dropped." ([n] (partition n n (into []))) - ([n xform] - (partition n n xform)) - ([n step xform] - (spawn step (comp (take n) xform)))) + ([n step-or-xform] + (if (fn? step-or-xform) + (partition n n step-or-xform) + (partition n step-or-xform (into [])))) + ([n step pad-or-xform] + (if (fn? pad-or-xform) + (let [xform pad-or-xform] + (fn [rf] + (let [mxrf (multiplexable rf) + dq (java.util.ArrayDeque. n) + barrier (volatile! n) + xform (comp (map #(if (identical? dq %) nil %)) xform)] + (fn + ([] (rf)) + ([acc] (rf acc)) + ([acc x] + (let [b (vswap! barrier dec)] + (when (< b n) (.add dq (if (nil? x) dq x))) + (if (zero? b) + ; this transduce may return a reduced because of mxrf wrapping reduceds coming from rf + (let [acc (transduce xform mxrf acc dq)] + (dotimes [_ (min n step)] (.poll dq)) + (vswap! barrier + step) + acc) + acc))))))) + (partition n step pad-or-xform (into [])))) + ([n step pad xform] + (fn [rf] + (let [mxrf (multiplexable rf) + dq (java.util.ArrayDeque. n) + barrier (volatile! n) + xform (comp (map #(if (identical? dq %) nil %)) xform)] + (fn + ([] (rf)) + ([acc] (if (< @barrier n) + (let [xform (comp cat (take n) xform)] + ; don't use mxrf for completion: we want completion and don't want reduced-wrapping + (transduce xform rf acc [dq pad])) + (rf acc))) ; nothing buffered: the downstream rf must still be completed + ([acc x] + (let [b (vswap! barrier dec)] + (when (< b n) (.add dq (if (nil? 
x) dq x))) + (if (zero? b) + ; this transduce may return a reduced because of mxrf wrapping reduceds coming from rf + (let [acc (transduce xform mxrf acc dq)] + (dotimes [_ (min n step)] (.poll dq)) + (vswap! barrier + step) + acc) + acc)))))))) (defn avg "Reducing fn to compute the arithmetic mean." 
@@ -382,37 +413,59 @@ (if some-unreduced acc (reduced acc))))))) (defn multiplex - [xforms-map] + "Returns a transducer that runs several transducers (specified by xforms) in parallel. + If xforms is a map, values of the map are transducers and keys are used to tag each + transducer output: + => (into [] (x/multiplex [(map inc) (map dec)]) (range 3)) + [1 -1 2 0 3 1] ; no map, no tag + => (into [] (x/multiplex {:up (map inc) :down (map dec)}) (range 3)) + [[:up 1] [:down -1] [:up 2] [:down 0] [:up 3] [:down 1]]" + [xforms] (fn [rf] (let [mrf (multiplexable (ensure-kvrf rf)) - rfs-map (volatile! (into {} (for [[k xform] % - :let [xform (comp xform (for [x %] [k x]))]] - [k (xform mrf)]) - xforms-map))] + rfs (volatile! (if (map? xforms) + (into {} (for [[k xform] % + :let [xform (comp xform (for [x %] [k x]))]] + [k (xform mrf)]) + xforms) + (into #{} (map #(% mrf)) xforms))) + invoke-rfs (if (map? xforms) + (fn [acc invoke] + (reduce-kv + (fn [acc tag rf] + (let [acc (invoke rf acc)] + (if (reduced? acc) + (if (reduced? @acc) + (do + (vreset! rfs nil) + acc) ; downstream is done, propagate + (do (vswap! rfs dissoc tag) (rf @acc))) + acc))) + acc @rfs)) + (fn [acc invoke] + (clj/reduce + (fn [acc rf] + (let [acc (invoke rf acc)] + (if (reduced? acc) + (if (reduced? @acc) + (do + (vreset! rfs nil) + acc) ; downstream is done, propagate + (do (vswap! rfs disj rf) (rf @acc))) + acc))) + acc @rfs)))] (kvrf ([] (rf)) - ([acc] (rf acc)) + ([acc] (rf (invoke-rfs acc #(%1 %2)))) ([acc x] - (let [acc (reduce-kv - (fn [acc tag rf] - (let [acc (rf acc x)] - (if (reduced? acc) - (do (vswap! rfs-map dissoc tag) (rf @acc)) - acc))) - acc @rfs-map)] - (if (zero? (clj/count @rfs-map)) - (reduced acc) + (let [acc (invoke-rfs acc #(%1 %2 x))] + (if (zero? (clj/count @rfs)) + (ensure-reduced acc) acc))) ([acc k v] - (let [acc (reduce-kv - (fn [acc tag rf] - (let [acc (rf acc k v)] - (if (reduced? acc) - (do (vswap! rfs-map dissoc tag) (rf @acc)) - acc))) - acc @rfs-map)] - (if (zero? 
(clj/count @rfs-map)) - (reduced acc) + (let [acc (invoke-rfs acc #(%1 %2 k v))] + (if (zero? (clj/count @rfs)) + (ensure-reduced acc) acc))))))) (defn juxt-map @@ -426,11 +479,11 @@ ([acc x] (f acc x)) ([acc k v] (f acc k v)))))) -(defn first - "Reducing function that returns the first value or nil if none." +(defn last + "Reducing function that returns the last value." ([] nil) ([x] x) - ([_ x] (reduced x))) + ([_ x] x)) (defn transjuxt "Performs several transductions over coll at once. xforms-map can be a map or a sequential collection. @@ -438,18 +491,24 @@ When xforms-map is a sequential collection returns a vector of same length as xforms-map. Returns a transducer when coll is omitted." ([xforms-map] - (let [[f args] (if (map? xforms-map) - [juxt-map (comp (by-key (map #(% first))) cat)] - [juxt (map #(% first))])] - (reduce (apply f (sequence args xforms-map))))) + (let [collect-xform (if (map? xforms-map) + (into {}) + (reduce (kvrf + ([] (clj/reduce (fn [v _] (conj! v nil)) + (transient []) (range (clj/count xforms-map)))) ; clj/count: bare count is excluded from clojure.core in this ns + ([v] (persistent! v)) + ([v i x] (assoc! v i x))))) + xforms-map (if (map? xforms-map) xforms-map (zipmap (range) xforms-map))] ; tag each xform with its index + (comp + (multiplex (into {} (by-key (map #(comp % (take 1)))) xforms-map)) + collect-xform))) ([xforms-map coll] - (transduce (transjuxt xforms-map) first coll))) + (transduce (transjuxt xforms-map) last coll))) -#_(defn intermix - [xforms] - (fn [rf] - (let [mxrf (multiplexable rf) - rfs (volatile! (into #{} (map #(%2 mxrf)) xforms))] - (fn - ))) - ) +;; map stuff +(defn update + ([m k xform] + (update m k xform nil)) + ([m k xform not-found] + (let [rf (xform (fn ([m] m) ([m v] (assoc m k v))))] + (rf (unreduced (rf (dissoc m k) (get m k not-found))))))) \ No newline at end of file diff --git a/test/net/cgrand/xforms_test.clj b/test/net/cgrand/xforms_test.clj index c055c09..32d898e 100644 --- a/test/net/cgrand/xforms_test.clj +++ b/test/net/cgrand/xforms_test.clj @@ -38,7 +38,7 @@ (let [acc (first (vswap! 
vaccs next))] (if (pos? n) (:acc (vswap! vstate assoc :acc acc :n (dec n))) - (reduced (:acc (vswap! vstate assoc :acc acc :status :reduced)))))))) + (reduced (:acc (vswap! vstate assoc :acc acc :state :reduced)))))))) res (transduce xform rf coll)] (check-acc res) (when-not (= :completed (:state @vstate)) @@ -66,10 +66,6 @@ (is (trial (x/for [x % y (range x)] [x y]) 4 (range 16))) (is (trial (x/reduce +) - 4 (range 16))) - (is (trial (x/pad 2 (repeat :pad)) - 4 (range 16))) - (is (trial (x/pad 8 (repeat :pad)) 4 (range 16))))) (deftest window-by-time