Remove x/pad and x/first; add x/multiplex; fix several issues with reduced handling

This commit is contained in:
Christophe Grand 2016-10-10 17:11:45 +02:00
parent 2eb63f6578
commit 596ee03918
4 changed files with 176 additions and 121 deletions

View file

@ -4,9 +4,9 @@ More transducers and reducing functions for Clojure!
[![Build Status](https://travis-ci.org/cgrand/xforms.png?branch=master)](https://travis-ci.org/cgrand/xforms)
Transducers: `reduce`, `into`, `count`, `by-key`, `partition`, `pad`, `for`, `window` and `window-by-time`.
Transducers: `reduce`, `into`, `count`, `by-key`, `partition`, `for`, `multiplex`, `window` and `window-by-time`.
Reducing functions: `str`, `str!`, `avg`, `juxt`, `juxt-map` and `first`.
Reducing functions: `str`, `str!`, `avg`, `juxt`, `juxt-map` and `last`.
Transducing context: `transjuxt` (for performing several transductions in a single pass).
@ -15,7 +15,7 @@ Transducing context: `transjuxt` (for performing several transductions in a sing
Add this dependency to your project:
```clj
[net.cgrand/xforms "0.3.1"]
[net.cgrand/xforms "0.4.0"]
```
```clj
@ -57,17 +57,17 @@ Add this dependency to your project:
Execution time mean : 20,604297 µs
```
Like `by-key`, `partition` also takes a transducer as an argument to allow further computation on the partition without buffering.
Like `by-key`, `partition` also takes a transducer as last argument to allow further computation on the partition.
```clj
=> (sequence (x/partition 4 (x/reduce +)) (range 16))
(6 22 38 54)
```
Padding can be achieved using the `pad` function:
Padding is achieved as usual:
```clj
=> (sequence (x/partition 4 (comp (x/pad 4 (repeat :pad)) (x/into []))) (range 9))
=> (sequence (x/partition 4 4 (repeat :pad) (x/into [])) (range 9))
([0 1 2 3] [4 5 6 7] [8 :pad :pad :pad])
```

View file

@ -1,4 +1,4 @@
(defproject net.cgrand/xforms "0.3.1"
(defproject net.cgrand/xforms "0.4.0"
:description "Extra transducers for Clojure"
#_#_:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"

View file

@ -1,7 +1,7 @@
(ns net.cgrand.xforms
"Extra transducers for Clojure"
{:author "Christophe Grand"}
(:refer-clojure :exclude [reduce into count for partition str juxt first])
(:refer-clojure :exclude [reduce into count for partition str juxt last])
(:require [clojure.core :as clj]))
(defmacro for
@ -14,6 +14,7 @@
pair? #(and (vector? %) (= 2 (clj/count %)))
destructuring-pair? (every-pred pair?
#(not-any? (some-fn keyword? #{'&}) %))
rpairs (clj/partition 2 (rseq (vec seq-exprs)))
build (fn [init]
(clj/reduce (fn [body [expr binding]]
(case binding
@ -26,10 +27,16 @@
(clj/reduce-kv (fn [~acc ~@binding] ~body) ~acc expr#)
(clj/reduce (fn [~acc ~binding] ~body) ~acc expr#)))
`(clj/reduce (fn [~acc ~binding] ~body) ~acc ~expr))))
init (clj/partition 2 (rseq (vec seq-exprs)))))
body (if (and (pair? body-expr) (nil? (meta body-expr)))
(build `(~rf ~acc ~@body-expr))
(build `(~rf ~acc ~body-expr)))]
init rpairs))
nested-reduceds (clj/for [[expr binding] rpairs
:when (not (keyword? binding))]
`reduced)
body (build `(let [acc# (~rf ~acc ~@(if (and (pair? body-expr) (nil? (meta body-expr)))
body-expr
[body-expr]))]
(if (reduced? acc#)
(-> acc# ~@nested-reduceds)
acc#)))]
`(fn [~rf]
(let [~rf (ensure-kvrf ~rf)]
(kvrf
@ -51,7 +58,7 @@
(defmacro kvrf [name? & fn-bodies]
(let [name (if (symbol? name?) name? (gensym '_))
fn-bodies (if (symbol? name?) fn-bodies (cons name? fn-bodies))
fn-bodies (if (vector? (clj/first fn-bodies)) (list fn-bodies) fn-bodies)]
fn-bodies (if (vector? (first fn-bodies)) (list fn-bodies) fn-bodies)]
`(reify
clojure.lang.Fn
KvRfable
@ -145,13 +152,25 @@
(defn- key' [kv] (nth kv 0))
(defn- val' [kv] (nth kv 1))
(defn- noprf "The noop reducing function" ([acc] acc) ([acc _] acc) ([acc _ _] acc))
(defn- nop-rf "The noop reducing function" ([acc] acc) ([acc _] acc) ([acc _ _] acc))
(defn- multiplexable
"Creates a multiplexable reducing function (doesn't init or complete the uderlying rf)."
"Returns a multiplexable reducing function (doesn't init or complete the uderlying rf, wraps reduced -- like preserving-reduced)"
[rf]
(let [rf (ensure-kvrf rf)]
(kvrf ([]) ([acc] acc) ([acc x] (rf acc x)) ([acc k v] (rf acc k v))))) ; no init no complete rf
(kvrf
([])
([acc] acc) ; no init no complete rf
([acc x]
(let [acc (rf acc x)]
(if (reduced? acc)
(reduced acc)
acc)))
([acc k v]
(let [acc (rf acc k v)]
(if (reduced? acc)
(reduced acc)
acc))))))
(defn by-key
"Returns a transducer which partitions items according to kfn.
@ -165,12 +184,12 @@
([kfn vfn pair xform]
(let [pair (if (identical? vector pair) ::default pair)]
(fn [rf]
(let [make-rf (cond
(nil? pair) (constantly (multiplexable rf))
(let [mrf (multiplexable rf)
make-rf (cond
(nil? pair) (constantly mrf)
(= ::default pair)
(let [rf (ensure-kvrf rf)]
(fn [k] (fn ([acc] acc) ([acc v] (rf acc k v)))))
:else (fn [k] (fn ([acc] acc) ([acc v] (rf acc (pair k v))))))
(fn [k] (fn ([acc] acc) ([acc v] (mrf acc k v))))
:else (fn [k] (fn ([acc] acc) ([acc v] (mrf acc (pair k v))))))
m (volatile! (transient {}))]
(if (and (nil? kfn) (nil? vfn))
(kvrf self
@ -181,9 +200,15 @@
([acc k v]
(let [krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k))))
acc (krf acc v)]
(when (reduced? acc) ; complete?
(vswap! m assoc! k noprf))
(unreduced acc))))
(if (reduced? acc)
(if (reduced? @acc)
(do
(vreset! m (transient {})) ; no need to run completions
@acc) ; downstream is done, propagate
(do
(vswap! m assoc! k nop-rf)
(krf @acc))) ; TODO think again
acc))))
(let [kfn (or kfn key')
vfn (or vfn val')]
(fn
@ -193,63 +218,69 @@
(let [k (kfn x)
krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k))))
acc (krf acc (vfn x))]
(when (reduced? acc) ; complete?
(vswap! m assoc! k noprf))
(unreduced acc)))))))))))
(defn- spawn
"Every n items, spawns a new pipeline."
[n xform]
(fn [rf]
(let [mxrf (multiplexable rf)
vrfs (volatile! [])
m (volatile! 0)]
(fn
([] (rf))
([acc]
(rf (clj/reduce #(%2 %1) acc @vrfs)))
([acc x]
(let [rfs @vrfs
step! (fn [acc rf]
(let [acc (rf acc x)]
(if (reduced? acc)
(rf (unreduced acc))
(do
(vswap! vrfs conj! rf)
acc))))]
(vreset! vrfs (transient []))
(let [acc (clj/reduce step! acc rfs)
acc (if (neg? (vswap! m dec))
(if (reduced? acc)
(if (reduced? @acc)
(do
(vswap! m + n)
(step! acc (xform mxrf)))
acc)]
(vswap! vrfs persistent!)
acc)))))))
(defn pad [n padding-coll]
(fn [rf]
(let [n (volatile! n)]
(fn
([] (rf))
([acc]
(transduce (take @n) rf acc padding-coll))
([acc x]
(vswap! n dec)
(rf acc x))))))
(vreset! m (transient {})) ; no need to run completions
@acc) ; downstream is done, propagate
(do
(vswap! m assoc! k nop-rf)
(krf @acc)))
acc)))))))))))
(defn partition
"Returns a partitioning transducer. Each partition is independently transformed using the xform transducer.
Unlike clojure.core/partition the last partitions may be incomplete.
Partitions can be padded using #'pad."
; being strict towards partition size implies buffering and avoiding unecessary buffering is part of this
; library goal. So partition won't support it. However a buffer transducer may be an option.
"Returns a partitioning transducer. Each partition is independently transformed using the xform transducer."
([n]
(partition n n (into [])))
([n xform]
(partition n n xform))
([n step xform]
(spawn step (comp (take n) xform))))
([n step-or-xform]
(if (fn? step-or-xform)
(partition n n step-or-xform)
(partition n step-or-xform (into []))))
([n step pad-or-xform]
(if (fn? pad-or-xform)
(let [xform pad-or-xform]
(fn [rf]
(let [mxrf (multiplexable rf)
dq (java.util.ArrayDeque. n)
barrier (volatile! n)
xform (comp (map #(if (identical? dq %) nil %)) xform)]
(fn
([] (rf))
([acc] (rf acc))
([acc x]
(let [b (vswap! barrier dec)]
(when (< b n) (.add dq (if (nil? x) dq x)))
(if (zero? b)
; this transduce may return a reduced because of mxrf wrapping reduceds coming from rf
(let [acc (transduce xform mxrf acc dq)]
(dotimes [_ (min n step)] (.poll dq))
(vswap! barrier + step)
acc)
acc)))))))
(partition n step pad-or-xform (into []))))
([n step pad xform]
(fn [rf]
(let [mxrf (multiplexable rf)
dq (java.util.ArrayDeque. n)
barrier (volatile! n)
xform (comp (map #(if (identical? dq %) nil %)) xform)]
(fn
([] (rf))
([acc] (if (< @barrier n)
(let [xform (comp cat (take n) xform)]
; don't use mxrf for completion: we want completion and don't want reduced-wrapping
(transduce xform rf acc [dq pad]))
acc))
([acc x]
(let [b (vswap! barrier dec)]
(when (< b n) (.add dq (if (nil? x) dq x)))
(if (zero? b)
; this transduce may return a reduced because of mxrf wrapping reduceds coming from rf
(let [acc (transduce xform mxrf acc dq)]
(dotimes [_ (min n step)] (.poll dq))
(vswap! barrier + step)
acc)
acc))))))))
(defn avg
"Reducing fn to compute the arithmetic mean."
@ -382,37 +413,59 @@
(if some-unreduced acc (reduced acc)))))))
(defn multiplex
[xforms-map]
"Returns a transducer that runs several transducers (sepcified by xforms) in parallel.
If xforms is a map, values of the map are transducers and keys are used to tag each
transducer output:
=> (into [] (x/multiplex [(map inc) (map dec)]) (range 3))
[1 -1 2 0 3 1] ; no map, no tag
=> (into [] (x/multiplex {:up (map inc) :down (map dec)}) (range 3))
[[:up 1] [:down -1] [:up 2] [:down 0] [:up 3] [:down 1]]"
[xforms]
(fn [rf]
(let [mrf (multiplexable (ensure-kvrf rf))
rfs-map (volatile! (into {} (for [[k xform] %
:let [xform (comp xform (for [x %] [k x]))]]
[k (xform mrf)])
xforms-map))]
rfs (volatile! (if (map? xforms)
(into {} (for [[k xform] %
:let [xform (comp xform (for [x %] [k x]))]]
[k (xform mrf)])
xforms)
(into #{} (map #(% mrf)) xforms)))
invoke-rfs (if (map? xforms)
(fn [acc invoke]
(reduce-kv
(fn [acc tag rf]
(let [acc (invoke rf acc)]
(if (reduced? acc)
(if (reduced? @acc)
(do
(vreset! rfs nil)
acc) ; downstream is done, propagate
(do (vswap! rfs dissoc tag) (rf @acc)))
acc)))
acc @rfs))
(fn [acc invoke]
(clj/reduce
(fn [acc rf]
(let [acc (invoke rf acc)]
(if (reduced? acc)
(if (reduced? @acc)
(do
(vreset! rfs nil)
acc) ; downstream is done, propagate
(do (vswap! rfs disj rf) (rf @acc)))
acc)))
acc @rfs)))]
(kvrf
([] (rf))
([acc] (rf acc))
([acc] (rf (invoke-rfs acc #(%1 %2))))
([acc x]
(let [acc (reduce-kv
(fn [acc tag rf]
(let [acc (rf acc x)]
(if (reduced? acc)
(do (vswap! rfs-map dissoc tag) (rf @acc))
acc)))
acc @rfs-map)]
(if (zero? (clj/count @rfs-map))
(reduced acc)
(let [acc (invoke-rfs acc #(%1 %2 x))]
(if (zero? (clj/count @rfs))
(ensure-reduced acc)
acc)))
([acc k v]
(let [acc (reduce-kv
(fn [acc tag rf]
(let [acc (rf acc k v)]
(if (reduced? acc)
(do (vswap! rfs-map dissoc tag) (rf @acc))
acc)))
acc @rfs-map)]
(if (zero? (clj/count @rfs-map))
(reduced acc)
(let [acc (invoke-rfs acc #(%1 %2 k v))]
(if (zero? (clj/count @rfs))
(ensure-reduced acc)
acc)))))))
(defn juxt-map
@ -426,11 +479,11 @@
([acc x] (f acc x))
([acc k v] (f acc k v))))))
(defn first
"Reducing function that returns the first value or nil if none."
(defn last
"Reducing function that returns the last value."
([] nil)
([x] x)
([_ x] (reduced x)))
([_ x] x))
(defn transjuxt
"Performs several transductions over coll at once. xforms-map can be a map or a sequential collection.
@ -438,18 +491,24 @@
When xforms-map is a sequential collection returns a vector of same length as xforms-map.
Returns a transducer when coll is omitted."
([xforms-map]
(let [[f args] (if (map? xforms-map)
[juxt-map (comp (by-key (map #(% first))) cat)]
[juxt (map #(% first))])]
(reduce (apply f (sequence args xforms-map)))))
(let [collect-xform (if (map? xforms-map)
(into {})
(reduce (kvrf
([] (clj/reduce (fn [v _] (conj! v nil))
(transient []) (range (count xforms-map))))
([v] (persistent! v))
([v i x] (assoc! v i x)))))
xforms-map (if (map? xforms-map) xforms-map (zipmap (range xforms-map)))]
(comp
(multiplex (into {} (by-key (map #(comp % (take 1)))) xforms-map))
collect-xform)))
([xforms-map coll]
(transduce (transjuxt xforms-map) first coll)))
(transduce (transjuxt xforms-map) last coll)))
#_(defn intermix
[xforms]
(fn [rf]
(let [mxrf (multiplexable rf)
rfs (volatile! (into #{} (map #(%2 mxrf)) xforms))]
(fn
)))
)
;; map stuff
(defn update
([m k xform]
(update m k xform nil))
([m k xform not-found]
(let [rf (xform (fn ([m] m) ([m v] (assoc m k v))))]
(rf (unreduced (rf (dissoc m k) (get m k not-found)))))))

View file

@ -38,7 +38,7 @@
(let [acc (first (vswap! vaccs next))]
(if (pos? n)
(:acc (vswap! vstate assoc :acc acc :n (dec n)))
(reduced (:acc (vswap! vstate assoc :acc acc :status :reduced))))))))
(reduced (:acc (vswap! vstate assoc :acc acc :state :reduced))))))))
res (transduce xform rf coll)]
(check-acc res)
(when-not (= :completed (:state @vstate))
@ -66,10 +66,6 @@
(is (trial (x/for [x % y (range x)] [x y])
4 (range 16)))
(is (trial (x/reduce +)
4 (range 16)))
(is (trial (x/pad 2 (repeat :pad))
4 (range 16)))
(is (trial (x/pad 8 (repeat :pad))
4 (range 16)))))
(deftest window-by-time