diff --git a/README.md b/README.md
index d39cebc..2de06c5 100644
--- a/README.md
+++ b/README.md
@@ -95,9 +95,9 @@ Padding can be achieved using the `pad` function:
;; avg of last 4 items
=> (sequence
- (x/window 4 x/avg #(x/avg % (- %2)))
+ (x/window 4 x/avg #(x/avg %1 %2 -1))
nums)
-(11 19/2 17 77/4 12 37/4 79/10 77/12)
+(11 19/2 17 77/4 18 37/2 79/4 77/4)
;; min of last 3 items
=> (sequence
@@ -163,6 +163,8 @@ Several xforms transducers and transducing contexts leverage `reduce-kv` and `kv
3-arg `into` (transducing context) | when `from` is a map | when `to` is a map
|
`by-key` (as a transducer) | when is `kfn` and `vfn` are unspecified or `nil` | when `pair` is `vector` or unspecified
|
`by-key` (as a transducing context on values) | no | no
+ |
| `juxt` | when at least one of the children `rfns` is a kvrf | no
+ |
| `juxt-map` | when at least one of the children `rfns` is a kvrf | no
diff --git a/project.clj b/project.clj
index 74f4dbd..75ff9d5 100644
--- a/project.clj
+++ b/project.clj
@@ -1,4 +1,4 @@
-(defproject net.cgrand/xforms "0.2.0"
+(defproject net.cgrand/xforms "0.3.0"
:description "Extra transducers for Clojure"
#_#_:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"
diff --git a/src/net/cgrand/xforms.clj b/src/net/cgrand/xforms.clj
index 482b7b9..9c7f8ce 100644
--- a/src/net/cgrand/xforms.clj
+++ b/src/net/cgrand/xforms.clj
@@ -154,12 +154,14 @@
(defn- key' [kv] (nth kv 0))
(defn- val' [kv] (nth kv 1))
-(defn- noprf "The noop reducing function" ([acc] acc) ([acc _] acc))
+(defn- noprf "The noop reducing function" ([acc] acc) ([acc _] acc) ([acc _ _] acc))
(defn- multiplexable
"Creates a multiplexable reducing function (doesn't init or complete the uderlying rf)."
[rf]
- (fn ([]) ([acc] acc) ([acc x] (rf acc x)))) ; no init no complete rf
+ (if-some [rf (some-kvrf rf)]
+ (kvrf ([]) ([acc] acc) ([acc x] (rf acc x)) ([acc k v] (rf acc k v)))
+ (fn ([]) ([acc] acc) ([acc x] (rf acc x))))) ; no init no complete rf
(defn by-key
"Returns a transducer which partitions items according to kfn.
@@ -252,6 +254,8 @@
Partitions can be padded using #'pad."
; being strict towards partition size implies buffering and avoiding unecessary buffering is part of this
; library goal. So partition won't support it. However a buffer transducer may be an option.
+ ([n]
+ (partition n n (into [])))
([n xform]
(partition n n xform))
([n step xform]
@@ -259,17 +263,11 @@
(defn avg
"Reducing fn to compute the arithmetic mean."
- ([]
- (let [count (volatile! 0)
- sum (volatile! 0)]
- (fn secret-container
- ([] (when (pos? @count) (/ @sum @count)))
- ([n]
- (vswap! count inc)
- (vswap! sum + n)
- secret-container))))
- ([acc] (acc))
- ([acc x] (acc x)))
+ ([] (transient [0 0]))
+ ([[n sum]] (/ sum n))
+ ([acc x] (avg acc x 1))
+ ([[n sum :as acc] x w]
+ (-> acc (assoc! 0 (+ n w)) (assoc! 1 (+ sum (* w x))))))
(defn window
"Returns a transducer which computes an accumulator over the last n items
@@ -282,7 +280,10 @@
mutable and 1-arity f to project its state to a value.
If you don't want to see the accumulator until the window is full then you need to
- use (drop (dec n)) to remove them."
+ use (drop (dec n)) to remove them.
+
+ If you don't have an inverse function, consider using partition and reduce:
+ (x/partition 4 (x/reduce rf))"
[n f invf]
(fn [rf]
(let [ring (object-array n)
@@ -370,24 +371,79 @@
"Returns a reducing fn which compute all rfns at once and whose final return
value is a vector of the final return values of each rfns."
[& rfns]
- (let [rfns (vec rfns)]
- (fn
- ([] (mapv #(vector % (volatile! (%))) rfns))
- ([acc] (mapv (fn [[rf vacc]] (rf (unreduced @vacc))) acc))
- ([acc x]
- (let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]]
- (when-not (reduced? @vacc) (vswap! vacc rf x) true))
- false acc)]
- (if some-unreduced acc (reduced acc)))))))
+ (if (some some-kvrf rfns)
+ (let [rfns (mapv ensure-kvrf rfns)]
+ (kvrf
+ ([] (mapv #(vector % (volatile! (%))) rfns))
+ ([acc] (mapv (fn [[rf vacc]] (rf (unreduced @vacc))) acc))
+ ([acc x]
+ (let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]]
+ (when-not (reduced? @vacc) (vswap! vacc rf x) true))
+ false acc)]
+ (if some-unreduced acc (reduced acc))))
+ ([acc k v]
+ (let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]]
+ (when-not (reduced? @vacc) (vswap! vacc rf k v) true))
+ false acc)]
+ (if some-unreduced acc (reduced acc))))))
+ (let [rfns (vec rfns)]
+ (fn
+ ([] (mapv #(vector % (volatile! (%))) rfns))
+ ([acc] (mapv (fn [[rf vacc]] (rf (unreduced @vacc))) acc))
+ ([acc x]
+ (let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]]
+ (when-not (reduced? @vacc) (vswap! vacc rf x) true))
+ false acc)]
+ (if some-unreduced acc (reduced acc))))))))
+
+(defn multiplex
+ [xforms-map]
+ (fn [rf]
+ (let [mrf (multiplexable (ensure-kvrf rf))
+ rfs-map (volatile! (into {} (for [[k xform] %
+ :let [xform (comp xform (for [x %] [k x]))]]
+ [k (xform mrf)])
+ xforms-map))]
+ (kvrf
+ ([] (rf))
+ ([acc] (rf acc))
+ ([acc x]
+ (let [acc (reduce-kv
+ (fn [acc tag rf]
+ (let [acc (rf acc x)]
+ (if (reduced? acc)
+ (do (vswap! rfs-map dissoc tag) (rf @acc))
+ acc)))
+ acc @rfs-map)]
+ (if (zero? (clj/count @rfs-map))
+ (reduced acc)
+ acc)))
+ ([acc k v]
+ (let [acc (reduce-kv
+ (fn [acc tag rf]
+ (let [acc (rf acc k v)]
+ (if (reduced? acc)
+ (do (vswap! rfs-map dissoc tag) (rf @acc))
+ acc)))
+ acc @rfs-map)]
+ (if (zero? (clj/count @rfs-map))
+ (reduced acc)
+ acc)))))))
(defn juxt-map
[& key-rfns]
(let [f (apply juxt (take-nth 2 (next key-rfns)))
keys (vec (take-nth 2 key-rfns))]
- (fn
- ([] (f))
- ([acc] (zipmap keys (f acc)))
- ([acc x] (f acc x)))))
+ (if-some [f (some-kvrf f)]
+ (kvrf
+ ([] (f))
+ ([acc] (zipmap keys (f acc)))
+ ([acc x] (f acc x))
+ ([acc k v] (f acc k v)))
+ (fn
+ ([] (f))
+ ([acc] (zipmap keys (f acc)))
+ ([acc x] (f acc x))))))
(defn first
"Reducing function that returns the first value or nil if none."
|