Add tests (and fixes) for the intricacies of transducers impl.
This commit is contained in:
parent
e51f409cb8
commit
b600288bfd
2 changed files with 85 additions and 7 deletions
|
|
@ -76,6 +76,11 @@
|
|||
|
||||
(defn- noprf "The noop reducing function" ([acc] acc) ([acc _] acc))
|
||||
|
||||
(defn- multiplexable
|
||||
"Creates a multiplexable reducing function (doesn't init or complete the uderlying rf)."
|
||||
[rf]
|
||||
(fn ([]) ([acc] acc) ([acc x] (rf acc x)))) ; no init no complete rf
|
||||
|
||||
(defn by-key
|
||||
"Returns a transducer which partitions items according to kfn.
|
||||
It applies the transform specified by xform to each partition.
|
||||
|
|
@ -88,17 +93,17 @@
|
|||
([kfn vfn pair xform]
|
||||
(fn [rf]
|
||||
(let [make-rf (if pair
|
||||
(fn [k] (fn ([acc] (rf acc)) ([acc v] (rf acc (pair k v)))))
|
||||
(constantly rf))
|
||||
(fn [k] (fn ([acc] acc) ([acc v] (rf acc (pair k v)))))
|
||||
(constantly (multiplexable rf)))
|
||||
m (volatile! (transient {}))]
|
||||
(fn self
|
||||
([] (rf))
|
||||
([acc] (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m))))
|
||||
([acc] (rf (clj/reduce (fn [acc krf] (krf acc)) acc (vals (persistent! @m)))))
|
||||
([acc x]
|
||||
(let [k (kfn x)
|
||||
krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k))))
|
||||
acc (krf acc (vfn x))]
|
||||
(when (reduced? acc)
|
||||
(when (reduced? acc) ; complete?
|
||||
(vswap! m assoc! k noprf))
|
||||
(unreduced acc))))))))
|
||||
|
||||
|
|
@ -106,7 +111,7 @@
|
|||
"Every n items, spawns a new pipeline."
|
||||
[n xform]
|
||||
(fn [rf]
|
||||
(let [ncrf (fn ([]) ([acc] acc) ([acc x] (rf acc x))) ; no init no complete rf
|
||||
(let [mxrf (multiplexable rf)
|
||||
vrfs (volatile! [])
|
||||
m (volatile! 0)]
|
||||
(fn
|
||||
|
|
@ -126,8 +131,8 @@
|
|||
(let [acc (clj/reduce step! acc rfs)
|
||||
acc (if (neg? (vswap! m dec))
|
||||
(do
|
||||
(step! acc (xform ncrf))
|
||||
(vswap! m + n))
|
||||
(vswap! m + n)
|
||||
(step! acc (xform mxrf)))
|
||||
acc)]
|
||||
(vswap! vrfs persistent!)
|
||||
acc)))))))
|
||||
|
|
|
|||
73
test/net/cgrand/xforms_test.clj
Normal file
73
test/net/cgrand/xforms_test.clj
Normal file
|
|
@ -0,0 +1,73 @@
|
|||
(ns net.cgrand.xforms-test
|
||||
(:require [clojure.test :refer :all]
|
||||
[net.cgrand.xforms :as x]))
|
||||
|
||||
(defn trial
|
||||
"A transducing context for testing that transducers are well-behaved towards
|
||||
linear use of the accumulator, init, completion and handling of reduced values.
|
||||
A \"poisonous\" reducing function rf is passed to the transducer.
|
||||
n is the number of calls to rf before it returns a reduced.
|
||||
accs is a collection of successive return values for rf."
|
||||
([xform n coll]
|
||||
(trial xform n (repeatedly #(Object.)) coll))
|
||||
([xform n accs coll]
|
||||
(let [vaccs (volatile! accs)
|
||||
vstate (volatile! {:n n :acc (first @vaccs) :state :init})
|
||||
check-acc (fn [acc]
|
||||
(when (reduced? acc)
|
||||
(throw (ex-info "Called with reduced accumulator" (assoc @vstate :actual-acc acc))))
|
||||
(when-not (identical? acc (:acc @vstate))
|
||||
(throw (ex-info "Called with an unexpected accumulator (either non-linear or out of thin air)" (assoc @vstate :actual-acc acc)))))
|
||||
rf (fn
|
||||
([]
|
||||
(when-not (= :init (:state @vstate))
|
||||
(throw (ex-info "Init arity called on a started or completed rf." @vstate)))
|
||||
(:acc (vswap! vstate assoc :state :started)))
|
||||
([acc]
|
||||
(when (= :completed (:state @vstate))
|
||||
(throw (ex-info "Completion arity called on an already completed rf." @vstate)))
|
||||
(check-acc acc)
|
||||
(:acc (vswap! vstate assoc :state :completed :acc (first (vswap! vaccs next)))))
|
||||
([acc x]
|
||||
(when (= :completed (:state @vstate))
|
||||
(throw (ex-info "Step arity called on an already completed rf." @vstate)))
|
||||
(when (= :reduced (:state @vstate))
|
||||
(throw (ex-info "Step arity called on a reduced rf." @vstate)))
|
||||
(check-acc acc)
|
||||
(let [n (:n @vstate)]
|
||||
(let [acc (first (vswap! vaccs next))]
|
||||
(if (pos? n)
|
||||
(:acc (vswap! vstate assoc :acc acc :n (dec n)))
|
||||
(reduced (:acc (vswap! vstate assoc :acc acc :status :reduced))))))))
|
||||
res (transduce xform rf coll)]
|
||||
(check-acc res)
|
||||
(when-not (= :completed (:state @vstate))
|
||||
(throw (ex-info "Completion arity never called" @vstate)))
|
||||
true)))
|
||||
|
||||
(deftest proper-rf-usage
|
||||
(testing "Ensuring that reducing functions returned by transducers are well-behaved."
|
||||
(is (trial (x/by-key odd? identity)
|
||||
4 (range 16)))
|
||||
(is (trial (x/by-key odd? identity nil identity)
|
||||
4 (range 16)))
|
||||
(is (trial (x/by-key odd? (take 2))
|
||||
8 (range 16)))
|
||||
(is (trial (x/by-key odd? identity)
|
||||
8 (range 2)))
|
||||
(is (trial (x/partition 3 identity)
|
||||
4 (range 16)))
|
||||
(is (trial (x/partition 3 (take 2))
|
||||
8 (range 16)))
|
||||
(is (trial (x/partition 3 (take 2))
|
||||
8 (range 2)))
|
||||
(is (trial (x/into [])
|
||||
4 (range 16)))
|
||||
(is (trial (x/for [x % y (range x)] [x y])
|
||||
4 (range 16)))
|
||||
(is (trial (x/reduce +)
|
||||
4 (range 16)))
|
||||
(is (trial (x/pad 2 (repeat :pad))
|
||||
4 (range 16)))
|
||||
(is (trial (x/pad 8 (repeat :pad))
|
||||
4 (range 16)))))
|
||||
Loading…
Reference in a new issue