xio/sh produces reducible collections

This commit is contained in:
Christophe Grand 2018-04-16 23:07:18 +02:00
parent 00e19651ed
commit f2165ba932
3 changed files with 88 additions and 78 deletions

View file

@ -15,7 +15,7 @@ In `net.cgrand.xforms`:
* aggregators: `reduce`, `into`, `without`, `transjuxt`, `last`, `count`, `avg`, `sd`, `min`, `minimum`, `max`, `maximum`, `str` * aggregators: `reduce`, `into`, `without`, `transjuxt`, `last`, `count`, `avg`, `sd`, `min`, `minimum`, `max`, `maximum`, `str`
In `net.cgrand.xforms.io`: In `net.cgrand.xforms.io`:
* `sh` to use any process as a transducer * `sh` to use any process as a reducible collection (of stdout lines) or as a transducers (input as stdin lines, stdout lines as output).
*Reducing functions* *Reducing functions*

View file

@ -1,4 +1,4 @@
(defproject net.cgrand/xforms "0.16.0" (defproject net.cgrand/xforms "0.17.0"
:description "Extra transducers for Clojure" :description "Extra transducers for Clojure"
:url "https://github.com/cgrand/xforms" :url "https://github.com/cgrand/xforms"
:license {:name "Eclipse Public License" :license {:name "Eclipse Public License"

View file

@ -101,7 +101,8 @@
(cond (map? x) x (string? x) {:enc x} (keyword? x) {:mode x}))) (cond (map? x) x (string? x) {:enc x} (keyword? x) {:mode x})))
(defn sh (defn sh
"Transducer. Spawns a process (program cmd with optional arguments arg1 ... argN) and pipes data through it. "Transducer or reducible view (in this case assumes empty stdin).
Spawns a process (program cmd with optional arguments arg1 ... argN) and pipes data through it.
Options may be: Options may be:
* :env, an environment variables map, it will be merged with clojure.java.shell/*sh-env* and JVM environment (in decreasing precedence order), * :env, an environment variables map, it will be merged with clojure.java.shell/*sh-env* and JVM environment (in decreasing precedence order),
* :dir, the current dir (defaults to clojure.java.shell/*sh-dir* or JVM current dir), * :dir, the current dir (defaults to clojure.java.shell/*sh-dir* or JVM current dir),
@ -113,79 +114,88 @@
In :text mode, values are strings." In :text mode, values are strings."
{:arglists '([cmd arg1 ... argN & opts])} {:arglists '([cmd arg1 ... argN & opts])}
[& args] [& args]
(fn [rf] (reify
(let [[cmd [& {:as opts :keys [env in out dir] :or {dir sh/*sh-dir*}}]] (split-with string? args) clojure.lang.IReduce
env (into (or sh/*sh-env* {}) env) (reduce [self rf]
env (into {} (for [[k v] env] [(name k) (str v)])) (reduce rf (eduction self nil))) ; quick way to handle no init
proc (-> ^java.util.List (map str cmd) ProcessBuilder. (reduce [self rf init]
(.redirectError java.lang.ProcessBuilder$Redirect/INHERIT) (let [xf (self rf)]
(doto (-> .environment (.putAll env))) (xf init)))
(.directory (io/as-file dir)) clojure.lang.Fn
.start) clojure.lang.IFn
EOS (Object.) (invoke [_ rf]
q (java.util.concurrent.ArrayBlockingQueue. 16) (let [[cmd [& {:as opts :keys [env in out dir] :or {dir sh/*sh-dir*}}]] (split-with string? args)
drain (fn [acc] env (into (or sh/*sh-env* {}) env)
(loop [acc acc] env (into {} (for [[k v] env] [(name k) (str v)]))
(if-some [x (.poll q)] proc (-> ^java.util.List (map str cmd) ProcessBuilder.
(let [acc (if (identical? EOS x) (reduced acc) (rf acc x))] (.redirectError java.lang.ProcessBuilder$Redirect/INHERIT)
(if (reduced? acc) (doto (-> .environment (.putAll env)))
(do (.directory (io/as-file dir))
(.destroy proc) .start)
acc) EOS (Object.)
(recur acc))) q (java.util.concurrent.ArrayBlockingQueue. 16)
acc))) drain (fn [acc]
in (stream-spec in) (loop [acc acc]
out (stream-spec out) (if-some [x (.poll q)]
stdin (cond-> (.getOutputStream proc) (#{:lines :text} (:mode in)) (-> (java.io.OutputStreamWriter. (:enc in)) java.io.BufferedWriter.)) (let [acc (if (identical? EOS x) (reduced acc) (rf acc x))]
stdout (cond-> (.getInputStream proc) (#{:lines :text} (:mode out)) (-> (java.io.InputStreamReader. (:enc out)) java.io.BufferedReader.)) (if (reduced? acc)
write! (do
(case (:mode in) (.destroy proc)
:lines acc)
(fn [x] (recur acc)))
(doto ^java.io.BufferedWriter stdin acc)))
(.write (str x)) in (stream-spec in)
.newLine)) out (stream-spec out)
:text stdin (cond-> (.getOutputStream proc) (#{:lines :text} (:mode in)) (-> (java.io.OutputStreamWriter. (:enc in)) java.io.BufferedWriter.))
(fn [x] stdout (cond-> (.getInputStream proc) (#{:lines :text} (:mode out)) (-> (java.io.InputStreamReader. (:enc out)) java.io.BufferedReader.))
(.write ^java.io.BufferedWriter stdin (str x))) write!
:bytes (case (:mode in)
(fn [^bytes x] :lines
(.write ^java.io.OutputStream stdin x)))] (fn [x]
(-> (case (:mode out) (doto ^java.io.BufferedWriter stdin
:lines (.write (str x))
#(loop [] .newLine))
(if-some [s (.readLine ^java.io.BufferedReader stdout)] :text
(do (.put q s) (recur)) (fn [x]
(.put q EOS))) (.write ^java.io.BufferedWriter stdin (str x)))
:text :bytes
#(let [buf (char-array 1024)] (fn [^bytes x]
(loop [] (.write ^java.io.OutputStream stdin x)))]
(let [n (.read ^java.io.BufferedReader stdout buf)] (-> (case (:mode out)
(if (neg? n) :lines
(.put q EOS) #(loop []
(do (.put q (String. buf 0 n)) (recur)))))) (if-some [s (.readLine ^java.io.BufferedReader stdout)]
:bytes (do (.put q s) (recur))
#(let [buf (byte-array 1024)] (.put q EOS)))
(loop [] :text
(let [n (.read ^java.io.InputStream stdout buf)] #(let [buf (char-array 1024)]
(if (neg? n) (loop []
(.put q EOS) (let [n (.read ^java.io.BufferedReader stdout buf)]
(do (.put q (java.util.Arrays/copyOf buf n)) (recur))))))) (if (neg? n)
Thread. .start) (.put q EOS)
(fn (do (.put q (String. buf 0 n)) (recur))))))
([] (rf)) :bytes
([acc] #(let [buf (byte-array 1024)]
(.close stdin) (loop []
(loop [acc acc] (let [n (.read ^java.io.InputStream stdout buf)]
(if (neg? n)
(.put q EOS)
(do (.put q (java.util.Arrays/copyOf buf n)) (recur)))))))
Thread. .start)
(fn
([] (rf))
([acc]
(.close stdin)
(loop [acc acc]
(let [acc (drain acc)]
(if (reduced? acc)
(rf (unreduced acc))
(recur acc)))))
([acc x]
(let [acc (drain acc)] (let [acc (drain acc)]
(if (reduced? acc) (try
(rf (unreduced acc)) (when-not (reduced? acc)
(recur acc))))) (write! x))
([acc x] acc
(let [acc (drain acc)] (catch java.io.IOException e
(try (ensure-reduced acc))))))))))
(when-not (reduced? acc)
(write! x))
acc
(catch java.io.IOException e
(ensure-reduced acc)))))))))