diff --git a/README.md b/README.md
index 0132fe5..4bd2025 100644
--- a/README.md
+++ b/README.md
@@ -2,6 +2,10 @@
 
 More transducers and reducing functions for Clojure!
 
+Transducers: `reduce`, `into`, `by-key`, `partition`, `pad` and `for`.
+
+Reducing functions: `str`, `str!`, `avg`, `count`, `juxt`, `juxt-map`.
+
 ## Usage
 
 Add this dependency to your project:
@@ -33,7 +37,7 @@ Add this dependency to your project:
 
 ;; let's go transient!
 (defn my-group-by [kfn coll]
-  (into {} (x/by-key kfn (x/reduce (completing conj! persistent!))) coll))
+  (into {} (x/by-key kfn (x/into [])) coll))
 
 => (quick-bench (group-by odd? (range 256)))
 Execution time mean : 29,356531 µs
@@ -41,10 +45,28 @@ Add this dependency to your project:
 Execution time mean : 20,604297 µs
 ```
 
-`avg` is a reducing fn to compute the arithmetic mean. `juxt` is used to compute several reducing fns at once.
+Like `by-key`, `partition` also takes a transducer as an argument to allow further computation on the partition without buffering.
+
+```clj
+=> (sequence (x/partition 4 (x/reduce +)) (range 16))
+(6 22 38 54)
+```
+
+Padding can be achieved using the `pad` function:
+
+```clj
+=> (sequence (x/partition 4 (comp (x/pad 4 (repeat :pad)) (x/into []))) (range 9))
+([0 1 2 3] [4 5 6 7] [8 :pad :pad :pad])
+```
+
+
+`avg` is a reducing fn to compute the arithmetic mean. `juxt` and `juxt-map` are used to compute several reducing fns at once.
+
 ```clj
 => (into {} (x/by-key odd? (x/reduce (x/juxt + x/avg))) (range 256))
 {false [16256 127], true [16384 128]}
+=> (into {} (x/by-key odd? (x/reduce (x/juxt-map :sum + :mean x/avg :count x/count))) (range 256))
+{false {:sum 16256, :mean 127, :count 128}, true {:sum 16384, :mean 128, :count 128}}
 ```
 
 ## License
diff --git a/src/net/cgrand/xforms.clj b/src/net/cgrand/xforms.clj
index f4dcc03..f844abe 100644
--- a/src/net/cgrand/xforms.clj
+++ b/src/net/cgrand/xforms.clj
@@ -1,7 +1,7 @@
 (ns net.cgrand.xforms
   "Extra transducers for Clojure"
   {:author "Christophe Grand"}
-  (:refer-clojure :exclude [reduce for partition str juxt])
+  (:refer-clojure :exclude [reduce into count for partition str juxt])
   (:require [clojure.core :as clj]))
 
 (defmacro for
@@ -42,6 +42,19 @@
   ([f init]
     (reduce (fn ([] init) ([acc] (f acc)) ([acc x] (f acc x))))))
 
+(defn into
+  "Returns a transducer which accumulates every input in a collection and outputs only the accumulated collection."
+  [coll]
+  (reduce (if (instance? clojure.lang.IEditableCollection coll)
+            (fn
+              ([] (transient coll))
+              ([acc] (persistent! acc))
+              ([acc x] (conj! acc x)))
+            (fn
+              ([] coll)
+              ([acc] acc)
+              ([acc x] (conj acc x))))))
+
 (defmacro ^:private or-instance? [class x y]
   (let [xsym (gensym 'x_)]
     `(let [~xsym ~x]
@@ -89,6 +102,63 @@
                    (vswap! m assoc! k noprf))
                  (unreduced acc))))))))
 
+(defn- spawn
+  "Every n items, spawns a new pipeline."
+  [n xform]
+  (fn [rf]
+    (let [ncrf (fn ([]) ([acc] acc) ([acc x] (rf acc x))) ; no init no complete rf
+          vrfs (volatile! [])
+          m (volatile! 0)]
+      (fn
+        ([] (rf))
+        ([acc]
+          (rf (clj/reduce #(%2 %1) acc @vrfs)))
+        ([acc x]
+          (let [rfs @vrfs
+                step! (fn [acc rf]
+                        (let [acc (rf acc x)]
+                          (if (reduced? acc)
+                            (rf (unreduced acc))
+                            (do
+                              (vswap! vrfs conj! rf)
+                              acc))))]
+            (vreset! vrfs (transient []))
+            (let [acc (clj/reduce step! acc rfs)
+                  acc (if (neg? (vswap! m dec))
+                        ; keep the accumulator returned by step!: vswap! returns the counter, not acc
+                        (let [acc (step! acc (xform ncrf))]
+                          (vswap! m + n)
+                          acc)
+                        acc)]
+              (vswap! vrfs persistent!)
+              acc)))))))
+
+(defn pad
+  "Returns a transducer which passes every input through and, on completion, appends items
+   taken from padding-coll until n items (inputs plus padding) have been emitted.
+   Nothing is appended when n or more inputs were received. padding-coll may be infinite."
+  [n padding-coll]
+  (fn [rf]
+    (let [n (volatile! n)]
+      (fn
+        ([] (rf))
+        ([acc]
+          (rf (clj/reduce ((take @n) rf) acc padding-coll)))
+        ([acc x]
+          (vswap! n dec)
+          (rf acc x))))))
+
+(defn partition
+  "Returns a partitioning transducer. Each partition is independently transformed using the xform transducer.
+   A new partition starts every step inputs (step defaults to n) and receives at most n inputs.
+   Unlike clojure.core/partition the last partitions may be incomplete.
+   Partitions can be padded using #'pad."
+  ; being strict towards partition size implies buffering and avoiding unnecessary buffering is part of this
+  ; library's goal. So partition won't support it. However a buffer transducer may be an option.
+  ([n xform]
+    (partition n n xform))
+  ([n step xform]
+    (spawn step (comp (take n) xform))))
+
 (defn avg
   "Reducing fn to compute the arithmetic mean."
   ([]
@@ -103,6 +173,10 @@
   ([acc] (acc))
   ([acc x] (acc x)))
 
+(defn count
+  "Reducing fn to count the number of inputs."
+  ([] 0) ([n] n) ([n _] (inc n)))
+
 (defn juxt
   "Returns a reducing fn which compute all rfns at once and whose final
   return value is a vector of the final return values of each rfns."
@@ -112,8 +186,20 @@
     ([] (mapv #(vector % (volatile! (%))) rfns))
     ([acc] (mapv (fn [[rf vacc]] (rf (unreduced @vacc))) acc))
     ([acc x]
-      (let [some-unreduced (reduce (fn [some-unreduced [rf vacc]]
-                                     (when-not (reduced? @vacc) (vswap! vacc rf x) true))
+      ; an already reduced rfn must not clear the flag set by a still-running one
+      (let [some-unreduced (clj/reduce (fn [some-unreduced [rf vacc]]
+                                         (or (when-not (reduced? @vacc) (vswap! vacc rf x) true) some-unreduced))
                              false acc)]
         (if some-unreduced acc (reduced acc)))))))
 
+(defn juxt-map
+  "Like juxt but takes alternating keys and rfns (k1 rf1 k2 rf2 ...) and whose final
+   return value is a map from each key to the final return value of its rfn."
+  [& key-rfns]
+  (let [f (apply juxt (take-nth 2 (next key-rfns)))
+        keys (vec (take-nth 2 key-rfns))]
+    (fn
+      ([] (f))
+      ([acc] (zipmap keys (f acc)))
+      ([acc x] (f acc x)))))
+