xforms 0.11.0, with xio/sh to use any shell process as a transducer
This commit is contained in:
parent
89d384ce74
commit
899154c0df
3 changed files with 107 additions and 4 deletions
12
README.md
12
README.md
|
|
@ -4,19 +4,27 @@ More transducers and reducing functions for Clojure(script)!
|
||||||
|
|
||||||
[](https://travis-ci.org/cgrand/xforms)
|
[](https://travis-ci.org/cgrand/xforms)
|
||||||
|
|
||||||
*Transducers* (in `net.cgrand.xforms`) can be classified in three groups: regular ones, higher-order ones
|
*Transducers* can be classified in three groups: regular ones, higher-order ones
|
||||||
(which accept other transducers as arguments) and 1-item ones which emit only 1 item out no matter how many went in.
|
(which accept other transducers as arguments) and 1-item ones which emit only 1 item out no matter how many went in.
|
||||||
1-item transducers generally only make sense in the context of a higher-order transducer.
|
1-item transducers generally only make sense in the context of a higher-order transducer.
|
||||||
|
|
||||||
|
In `net.cgrand.xforms`:
|
||||||
|
|
||||||
* regular ones: `partition` (1 arg), `reductions`, `for`, `take-last`, `drop-last`, `window` and `window-by-time`
|
* regular ones: `partition` (1 arg), `reductions`, `for`, `take-last`, `drop-last`, `window` and `window-by-time`
|
||||||
* higher-order ones: `by-key`, `into-by-key`, `multiplex`, `transjuxt`, `partition` (2+ args)
|
* higher-order ones: `by-key`, `into-by-key`, `multiplex`, `transjuxt`, `partition` (2+ args)
|
||||||
* 1-item ones: `reduce`, `into`, `transjuxt`, `last`, `count`, `avg`, `sd`, `min`, `minimum`, `max`, `maximum`, `str`
|
* 1-item ones: `reduce`, `into`, `transjuxt`, `last`, `count`, `avg`, `sd`, `min`, `minimum`, `max`, `maximum`, `str`
|
||||||
|
|
||||||
|
In `net.cgrand.xforms.io`:
|
||||||
|
* `sh` to use any process as a transducer
|
||||||
|
|
||||||
|
|
||||||
*Reducing functions*
|
*Reducing functions*
|
||||||
|
|
||||||
* in `net.cgrand.xforms.rfs`: `min`, `minimum`, `max`, `maximum`, `str`, `str!`, `avg`, `sd`, `last` and `some`.
|
* in `net.cgrand.xforms.rfs`: `min`, `minimum`, `max`, `maximum`, `str`, `str!`, `avg`, `sd`, `last` and `some`.
|
||||||
* in `net.cgrand.xforms.io`: `line-out` and `edn-out`.
|
* in `net.cgrand.xforms.io`: `line-out` and `edn-out`.
|
||||||
|
|
||||||
|
(in `net.cgrand.xforms`)
|
||||||
|
|
||||||
*Transducing contexts*:
|
*Transducing contexts*:
|
||||||
|
|
||||||
* in `net.cgrand.xforms`: `transjuxt` (for performing several transductions in a single pass), `into`, `count` and `some`.
|
* in `net.cgrand.xforms`: `transjuxt` (for performing several transductions in a single pass), `into`, `count` and `some`.
|
||||||
|
|
@ -29,7 +37,7 @@ More transducers and reducing functions for Clojure(script)!
|
||||||
Add this dependency to your project:
|
Add this dependency to your project:
|
||||||
|
|
||||||
```clj
|
```clj
|
||||||
[net.cgrand /xforms "0.10.2"]
|
[net.cgrand /xforms "0.11.0"]
|
||||||
```
|
```
|
||||||
|
|
||||||
```clj
|
```clj
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
(defproject net.cgrand/xforms "0.10.2"
|
(defproject net.cgrand/xforms "0.11.0"
|
||||||
:description "Extra transducers for Clojure"
|
:description "Extra transducers for Clojure"
|
||||||
:url "https://github.com/cgrand/xforms"
|
:url "https://github.com/cgrand/xforms"
|
||||||
:license {:name "Eclipse Public License"
|
:license {:name "Eclipse Public License"
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
(ns net.cgrand.xforms.io
|
(ns net.cgrand.xforms.io
|
||||||
(:require [clojure.java.io :as io]
|
(:require [clojure.java.io :as io]
|
||||||
|
[clojure.java.shell :as sh]
|
||||||
[clojure.edn :as edn]))
|
[clojure.edn :as edn]))
|
||||||
|
|
||||||
(defn keep-opts [m like]
|
(defn keep-opts [m like]
|
||||||
|
|
@ -93,4 +94,98 @@
|
||||||
w))
|
w))
|
||||||
([out xform coll & opts]
|
([out xform coll & opts]
|
||||||
(with-open [w (apply io/writer out opts)]
|
(with-open [w (apply io/writer out opts)]
|
||||||
(transduce xform edn-out w coll))))
|
(transduce xform edn-out w coll))))
|
||||||
|
|
||||||
|
(defn- stream-spec [x]
|
||||||
|
(into {:mode :lines :enc "UTF-8"}
|
||||||
|
(cond (map? x) x (string? x) {:enc x} (keyword? x) {:mode x})))
|
||||||
|
|
||||||
|
(defn sh
|
||||||
|
"Transducer. Spawns a process (program cmd with optional arguments arg1 ... argN) and pipes data through it.
|
||||||
|
Options may be:
|
||||||
|
* :env, an environment variables map, it will be merged with clojure.java.shell/*sh-env* and JVM environment (in decreasing precedence order),
|
||||||
|
* :dir, the current dir (defaults to clojure.java.shell/*sh-dir* or JVM current dir),
|
||||||
|
* :in and :out which are maps with keys :mode (:lines (default), :text or :bytes) and :enc (defaults to \"UTF-8\");
|
||||||
|
encoding applies only for modes :lines or :text; shorthands exist: a single keyword is equivalent to {:mode k :enc \"UTF-8\"},
|
||||||
|
a single string is equivelent to {:mode :lines, :enc s}.
|
||||||
|
In :bytes mode, values are bytes array.
|
||||||
|
In :lines mode, values are strings representing lines without line delimiters.
|
||||||
|
In :text mode, values are strings."
|
||||||
|
{:arglists '([cmd arg1 ... argN & opts])}
|
||||||
|
[& args]
|
||||||
|
(fn [rf]
|
||||||
|
(let [[cmd [& {:as opts :keys [env in out dir] :or {dir sh/*sh-dir*}}]] (split-with string? args)
|
||||||
|
env (into (or sh/*sh-env* {}) env)
|
||||||
|
env (into {} (for [[k v] env] [(name k) (str v)]))
|
||||||
|
proc (-> ^java.util.List (map str cmd) ProcessBuilder.
|
||||||
|
(.redirectError java.lang.ProcessBuilder$Redirect/INHERIT)
|
||||||
|
(doto (-> .environment (.putAll env)))
|
||||||
|
(.directory (io/as-file dir))
|
||||||
|
.start)
|
||||||
|
EOS (Object.)
|
||||||
|
q (java.util.concurrent.ArrayBlockingQueue. 16)
|
||||||
|
drain (fn [acc]
|
||||||
|
(loop [acc acc]
|
||||||
|
(if-some [x (.poll q)]
|
||||||
|
(let [acc (if (identical? EOS x) (reduced acc) (rf acc x))]
|
||||||
|
(if (reduced? acc)
|
||||||
|
(do
|
||||||
|
(.destroy proc)
|
||||||
|
acc)
|
||||||
|
(recur acc)))
|
||||||
|
acc)))
|
||||||
|
in (stream-spec in)
|
||||||
|
out (stream-spec out)
|
||||||
|
stdin (cond-> (.getOutputStream proc) (#{:lines :text} (:mode in)) (-> (java.io.OutputStreamWriter. (:enc in)) java.io.BufferedWriter.))
|
||||||
|
stdout (cond-> (.getInputStream proc) (#{:lines :text} (:mode out)) (-> (java.io.InputStreamReader. (:enc out)) java.io.BufferedReader.))
|
||||||
|
write!
|
||||||
|
(case (:mode in)
|
||||||
|
:lines
|
||||||
|
(fn [x]
|
||||||
|
(doto ^java.io.BufferedWriter stdin
|
||||||
|
(.write (str x))
|
||||||
|
.newLine))
|
||||||
|
:text
|
||||||
|
(fn [x]
|
||||||
|
(.write ^java.io.BufferedWriter stdin (str x)))
|
||||||
|
:bytes
|
||||||
|
(fn [^bytes x]
|
||||||
|
(.write ^java.io.OutputStream stdin x)))]
|
||||||
|
(-> (case (:mode out)
|
||||||
|
:lines
|
||||||
|
#(loop []
|
||||||
|
(if-some [s (.readLine ^java.io.BufferedReader stdout)]
|
||||||
|
(do (.put q s) (recur))
|
||||||
|
(.put q EOS)))
|
||||||
|
:text
|
||||||
|
#(let [buf (char-array 1024)]
|
||||||
|
(loop []
|
||||||
|
(let [n (.read ^java.io.BufferedReader stdout buf)]
|
||||||
|
(if (neg? n)
|
||||||
|
(.put q EOS)
|
||||||
|
(do (.put q (String. buf 0 n)) (recur))))))
|
||||||
|
:bytes
|
||||||
|
#(let [buf (byte-array 1024)]
|
||||||
|
(loop []
|
||||||
|
(let [n (.read ^java.io.InputStream stdout buf)]
|
||||||
|
(if (neg? n)
|
||||||
|
(.put q EOS)
|
||||||
|
(do (.put q (java.util.Arrays/copyOf buf n)) (recur)))))))
|
||||||
|
Thread. .start)
|
||||||
|
(fn
|
||||||
|
([] (rf))
|
||||||
|
([acc]
|
||||||
|
(.close stdin)
|
||||||
|
(loop [acc acc]
|
||||||
|
(let [acc (drain acc)]
|
||||||
|
(if (reduced? acc)
|
||||||
|
(rf (unreduced acc))
|
||||||
|
(recur acc)))))
|
||||||
|
([acc x]
|
||||||
|
(let [acc (drain acc)]
|
||||||
|
(try
|
||||||
|
(when-not (reduced? acc)
|
||||||
|
(write! x))
|
||||||
|
acc
|
||||||
|
(catch java.io.IOException e
|
||||||
|
(ensure-reduced acc)))))))))
|
||||||
Loading…
Reference in a new issue