diff --git a/README.md b/README.md index 228f917..978a7e6 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,7 @@ In `net.cgrand.xforms.io`: * in `net.cgrand.xforms`: `transjuxt` (for performing several transductions in a single pass), `iterator` (clojure only), `into`, `without`, `count` and `some`. * in `net.cgrand.xforms.io`: `line-out` (3+ args) and `edn-out` (3+ args). + * in `net.cgrand.xforms.nodejs.stream`: `transformer`. *Reducible views* (in `net.cgrand.xforms.io`): `lines-in` and `edn-in`. @@ -39,7 +40,7 @@ In `net.cgrand.xforms.io`: Add this dependency to your project: ```clj -[net.cgrand/xforms "0.14.0"] +[net.cgrand/xforms "0.15.0"] ``` ```clj diff --git a/project.clj b/project.clj index b969a04..f074ab6 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject net.cgrand/xforms "0.13.0" +(defproject net.cgrand/xforms "0.15.0" :description "Extra transducers for Clojure" :url "https://github.com/cgrand/xforms" :license {:name "Eclipse Public License" diff --git a/src/net/cgrand/xforms/nodejs/stream.cljs b/src/net/cgrand/xforms/nodejs/stream.cljs new file mode 100644 index 0000000..fe7e264 --- /dev/null +++ b/src/net/cgrand/xforms/nodejs/stream.cljs @@ -0,0 +1,31 @@ +(ns net.cgrand.xforms.nodejs.stream) + +(def ^:private Transform (.-Transform (js/require "stream"))) + +(defn transformer + "Returns a stream.Transform object that performs the specified transduction. + options is a js object as per stream.Transform -- however :readableObjectMode and :writableObjectMode are set to true by default." + ([xform] (transformer #js {} xform)) + ([options xform] + (let [xrf (xform (fn + ([transform] (doto transform .end)) + ([transform x] + (when-not (.push transform x) + (throw (js/Error. "Transformer's internal buffer is full, try passing a larger :highWaterMark option."))) + transform)))] + (specify! (Transform. (.assign js/Object #js {:readableObjectMode true + :writableObjectMode true} options)) + Object + (_transform [this x _ cb] + (try + (when (reduced? (xrf this x)) + (.push this nil)) + (cb) + (catch :default err + (cb err)))) + (_flush [this cb] + (try + (xrf this) + (cb) + (catch :default err + (cb err))))))))