diff --git a/README.md b/README.md index 6f65192..df71022 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ In `net.cgrand.xforms`: * aggregators: `reduce`, `into`, `without`, `transjuxt`, `last`, `count`, `avg`, `sd`, `min`, `minimum`, `max`, `maximum`, `str` In `net.cgrand.xforms.io`: - * `sh` to use any process as a transducer + * `sh` to use any process as a reducible collection (of stdout lines) or as a transducers (input as stdin lines, stdout lines as output). *Reducing functions* diff --git a/project.clj b/project.clj index 52dc12a..2485d73 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject net.cgrand/xforms "0.16.0" +(defproject net.cgrand/xforms "0.17.0" :description "Extra transducers for Clojure" :url "https://github.com/cgrand/xforms" :license {:name "Eclipse Public License" diff --git a/src/net/cgrand/xforms/io.clj b/src/net/cgrand/xforms/io.clj index 48f3149..ef10042 100644 --- a/src/net/cgrand/xforms/io.clj +++ b/src/net/cgrand/xforms/io.clj @@ -101,7 +101,8 @@ (cond (map? x) x (string? x) {:enc x} (keyword? x) {:mode x}))) (defn sh - "Transducer. Spawns a process (program cmd with optional arguments arg1 ... argN) and pipes data through it. + "Transducer or reducible view (in this case assumes empty stdin). + Spawns a process (program cmd with optional arguments arg1 ... argN) and pipes data through it. Options may be: * :env, an environment variables map, it will be merged with clojure.java.shell/*sh-env* and JVM environment (in decreasing precedence order), * :dir, the current dir (defaults to clojure.java.shell/*sh-dir* or JVM current dir), @@ -113,79 +114,88 @@ In :text mode, values are strings." {:arglists '([cmd arg1 ... argN & opts])} [& args] - (fn [rf] - (let [[cmd [& {:as opts :keys [env in out dir] :or {dir sh/*sh-dir*}}]] (split-with string? args) - env (into (or sh/*sh-env* {}) env) - env (into {} (for [[k v] env] [(name k) (str v)])) - proc (-> ^java.util.List (map str cmd) ProcessBuilder. - (.redirectError java.lang.ProcessBuilder$Redirect/INHERIT) - (doto (-> .environment (.putAll env))) - (.directory (io/as-file dir)) - .start) - EOS (Object.) - q (java.util.concurrent.ArrayBlockingQueue. 16) - drain (fn [acc] - (loop [acc acc] - (if-some [x (.poll q)] - (let [acc (if (identical? EOS x) (reduced acc) (rf acc x))] - (if (reduced? acc) - (do - (.destroy proc) - acc) - (recur acc))) - acc))) - in (stream-spec in) - out (stream-spec out) - stdin (cond-> (.getOutputStream proc) (#{:lines :text} (:mode in)) (-> (java.io.OutputStreamWriter. (:enc in)) java.io.BufferedWriter.)) - stdout (cond-> (.getInputStream proc) (#{:lines :text} (:mode out)) (-> (java.io.InputStreamReader. (:enc out)) java.io.BufferedReader.)) - write! - (case (:mode in) - :lines - (fn [x] - (doto ^java.io.BufferedWriter stdin - (.write (str x)) - .newLine)) - :text - (fn [x] - (.write ^java.io.BufferedWriter stdin (str x))) - :bytes - (fn [^bytes x] - (.write ^java.io.OutputStream stdin x)))] - (-> (case (:mode out) - :lines - #(loop [] - (if-some [s (.readLine ^java.io.BufferedReader stdout)] - (do (.put q s) (recur)) - (.put q EOS))) - :text - #(let [buf (char-array 1024)] - (loop [] - (let [n (.read ^java.io.BufferedReader stdout buf)] - (if (neg? n) - (.put q EOS) - (do (.put q (String. buf 0 n)) (recur)))))) - :bytes - #(let [buf (byte-array 1024)] - (loop [] - (let [n (.read ^java.io.InputStream stdout buf)] - (if (neg? n) - (.put q EOS) - (do (.put q (java.util.Arrays/copyOf buf n)) (recur))))))) - Thread. .start) - (fn - ([] (rf)) - ([acc] - (.close stdin) - (loop [acc acc] + (reify + clojure.lang.IReduce + (reduce [self rf] + (reduce rf (eduction self nil))) ; quick way to handle no init + (reduce [self rf init] + (let [xf (self rf)] + (xf init))) + clojure.lang.Fn + clojure.lang.IFn + (invoke [_ rf] + (let [[cmd [& {:as opts :keys [env in out dir] :or {dir sh/*sh-dir*}}]] (split-with string? args) + env (into (or sh/*sh-env* {}) env) + env (into {} (for [[k v] env] [(name k) (str v)])) + proc (-> ^java.util.List (map str cmd) ProcessBuilder. + (.redirectError java.lang.ProcessBuilder$Redirect/INHERIT) + (doto (-> .environment (.putAll env))) + (.directory (io/as-file dir)) + .start) + EOS (Object.) + q (java.util.concurrent.ArrayBlockingQueue. 16) + drain (fn [acc] + (loop [acc acc] + (if-some [x (.poll q)] + (let [acc (if (identical? EOS x) (reduced acc) (rf acc x))] + (if (reduced? acc) + (do + (.destroy proc) + acc) + (recur acc))) + acc))) + in (stream-spec in) + out (stream-spec out) + stdin (cond-> (.getOutputStream proc) (#{:lines :text} (:mode in)) (-> (java.io.OutputStreamWriter. (:enc in)) java.io.BufferedWriter.)) + stdout (cond-> (.getInputStream proc) (#{:lines :text} (:mode out)) (-> (java.io.InputStreamReader. (:enc out)) java.io.BufferedReader.)) + write! + (case (:mode in) + :lines + (fn [x] + (doto ^java.io.BufferedWriter stdin + (.write (str x)) + .newLine)) + :text + (fn [x] + (.write ^java.io.BufferedWriter stdin (str x))) + :bytes + (fn [^bytes x] + (.write ^java.io.OutputStream stdin x)))] + (-> (case (:mode out) + :lines + #(loop [] + (if-some [s (.readLine ^java.io.BufferedReader stdout)] + (do (.put q s) (recur)) + (.put q EOS))) + :text + #(let [buf (char-array 1024)] + (loop [] + (let [n (.read ^java.io.BufferedReader stdout buf)] + (if (neg? n) + (.put q EOS) + (do (.put q (String. buf 0 n)) (recur)))))) + :bytes + #(let [buf (byte-array 1024)] + (loop [] + (let [n (.read ^java.io.InputStream stdout buf)] + (if (neg? n) + (.put q EOS) + (do (.put q (java.util.Arrays/copyOf buf n)) (recur))))))) + Thread. .start) + (fn + ([] (rf)) + ([acc] + (.close stdin) + (loop [acc acc] + (let [acc (drain acc)] + (if (reduced? acc) + (rf (unreduced acc)) + (recur acc))))) + ([acc x] (let [acc (drain acc)] - (if (reduced? acc) - (rf (unreduced acc)) - (recur acc))))) - ([acc x] - (let [acc (drain acc)] - (try - (when-not (reduced? acc) - (write! x)) - acc - (catch java.io.IOException e - (ensure-reduced acc))))))))) \ No newline at end of file + (try + (when-not (reduced? acc) + (write! x)) + acc + (catch java.io.IOException e + (ensure-reduced acc)))))))))) \ No newline at end of file