v0.15 stream.Transformer

This commit is contained in:
Christophe Grand 2017-11-16 18:53:46 +01:00
parent c52dc4c873
commit 82c7c1fff2
3 changed files with 34 additions and 2 deletions

View file

@ -29,6 +29,7 @@ In `net.cgrand.xforms.io`:
* in `net.cgrand.xforms`: `transjuxt` (for performing several transductions in a single pass), `iterator` (clojure only), `into`, `without`, `count` and `some`.
* in `net.cgrand.xforms.io`: `line-out` (3+ args) and `edn-out` (3+ args).
* in `net.cgrand.xforms.nodejs.stream`: `transformer`.
*Reducible views* (in `net.cgrand.xforms.io`): `lines-in` and `edn-in`.
@ -39,7 +40,7 @@ In `net.cgrand.xforms.io`:
Add this dependency to your project:
```clj
[net.cgrand/xforms "0.14.0"]
[net.cgrand/xforms "0.15.0"]
```
```clj

View file

@ -1,4 +1,4 @@
(defproject net.cgrand/xforms "0.13.0"
(defproject net.cgrand/xforms "0.15.0"
:description "Extra transducers for Clojure"
:url "https://github.com/cgrand/xforms"
:license {:name "Eclipse Public License"

View file

@ -0,0 +1,31 @@
(ns net.cgrand.xforms.nodejs.stream)
(def ^:private Transform (.-Transform (js/require "stream")))
(defn transformer
"Returns a stream.Transform object that performs the specified transduction.
options is a js object as per stream.Transform -- however :readableObjectMode and :writableObjectMode are set to true by default."
([xform] (transformer #js {} xform))
([options xform]
(let [xrf (xform (fn
([transform] (doto transform .end))
([transform x]
(when-not (.push transform x)
(throw (js/Error. "Transformer's internal buffer is full, try passing a larger :highWaterMark option.")))
transform)))]
(specify! (Transform. (.assign js/Object #js {:readableObjectMode true
:writableObjectMode true} options))
Object
(_transform [this x _ cb]
(try
(when (reduced? (xrf this x))
(.push this nil))
(cb)
(catch :default err
(cb err))))
(_flush [this cb]
(try
(xrf this)
(cb)
(catch :default err
(cb err))))))))